PATH:
usr
/
lib
/
python2.7
/
site-packages
/
azurelinuxagent
/
common
/
protocol
# Microsoft Azure Linux Agent # # Copyright 2018 Microsoft Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Requires Python 2.6+ and Openssl 1.0+ import json import os import random import time import uuid import xml.sax.saxutils as saxutils from collections import defaultdict from datetime import datetime, timedelta import azurelinuxagent.common.conf as conf import azurelinuxagent.common.logger as logger import azurelinuxagent.common.utils.textutil as textutil from azurelinuxagent.common.agent_supported_feature import get_agent_supported_features_list_for_crp from azurelinuxagent.common.datacontract import validate_param from azurelinuxagent.common.event import add_event, WALAEventOperation, report_event, \ CollectOrReportEventDebugInfo, add_periodic from azurelinuxagent.common.exception import ProtocolNotFoundError, \ ResourceGoneError, ExtensionDownloadError, InvalidContainerError, ProtocolError, HttpError, VmSettingsError from azurelinuxagent.common.future import httpclient, bytebuffer, ustr from azurelinuxagent.common.protocol.extensions_goal_state import ExtensionsGoalState, GoalStateMismatchError from azurelinuxagent.common.protocol.extensions_goal_state_factory import ExtensionsGoalStateFactory from azurelinuxagent.common.protocol.goal_state import GoalState, TRANSPORT_CERT_FILE_NAME, TRANSPORT_PRV_FILE_NAME from azurelinuxagent.common.protocol.hostplugin import HostPluginProtocol from azurelinuxagent.common.protocol.restapi import DataContract, ExtHandlerPackage, \ ExtHandlerPackageList, ProvisionStatus, VMInfo, VMStatus from azurelinuxagent.common.telemetryevent import GuestAgentExtensionEventsSchema from azurelinuxagent.common.utils import fileutil, restutil from azurelinuxagent.common.utils.archive import StateFlusher from azurelinuxagent.common.utils.cryptutil import CryptUtil from azurelinuxagent.common.utils.flexible_version import FlexibleVersion from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, \ findtext, gettext, remove_bom, get_bytes_from_pem, parse_json, format_exception from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION VERSION_INFO_URI = "http://{0}/?comp=versions" HEALTH_REPORT_URI = "http://{0}/machine?comp=health" ROLE_PROP_URI = "http://{0}/machine?comp=roleProperties" TELEMETRY_URI = "http://{0}/machine?comp=telemetrydata" WIRE_SERVER_ADDR_FILE_NAME = "WireServer" INCARNATION_FILE_NAME = "Incarnation" GOAL_STATE_FILE_NAME = "GoalState.{0}.xml" VM_SETTINGS_FILE_NAME = "VmSettings.{0}.json" HOSTING_ENV_FILE_NAME = "HostingEnvironmentConfig.xml" SHARED_CONF_FILE_NAME = "SharedConfig.xml" REMOTE_ACCESS_FILE_NAME = "RemoteAccess.{0}.xml" EXT_CONF_FILE_NAME = "ExtensionsConfig.{0}.xml" MANIFEST_FILE_NAME = "{0}.{1}.manifest.xml" PROTOCOL_VERSION = "2012-11-30" ENDPOINT_FINE_NAME = "WireServer" SHORT_WAITING_INTERVAL = 1 # 1 second MAX_EVENT_BUFFER_SIZE = 2 ** 16 - 2 ** 10 class UploadError(HttpError): pass class WireProtocol(DataContract): def __init__(self, endpoint): if endpoint is None: raise ProtocolError("WireProtocol endpoint is None") self.client = WireClient(endpoint) def detect(self): self.client.check_wire_protocol_version() trans_prv_file = os.path.join(conf.get_lib_dir(), TRANSPORT_PRV_FILE_NAME) trans_cert_file = os.path.join(conf.get_lib_dir(), TRANSPORT_CERT_FILE_NAME) cryptutil = CryptUtil(conf.get_openssl_cmd()) cryptutil.gen_transport_cert(trans_prv_file, trans_cert_file) # Initialize the goal state, including all the inner properties logger.info('Initializing goal state during protocol detection') self.client.update_goal_state(force_update=True) def update_goal_state(self): self.client.update_goal_state() def update_host_plugin_from_goal_state(self): self.client.update_host_plugin_from_goal_state() def get_endpoint(self): return self.client.get_endpoint() def get_vminfo(self): goal_state = self.client.get_goal_state() hosting_env = self.client.get_hosting_env() vminfo = VMInfo() vminfo.subscriptionId = None vminfo.vmName = hosting_env.vm_name vminfo.tenantName = hosting_env.deployment_name vminfo.roleName = hosting_env.role_name vminfo.roleInstanceName = goal_state.role_instance_id return vminfo def get_certs(self): certificates = self.client.get_certs() return certificates.cert_list def get_incarnation(self): return self.client.get_goal_state().incarnation def get_vmagent_manifests(self): goal_state = self.client.get_goal_state() ext_conf = self.client.get_extensions_goal_state() return ext_conf.agent_manifests, goal_state.incarnation def get_vmagent_pkgs(self, vmagent_manifest): goal_state = self.client.get_goal_state() ga_manifest = self.client.fetch_gafamily_manifest(vmagent_manifest, goal_state) valid_pkg_list = ga_manifest.pkg_list return valid_pkg_list def get_ext_handler_pkgs(self, ext_handler): logger.verbose("Get extension handler package") man = self.client.get_ext_manifest(ext_handler) return man.pkg_list def get_extensions_goal_state(self): return self.client.get_extensions_goal_state() def _download_ext_handler_pkg_through_host(self, uri, destination): host = self.client.get_host_plugin() uri, headers = host.get_artifact_request(uri, host.manifest_uri) success = self.client.stream(uri, destination, headers=headers, use_proxy=False, max_retry=1) # set max_retry to 1 because extension packages already have a retry loop (see ExtHandlerInstance.download()) return success def download_ext_handler_pkg(self, uri, destination, headers=None, use_proxy=True): # pylint: disable=W0613 direct_func = lambda: self.client.stream(uri, destination, headers=None, use_proxy=True, max_retry=1) # NOTE: the host_func may be called after refreshing the goal state, be careful about any goal state data # in the lambda. host_func = lambda: self._download_ext_handler_pkg_through_host(uri, destination) try: success = self.client.send_request_using_appropriate_channel(direct_func, host_func) is not None except Exception: success = False return success def report_provision_status(self, provision_status): validate_param("provision_status", provision_status, ProvisionStatus) if provision_status.status is not None: self.client.report_health(provision_status.status, provision_status.subStatus, provision_status.description) if provision_status.properties.certificateThumbprint is not None: thumbprint = provision_status.properties.certificateThumbprint self.client.report_role_prop(thumbprint) def report_vm_status(self, vm_status): validate_param("vm_status", vm_status, VMStatus) self.client.status_blob.set_vm_status(vm_status) self.client.upload_status_blob() def report_event(self, events_iterator): self.client.report_event(events_iterator) def upload_logs(self, logs): self.client.upload_logs(logs) def get_status_blob_data(self): return self.client.status_blob.data def _build_role_properties(container_id, role_instance_id, thumbprint): xml = (u"<?xml version=\"1.0\" encoding=\"utf-8\"?>" u"<RoleProperties>" u"<Container>" u"<ContainerId>{0}</ContainerId>" u"<RoleInstances>" u"<RoleInstance>" u"<Id>{1}</Id>" u"<Properties>" u"<Property name=\"CertificateThumbprint\" value=\"{2}\" />" u"</Properties>" u"</RoleInstance>" u"</RoleInstances>" u"</Container>" u"</RoleProperties>" u"").format(container_id, role_instance_id, thumbprint) return xml def _build_health_report(incarnation, container_id, role_instance_id, status, substatus, description): # The max description that can be sent to WireServer is 4096 bytes. # Exceeding this max can result in a failure to report health. # To keep this simple, we will keep a 10% buffer and trim before # encoding the description. if description: max_chars_before_encoding = 3686 len_before_trim = len(description) description = description[:max_chars_before_encoding] trimmed_char_count = len_before_trim - len(description) if trimmed_char_count > 0: logger.info( 'Trimmed health report description by {0} characters'.format( trimmed_char_count ) ) # Escape '&', '<' and '>' description = saxutils.escape(ustr(description)) detail = u'' if substatus is not None: substatus = saxutils.escape(ustr(substatus)) detail = (u"<Details>" u"<SubStatus>{0}</SubStatus>" u"<Description>{1}</Description>" u"</Details>").format(substatus, description) xml = (u"<?xml version=\"1.0\" encoding=\"utf-8\"?>" u"<Health " u"xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"" u" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" u"<GoalStateIncarnation>{0}</GoalStateIncarnation>" u"<Container>" u"<ContainerId>{1}</ContainerId>" u"<RoleInstanceList>" u"<Role>" u"<InstanceId>{2}</InstanceId>" u"<Health>" u"<State>{3}</State>" u"{4}" u"</Health>" u"</Role>" u"</RoleInstanceList>" u"</Container>" u"</Health>" u"").format(incarnation, container_id, role_instance_id, status, detail) return xml def ga_status_to_guest_info(ga_status): """ Convert VMStatus object to status blob format """ v1_ga_guest_info = { "computerName": ga_status.hostname, "osName": ga_status.osname, "osVersion": ga_status.osversion, "version": ga_status.version, } return v1_ga_guest_info def __get_formatted_msg_for_status_reporting(msg, lang="en-US"): return { 'lang': lang, 'message': msg } def _get_utc_timestamp_for_status_reporting(time_format="%Y-%m-%dT%H:%M:%SZ", timestamp=None): timestamp = time.gmtime() if timestamp is None else timestamp return time.strftime(time_format, timestamp) def ga_status_to_v1(ga_status): v1_ga_status = { "version": ga_status.version, "status": ga_status.status, "formattedMessage": __get_formatted_msg_for_status_reporting(ga_status.message) } if ga_status.update_status is not None: v1_ga_status["updateStatus"] = get_ga_update_status_to_v1(ga_status.update_status) return v1_ga_status def get_ga_update_status_to_v1(update_status): v1_ga_update_status = { "expectedVersion": update_status.expected_version, "status": update_status.status, "code": update_status.code, "formattedMessage": __get_formatted_msg_for_status_reporting(update_status.message) } return v1_ga_update_status def ext_substatus_to_v1(sub_status_list): status_list = [] for substatus in sub_status_list: status = { "name": substatus.name, "status": substatus.status, "code": substatus.code, "formattedMessage": __get_formatted_msg_for_status_reporting(substatus.message) } status_list.append(status) return status_list def ext_status_to_v1(ext_status): if ext_status is None: return None timestamp = _get_utc_timestamp_for_status_reporting() v1_sub_status = ext_substatus_to_v1(ext_status.substatusList) v1_ext_status = { "status": { "name": ext_status.name, "configurationAppliedTime": ext_status.configurationAppliedTime, "operation": ext_status.operation, "status": ext_status.status, "code": ext_status.code, "formattedMessage": __get_formatted_msg_for_status_reporting(ext_status.message) }, "version": 1.0, "timestampUTC": timestamp } if len(v1_sub_status) != 0: v1_ext_status['status']['substatus'] = v1_sub_status return v1_ext_status def ext_handler_status_to_v1(ext_handler_status): v1_handler_status = { 'handlerVersion': ext_handler_status.version, 'handlerName': ext_handler_status.name, 'status': ext_handler_status.status, 'code': ext_handler_status.code, 'useExactVersion': True } if ext_handler_status.message is not None: v1_handler_status["formattedMessage"] = __get_formatted_msg_for_status_reporting(ext_handler_status.message) v1_ext_status = ext_status_to_v1(ext_handler_status.extension_status) if ext_handler_status.extension_status is not None and v1_ext_status is not None: v1_handler_status["runtimeSettingsStatus"] = { 'settingsStatus': v1_ext_status, 'sequenceNumber': ext_handler_status.extension_status.sequenceNumber } # Add extension name if Handler supports MultiConfig if ext_handler_status.supports_multi_config: v1_handler_status["runtimeSettingsStatus"]["extensionName"] = ext_handler_status.extension_status.name return v1_handler_status def vm_artifacts_aggregate_status_to_v1(vm_artifacts_aggregate_status): gs_aggregate_status = vm_artifacts_aggregate_status.goal_state_aggregate_status if gs_aggregate_status is None: return None v1_goal_state_aggregate_status = { "formattedMessage": __get_formatted_msg_for_status_reporting(gs_aggregate_status.message), "timestampUTC": _get_utc_timestamp_for_status_reporting(timestamp=gs_aggregate_status.processed_time), "inSvdSeqNo": gs_aggregate_status.in_svd_seq_no, "status": gs_aggregate_status.status, "code": gs_aggregate_status.code } v1_artifact_aggregate_status = { "goalStateAggregateStatus": v1_goal_state_aggregate_status } return v1_artifact_aggregate_status def vm_status_to_v1(vm_status): timestamp = _get_utc_timestamp_for_status_reporting() v1_ga_guest_info = ga_status_to_guest_info(vm_status.vmAgent) v1_ga_status = ga_status_to_v1(vm_status.vmAgent) v1_vm_artifact_aggregate_status = vm_artifacts_aggregate_status_to_v1( vm_status.vmAgent.vm_artifacts_aggregate_status) v1_handler_status_list = [] for handler_status in vm_status.vmAgent.extensionHandlers: v1_handler_status_list.append(ext_handler_status_to_v1(handler_status)) v1_agg_status = { 'guestAgentStatus': v1_ga_status, 'handlerAggregateStatus': v1_handler_status_list } if v1_vm_artifact_aggregate_status is not None: v1_agg_status['vmArtifactsAggregateStatus'] = v1_vm_artifact_aggregate_status v1_vm_status = { 'version': '1.1', 'timestampUTC': timestamp, 'aggregateStatus': v1_agg_status, 'guestOSInfo': v1_ga_guest_info } supported_features = [] for _, feature in get_agent_supported_features_list_for_crp().items(): supported_features.append( { "Key": feature.name, "Value": feature.version } ) if supported_features: v1_vm_status["supportedFeatures"] = supported_features return v1_vm_status class StatusBlob(object): def __init__(self, client): self.vm_status = None self.client = client self.type = None self.data = None def set_vm_status(self, vm_status): validate_param("vmAgent", vm_status, VMStatus) self.vm_status = vm_status def to_json(self): report = vm_status_to_v1(self.vm_status) return json.dumps(report) __storage_version__ = "2014-02-14" def prepare(self, blob_type): logger.verbose("Prepare status blob") self.data = self.to_json() self.type = blob_type def upload(self, url): try: if not self.type in ["BlockBlob", "PageBlob"]: raise ProtocolError("Illegal blob type: {0}".format(self.type)) if self.type == "BlockBlob": self.put_block_blob(url, self.data) else: self.put_page_blob(url, self.data) return True except Exception as e: logger.verbose("Initial status upload failed: {0}", e) return False def get_block_blob_headers(self, blob_size): return { "Content-Length": ustr(blob_size), "x-ms-blob-type": "BlockBlob", "x-ms-date": _get_utc_timestamp_for_status_reporting(), "x-ms-version": self.__class__.__storage_version__ } def put_block_blob(self, url, data): logger.verbose("Put block blob") headers = self.get_block_blob_headers(len(data)) resp = self.client.call_storage_service(restutil.http_put, url, data, headers) if resp.status != httpclient.CREATED: raise UploadError( "Failed to upload block blob: {0}".format(resp.status)) def get_page_blob_create_headers(self, blob_size): return { "Content-Length": "0", "x-ms-blob-content-length": ustr(blob_size), "x-ms-blob-type": "PageBlob", "x-ms-date": _get_utc_timestamp_for_status_reporting(), "x-ms-version": self.__class__.__storage_version__ } def get_page_blob_page_headers(self, start, end): return { "Content-Length": ustr(end - start), "x-ms-date": _get_utc_timestamp_for_status_reporting(), "x-ms-range": "bytes={0}-{1}".format(start, end - 1), "x-ms-page-write": "update", "x-ms-version": self.__class__.__storage_version__ } def put_page_blob(self, url, data): logger.verbose("Put page blob") # Convert string into bytes and align to 512 bytes data = bytearray(data, encoding='utf-8') page_blob_size = int((len(data) + 511) / 512) * 512 headers = self.get_page_blob_create_headers(page_blob_size) resp = self.client.call_storage_service(restutil.http_put, url, "", headers) if resp.status != httpclient.CREATED: raise UploadError( "Failed to clean up page blob: {0}".format(resp.status)) if url.count("?") <= 0: url = "{0}?comp=page".format(url) else: url = "{0}&comp=page".format(url) logger.verbose("Upload page blob") page_max = 4 * 1024 * 1024 # Max page size: 4MB start = 0 end = 0 while end < len(data): end = min(len(data), start + page_max) content_size = end - start # Align to 512 bytes page_end = int((end + 511) / 512) * 512 buf_size = page_end - start buf = bytearray(buf_size) buf[0: content_size] = data[start: end] headers = self.get_page_blob_page_headers(start, page_end) resp = self.client.call_storage_service( restutil.http_put, url, bytebuffer(buf), headers) if resp is None or resp.status != httpclient.CREATED: raise UploadError( "Failed to upload page blob: {0}".format(resp.status)) start = end def event_param_to_v1(param): param_format = ustr('<Param Name="{0}" Value={1} T="{2}" />') param_type = type(param.value) attr_type = "" if param_type is int: attr_type = 'mt:uint64' elif param_type is str: attr_type = 'mt:wstr' elif ustr(param_type).count("'unicode'") > 0: attr_type = 'mt:wstr' elif param_type is bool: attr_type = 'mt:bool' elif param_type is float: attr_type = 'mt:float64' return param_format.format(param.name, saxutils.quoteattr(ustr(param.value)), attr_type) def event_to_v1_encoded(event, encoding='utf-8'): params = "" for param in event.parameters: params += event_param_to_v1(param) event_str = ustr('<Event id="{0}"><![CDATA[{1}]]></Event>').format(event.eventId, params) return event_str.encode(encoding) class WireClient(object): def __init__(self, endpoint): logger.info("Wire server endpoint:{0}", endpoint) self._endpoint = endpoint self._goal_state = None self._extensions_goal_state = None # The goal state to use for extensions; can be an ExtensionsGoalStateFromVmSettings or ExtensionsGoalStateFromExtensionsConfig self._cached_vm_settings = None # Cached value of the most recent ExtensionsGoalStateFromVmSettings self._host_plugin = None self._host_plugin_version = FlexibleVersion("0.0.0.0") # Version 0 means "unknown" self._host_plugin_supports_vm_settings = False self._host_plugin_supports_vm_settings_next_check = datetime.now() self.status_blob = StatusBlob(self) self.goal_state_flusher = StateFlusher(conf.get_lib_dir()) self._vm_settings_error_reporter = _VmSettingsErrorReporter() def get_endpoint(self): return self._endpoint def call_wireserver(self, http_req, *args, **kwargs): try: # Never use the HTTP proxy for wireserver kwargs['use_proxy'] = False resp = http_req(*args, **kwargs) if restutil.request_failed(resp): msg = "[Wireserver Failed] URI {0} ".format(args[0]) if resp is not None: msg += " [HTTP Failed] Status Code {0}".format(resp.status) raise ProtocolError(msg) # If the GoalState is stale, pass along the exception to the caller except ResourceGoneError: raise except Exception as e: raise ProtocolError("[Wireserver Exception] {0}".format( ustr(e))) return resp def decode_config(self, data): if data is None: return None data = remove_bom(data) xml_text = ustr(data, encoding='utf-8') return xml_text def fetch_config(self, uri, headers): resp = self.call_wireserver(restutil.http_get, uri, headers=headers) return self.decode_config(resp.read()) def fetch_cache(self, local_file): if not os.path.isfile(local_file): raise ProtocolError("{0} is missing.".format(local_file)) try: return fileutil.read_file(local_file) except IOError as e: raise ProtocolError("Failed to read cache: {0}".format(e)) @staticmethod def _save_cache(data, file_name): try: file_path = os.path.join(conf.get_lib_dir(), file_name) fileutil.write_file(file_path, data) except IOError as e: fileutil.clean_ioerror(e, paths=[file_name]) raise ProtocolError("Failed to write cache: {0}".format(e)) @staticmethod def call_storage_service(http_req, *args, **kwargs): # Default to use the configured HTTP proxy if not 'use_proxy' in kwargs or kwargs['use_proxy'] is None: kwargs['use_proxy'] = True return http_req(*args, **kwargs) def fetch_manifest_through_host(self, uri): host = self.get_host_plugin() uri, headers = host.get_artifact_request(uri) response, _ = self.fetch(uri, headers, use_proxy=False, retry_codes=restutil.HGAP_GET_EXTENSION_ARTIFACT_RETRY_CODES) return response def fetch_manifest(self, version_uris, timeout_in_minutes=5, timeout_in_ms=0): logger.verbose("Fetch manifest") version_uris_shuffled = version_uris random.shuffle(version_uris_shuffled) uris_tried = 0 start_time = datetime.now() for version_uri in version_uris_shuffled: if datetime.now() - start_time > timedelta(minutes=timeout_in_minutes, milliseconds=timeout_in_ms): logger.warn("Agent timed-out after {0} minutes while fetching extension manifests. {1}/{2} uris tried.", timeout_in_minutes, uris_tried, len(version_uris)) break # GA expects a location and failoverLocation in ExtensionsConfig, but # this is not always the case. See #1147. if version_uri is None: logger.verbose('The specified manifest URL is empty, ignored.') continue # Disable W0640: OK to use version_uri in a lambda within the loop's body direct_func = lambda: self.fetch(version_uri)[0] # pylint: disable=W0640 # NOTE: the host_func may be called after refreshing the goal state, be careful about any goal state data # in the lambda. # Disable W0640: OK to use version_uri in a lambda within the loop's body host_func = lambda: self.fetch_manifest_through_host(version_uri) # pylint: disable=W0640 try: manifest = self.send_request_using_appropriate_channel(direct_func, host_func) if manifest is not None: host = self.get_host_plugin() host.update_manifest_uri(version_uri) return manifest except Exception as error: logger.warn("Failed to fetch manifest from {0}. Error: {1}", version_uri, ustr(error)) uris_tried += 1 raise ExtensionDownloadError("Failed to fetch manifest from all sources") def stream(self, uri, destination, headers=None, use_proxy=None, max_retry=None): """ max_retry indicates the maximum number of retries for the HTTP request; None indicates that the default value should be used """ success = False logger.verbose("Fetch [{0}] with headers [{1}] to file [{2}]", uri, headers, destination) response = self._fetch_response(uri, headers, use_proxy, max_retry=max_retry) if response is not None and not restutil.request_failed(response): chunk_size = 1024 * 1024 # 1MB buffer try: with open(destination, 'wb', chunk_size) as destination_fh: complete = False while not complete: chunk = response.read(chunk_size) destination_fh.write(chunk) complete = len(chunk) < chunk_size success = True except Exception as error: logger.error('Error streaming {0} to {1}: {2}'.format(uri, destination, ustr(error))) return success def fetch(self, uri, headers=None, use_proxy=None, decode=True, max_retry=None, retry_codes=None, ok_codes=None): """ max_retry indicates the maximum number of retries for the HTTP request; None indicates that the default value should be used Returns a tuple with the content and headers of the response. The headers are a list of (name, value) tuples. """ logger.verbose("Fetch [{0}] with headers [{1}]", uri, headers) content = None response_headers = None response = self._fetch_response(uri, headers, use_proxy, max_retry=max_retry, retry_codes=retry_codes, ok_codes=ok_codes) if response is not None and not restutil.request_failed(response, ok_codes=ok_codes): response_content = response.read() content = self.decode_config(response_content) if decode else response_content response_headers = response.getheaders() return content, response_headers def _fetch_response(self, uri, headers=None, use_proxy=None, max_retry=None, retry_codes=None, ok_codes=None): """ max_retry indicates the maximum number of retries for the HTTP request; None indicates that the default value should be used """ resp = None try: resp = self.call_storage_service( restutil.http_get, uri, headers=headers, use_proxy=use_proxy, max_retry=max_retry, retry_codes=retry_codes) host_plugin = self.get_host_plugin() if restutil.request_failed(resp, ok_codes=ok_codes): error_response = restutil.read_response_error(resp) msg = "Fetch failed from [{0}]: {1}".format(uri, error_response) logger.warn(msg) if host_plugin is not None: host_plugin.report_fetch_health(uri, is_healthy=not restutil.request_failed_at_hostplugin(resp), source='WireClient', response=error_response) raise ProtocolError(msg) else: if host_plugin is not None: host_plugin.report_fetch_health(uri, source='WireClient') except (HttpError, ProtocolError, IOError) as error: msg = "Fetch failed: {0}".format(error) logger.warn(msg) report_event(op=WALAEventOperation.HttpGet, is_success=False, message=msg, log_event=False) if isinstance(error, (InvalidContainerError, ResourceGoneError)): # These are retryable errors that should force a goal state refresh in the host plugin raise return resp def update_host_plugin_from_goal_state(self): """ Fetches a new goal state and updates the Container ID and Role Config Name of the host plugin client """ goal_state = GoalState(self) self._update_host_plugin(goal_state.container_id, goal_state.role_config_name) def update_goal_state(self, force_update=False, is_retry=False): """ Updates the goal state if the incarnation or etag changed or if 'force_update' is True """ try: # # The entire goal state needs to be retrieved from the WireServer (via the GoalState class), and the HostGAPlugin # (via the self._fetch_vm_settings_goal_state method). # # We fetch it in 3 parts: # # 1) The "main" goal state from the WireServer, which includes the incarnation, container ID, role config, and URLs # to the rest of the goal state (certificates, remote users, extensions config, etc). We do this first because # we need to initialize the HostGAPlugin with the container ID and role config. # goal_state = GoalState(self) self._update_host_plugin(goal_state.container_id, goal_state.role_config_name) # # 2) Then we fetch the vmSettings from the HostGAPlugin. We do this before fetching the rest of the goal state from the # WireServer to minimize the time between the initial call to the WireServer and the call to the HostGAPlugin (and hence # reduce the window in which a new goal state may arrive in-between the 2 calls) # vm_settings_goal_state, vm_settings_goal_state_updated = (None, False) if conf.get_enable_fast_track(): try: vm_settings_goal_state, vm_settings_goal_state_updated = self._fetch_vm_settings_goal_state(force_update=force_update) except Exception as error: # _fetch_vm_settings_goal_state() does its own detailed error reporting and raises ProtocolError; do not report those if not isinstance(error, ProtocolError): self._vm_settings_error_reporter.report_error(format_exception(error)) self._vm_settings_error_reporter.report_summary() # # 3) Lastly we, fetch the rest of the goal state from the WireServer (but ony if needed: initialization, a "forced" update, or # a change in the incarnation). Note that if we fetch the full goal state we also update self._goal_state. # if force_update: logger.info("Forcing an update of the goal state..") fetch_full_goal_state = force_update or self._goal_state is None or self._goal_state.incarnation != goal_state.incarnation if not fetch_full_goal_state: goal_state_updated = False else: goal_state.fetch_full_goal_state(self) self._goal_state = goal_state goal_state_updated = True # # If we fetched the vmSettings then compare them against extensionsConfig and use them for the extensions goal state if # everything matches, otherwise use extensionsConfig. # use_vm_settings = False if vm_settings_goal_state is not None: if not goal_state_updated and not vm_settings_goal_state_updated: # no need to compare them, just use vmSettings use_vm_settings = True else: try: ExtensionsGoalState.compare(self._goal_state.extensions_config, vm_settings_goal_state) use_vm_settings = True except GoalStateMismatchError as mismatch: if not is_retry and mismatch.attribute in ("created_on_timestamp", "activity_id"): # this may be OK; a new goal state may have arrived in-between the calls to the HostGAPlugin and the WireServer; # retry one time after a delay and then report the error if it happens again. time.sleep(conf.get_goal_state_period()) self.update_goal_state(is_retry=True) return self._vm_settings_error_reporter.report_error(ustr(mismatch)) self._vm_settings_error_reporter.report_summary() if use_vm_settings: self._extensions_goal_state = vm_settings_goal_state else: self._extensions_goal_state = self._goal_state.extensions_config # # If either goal state changed (goal_state or vm_settings_goal_state) save them # if goal_state_updated or vm_settings_goal_state_updated: self._save_goal_state() except ProtocolError: raise except Exception as exception: raise ProtocolError("Error fetching goal state: {0}".format(ustr(exception))) def _fetch_vm_settings_goal_state(self, force_update): """ Queries the vmSettings from the HostGAPlugin and returns an (ExtensionsGoalStateFromVmSettings, bool) tuple with the vmSettings and a boolean indicating if they are an updated (True) or a cached value (False). Raises ProtocolError if the request fails for any reason (e.g. not supported, time out, server error) """ def raise_not_supported(reset_state=False): if reset_state: self._host_plugin_supports_vm_settings = False self._host_plugin_supports_vm_settings_next_check = datetime.now() + timedelta(hours=6) # check again in 6 hours # "Not supported" is not considered an error, so don't use self._vm_settings_error_reporter to report it logger.info("vmSettings is not supported") add_event(op=WALAEventOperation.HostPlugin, message="vmSettings is not supported", is_success=True) raise ProtocolError("VmSettings not supported") # Raise if VmSettings are not supported but check for periodically since the HostGAPlugin could have been updated since the last check if not self._host_plugin_supports_vm_settings and self._host_plugin_supports_vm_settings_next_check > datetime.now(): raise_not_supported() etag = None if force_update or self._cached_vm_settings is None else self._cached_vm_settings.etag correlation_id = str(uuid.uuid4()) def format_message(msg): return "GET vmSettings [correlation ID: {0} eTag: {1} HGAP: {2}]: {3}".format(correlation_id, etag, self._host_plugin_version, msg) try: def get_vm_settings(): url, headers = self.get_host_plugin().get_vm_settings_request(correlation_id) if etag is not None: headers['if-none-match'] = etag return restutil.http_get(url, headers=headers, use_proxy=False, max_retry=1, return_raw_response=True) self._vm_settings_error_reporter.report_request() response = get_vm_settings() if response.status == httpclient.GONE: # retry after refreshing the HostGAPlugin self.update_host_plugin_from_goal_state() response = get_vm_settings() if response.status == httpclient.NOT_FOUND: # the HostGAPlugin does not support FastTrack raise_not_supported(reset_state=True) if response.status == httpclient.NOT_MODIFIED: # The goal state hasn't changed, return the current instance return self._cached_vm_settings, False if response.status != httpclient.OK: error_description = restutil.read_response_error(response) # For historical reasons the HostGAPlugin returns 502 (BAD_GATEWAY) for internal errors instead of using # 500 (INTERNAL_SERVER_ERROR). We add a short prefix to the error message in the hope that it will help # clear any confusion produced by the poor choice of status code. if response.status == httpclient.BAD_GATEWAY: error_description = "[Internal error in HostGAPlugin] {0}".format(error_description) error_description = format_message(error_description) if 400 <= response.status <= 499: self._vm_settings_error_reporter.report_error(error_description, _VmSettingsError.ClientError) elif 500 <= response.status <= 599: self._vm_settings_error_reporter.report_error(error_description, _VmSettingsError.ServerError) else: self._vm_settings_error_reporter.report_error(error_description) raise ProtocolError(error_description) for h in response.getheaders(): if h[0].lower() == 'etag': response_etag = h[1] break else: # since the vmSettings were updated, the response must include an etag message = format_message("The vmSettings response does not include an Etag header") self._vm_settings_error_reporter.report_error(message) raise ProtocolError(message) response_content = self.decode_config(response.read()) vm_settings = ExtensionsGoalStateFactory.create_from_vm_settings(response_etag, response_content) # log the HostGAPlugin version if vm_settings.host_ga_plugin_version != self._host_plugin_version: self._host_plugin_version = vm_settings.host_ga_plugin_version message = "HostGAPlugin version: {0}".format(vm_settings.host_ga_plugin_version) logger.info(message) add_event(op=WALAEventOperation.HostPlugin, message=message, is_success=True) # Don't support HostGAPlugin versions older than 115 if vm_settings.host_ga_plugin_version < FlexibleVersion("1.0.8.115"): raise_not_supported(reset_state=True) logger.info("Fetched new vmSettings [correlation ID: {0} New eTag: {1}]", correlation_id, vm_settings.etag) self._host_plugin_supports_vm_settings = True self._cached_vm_settings = vm_settings return vm_settings, True except ProtocolError: raise except Exception as exception: if isinstance(exception, VmSettingsError): message = format_message(ustr(exception)) self._vm_settings_error_reporter.report_error(message) try: # pylint - Instance of 'Exception' has no 'vm_settings_text/etag' member (no-member) # Disabled; the above check ensures the exception is a VmSettingsError self._save_cache(exception.vm_settings_text, VM_SETTINGS_FILE_NAME.format(exception.etag)) # pylint: disable=no-member except Exception as e: # TODO: Once Fast Track is stable, make this a warning logger.info("Failed to save vmSettings: {0}", ustr(e)) if isinstance(exception, IOError) and "timed out" in ustr(exception): message = format_message("Timeout") self._vm_settings_error_reporter.report_error(message, _VmSettingsError.Timeout) else: message = format_message("Request failed: {0}".format(textutil.format_exception(exception))) self._vm_settings_error_reporter.report_error(message, _VmSettingsError.RequestFailed) raise ProtocolError(message) def _update_host_plugin(self, container_id, role_config_name): if self._host_plugin is not None: self._host_plugin.update_container_id(container_id) self._host_plugin.update_role_config_name(role_config_name) def _save_goal_state(self): try: self.goal_state_flusher.flush() except Exception as e: logger.warn("Failed to save the previous goal state to the history folder: {0}", ustr(e)) try: def save_if_not_none(goal_state_property, file_name): if goal_state_property is not None and goal_state_property.xml_text is not None: self._save_cache(goal_state_property.xml_text, file_name) # NOTE: Certificates are saved in Certificate.__init__ self._save_cache(self._goal_state.incarnation, INCARNATION_FILE_NAME) save_if_not_none(self._goal_state, GOAL_STATE_FILE_NAME.format(self._goal_state.incarnation)) save_if_not_none(self._goal_state.hosting_env, HOSTING_ENV_FILE_NAME) save_if_not_none(self._goal_state.shared_conf, SHARED_CONF_FILE_NAME) save_if_not_none(self._goal_state.remote_access, REMOTE_ACCESS_FILE_NAME.format(self._goal_state.incarnation)) if self._goal_state.extensions_config is not None: text = self._goal_state.extensions_config.get_redacted_text() if text != '': self._save_cache(text, EXT_CONF_FILE_NAME.format(self._goal_state.extensions_config.incarnation)) # TODO: When Fast Track is fully enabled self._cached_vm_settings will go away and this can be deleted if self._cached_vm_settings is not None: text = self._cached_vm_settings.get_redacted_text() if text != '': self._save_cache(text, VM_SETTINGS_FILE_NAME.format(self._cached_vm_settings.id)) # END TODO except Exception as e: logger.warn("Failed to save the goal state to disk: {0}", ustr(e)) def _set_host_plugin(self, new_host_plugin): if new_host_plugin is None: logger.warn("Setting empty Host Plugin object!") self._host_plugin = new_host_plugin def get_goal_state(self): if self._goal_state is None: raise ProtocolError("Trying to fetch goal state before initialization!") return self._goal_state def get_hosting_env(self): if self._goal_state is None: raise ProtocolError("Trying to fetch Hosting Environment before initialization!") return self._goal_state.hosting_env def get_shared_conf(self): if self._goal_state is None: raise ProtocolError("Trying to fetch Shared Conf before initialization!") return self._goal_state.shared_conf def get_certs(self): if self._goal_state is None: raise ProtocolError("Trying to fetch Certificates before initialization!") return self._goal_state.certs def get_extensions_goal_state(self): if self._extensions_goal_state is None: raise ProtocolError("Trying to fetch ExtensionsGoalState before initialization!") return self._extensions_goal_state def get_ext_manifest(self, ext_handler): if self._goal_state is None: raise ProtocolError("Trying to fetch Extension Manifest before initialization!") try: xml_text = self.fetch_manifest(ext_handler.manifest_uris) self._save_cache(xml_text, MANIFEST_FILE_NAME.format(ext_handler.name, self.get_goal_state().incarnation)) return ExtensionManifest(xml_text) except Exception as e: raise ExtensionDownloadError("Failed to retrieve extension manifest. Error: {0}".format(ustr(e))) def get_remote_access(self): if self._goal_state is None: raise ProtocolError("Trying to fetch Remote Access before initialization!") return self._goal_state.remote_access def fetch_gafamily_manifest(self, vmagent_manifest, goal_state): local_file = MANIFEST_FILE_NAME.format(vmagent_manifest.family, goal_state.incarnation) local_file = os.path.join(conf.get_lib_dir(), local_file) try: xml_text = self.fetch_manifest(vmagent_manifest.uris) fileutil.write_file(local_file, xml_text) return ExtensionManifest(xml_text) except Exception as e: raise ProtocolError("Failed to retrieve GAFamily manifest. Error: {0}".format(ustr(e))) def check_wire_protocol_version(self): uri = VERSION_INFO_URI.format(self.get_endpoint()) version_info_xml = self.fetch_config(uri, None) version_info = VersionInfo(version_info_xml) preferred = version_info.get_preferred() if PROTOCOL_VERSION == preferred: logger.info("Wire protocol version:{0}", PROTOCOL_VERSION) elif PROTOCOL_VERSION in version_info.get_supported(): logger.info("Wire protocol version:{0}", PROTOCOL_VERSION) logger.info("Server preferred version:{0}", preferred) else: error = ("Agent supported wire protocol version: {0} was not " "advised by Fabric.").format(PROTOCOL_VERSION) raise ProtocolNotFoundError(error) def _call_hostplugin_with_container_check(self, host_func): """ Calls host_func on host channel and accounts for stale resource (ResourceGoneError or InvalidContainerError). If stale, it refreshes the goal state and retries host_func. This method can throw, so the callers need to handle that. """ try: ret = host_func() if ret in (None, False): raise Exception("Request failed using the host channel.") return ret except (ResourceGoneError, InvalidContainerError) as error: host_plugin = self.get_host_plugin() old_container_id, old_role_config_name = host_plugin.container_id, host_plugin.role_config_name msg = "[PERIODIC] Request failed with the current host plugin configuration. " \ "ContainerId: {0}, role config file: {1}. Fetching new goal state and retrying the call." \ "Error: {2}".format(old_container_id, old_role_config_name, ustr(error)) logger.periodic_info(logger.EVERY_SIX_HOURS, msg) self.update_host_plugin_from_goal_state() new_container_id, new_role_config_name = host_plugin.container_id, host_plugin.role_config_name msg = "[PERIODIC] Host plugin reconfigured with new parameters. " \ "ContainerId: {0}, role config file: {1}.".format(new_container_id, new_role_config_name) logger.periodic_info(logger.EVERY_SIX_HOURS, msg) try: ret = host_func() if ret in (None, False): raise Exception("Request failed using the host channel after goal state refresh.") msg = "[PERIODIC] Request succeeded using the host plugin channel after goal state refresh. " \ "ContainerId changed from {0} to {1}, " \ "role config file changed from {2} to {3}.".format(old_container_id, new_container_id, old_role_config_name, new_role_config_name) add_periodic(delta=logger.EVERY_SIX_HOURS, name=AGENT_NAME, version=CURRENT_VERSION, op=WALAEventOperation.HostPlugin, is_success=True, message=msg, log_event=True) return ret except (ResourceGoneError, InvalidContainerError) as error: msg = "[PERIODIC] Request failed using the host plugin channel after goal state refresh. " \ "ContainerId changed from {0} to {1}, role config file changed from {2} to {3}. " \ "Exception type: {4}.".format(old_container_id, new_container_id, old_role_config_name, new_role_config_name, type(error).__name__) add_periodic(delta=logger.EVERY_SIX_HOURS, name=AGENT_NAME, version=CURRENT_VERSION, op=WALAEventOperation.HostPlugin, is_success=False, message=msg, log_event=True) raise def __send_request_using_host_channel(self, host_func): """ Calls the host_func on host channel with retries for stale goal state and handles any exceptions, consistent with the caller for direct channel. At the time of writing, host_func internally calls either: 1) WireClient.stream which returns a boolean, or 2) WireClient.fetch which returns None or a HTTP response. This method returns either None (failure case where host_func returned None or False), True or an HTTP response. """ ret = None try: ret = self._call_hostplugin_with_container_check(host_func) except Exception as error: logger.periodic_info(logger.EVERY_HOUR, "[PERIODIC] Request failed using the host channel. Error: {0}".format(ustr(error))) return ret @staticmethod def __send_request_using_direct_channel(direct_func): """ Calls the direct_func on direct channel and handles any exceptions, consistent with the caller for host channel. At the time of writing, direct_func internally calls either: 1) WireClient.stream which returns a boolean, or 2) WireClient.fetch which returns None or a HTTP response. This method returns either None (failure case where direct_func returned None or False), True or an HTTP response. """ ret = None try: ret = direct_func() if ret in (None, False): logger.periodic_info(logger.EVERY_HOUR, "[PERIODIC] Request failed using the direct channel.") return None except Exception as error: logger.periodic_info(logger.EVERY_HOUR, "[PERIODIC] Request failed using the direct channel. Error: {0}".format(ustr(error))) return ret def send_request_using_appropriate_channel(self, direct_func, host_func): """ Determines which communication channel to use. By default, the primary channel is direct, host channel is secondary. We call the primary channel first and return on success. If primary fails, we try secondary. If secondary fails, we return and *don't* switch the default channel. If secondary succeeds, we change the default channel. This method doesn't raise since the calls to direct_func and host_func are already wrapped and handle any exceptions. Possible return values are manifest, artifacts profile, True or None. """ direct_channel = lambda: self.__send_request_using_direct_channel(direct_func) host_channel = lambda: self.__send_request_using_host_channel(host_func) if HostPluginProtocol.is_default_channel: primary_channel, secondary_channel = host_channel, direct_channel else: primary_channel, secondary_channel = direct_channel, host_channel ret = primary_channel() if ret is not None: return ret ret = secondary_channel() if ret is not None: HostPluginProtocol.is_default_channel = not HostPluginProtocol.is_default_channel message = "Default channel changed to {0} channel.".format("HostGA" if HostPluginProtocol.is_default_channel else "direct") logger.info(message) add_event(AGENT_NAME, op=WALAEventOperation.DefaultChannelChange, version=CURRENT_VERSION, is_success=True, message=message, log_event=False) return ret def upload_status_blob(self): extensions_goal_state = self.get_extensions_goal_state() if extensions_goal_state.status_upload_blob is None: # the status upload blob is in ExtensionsConfig so force a full goal state refresh self.update_goal_state(force_update=True) extensions_goal_state = self.get_extensions_goal_state() if extensions_goal_state.status_upload_blob is None: raise ProtocolNotFoundError("Status upload uri is missing") blob_type = extensions_goal_state.status_upload_blob_type try: self.status_blob.prepare(blob_type) except Exception as e: raise ProtocolError("Exception creating status blob: {0}".format(ustr(e))) # Swap the order of use for the HostPlugin vs. the "direct" route. # Prefer the use of HostPlugin. If HostPlugin fails fall back to the # direct route. # # The code previously preferred the "direct" route always, and only fell back # to the HostPlugin *if* there was an error. We would like to move to # the HostPlugin for all traffic, but this is a big change. We would like # to see how this behaves at scale, and have a fallback should things go # wrong. This is why we try HostPlugin then direct. try: host = self.get_host_plugin() host.put_vm_status(self.status_blob, extensions_goal_state.status_upload_blob, extensions_goal_state.status_upload_blob_type) return except ResourceGoneError: # refresh the host plugin client and try again on the next iteration of the main loop self.update_host_plugin_from_goal_state() return except Exception as e: # for all other errors, fall back to direct msg = "Falling back to direct upload: {0}".format(ustr(e)) self.report_status_event(msg, is_success=True) try: if self.status_blob.upload(extensions_goal_state.status_upload_blob): return except Exception as e: msg = "Exception uploading status blob: {0}".format(ustr(e)) self.report_status_event(msg, is_success=False) raise ProtocolError("Failed to upload status blob via either channel") def report_role_prop(self, thumbprint): goal_state = self.get_goal_state() role_prop = _build_role_properties(goal_state.container_id, goal_state.role_instance_id, thumbprint) role_prop = role_prop.encode("utf-8") role_prop_uri = ROLE_PROP_URI.format(self.get_endpoint()) headers = self.get_header_for_xml_content() try: resp = self.call_wireserver(restutil.http_post, role_prop_uri, role_prop, headers=headers) except HttpError as e: raise ProtocolError((u"Failed to send role properties: " u"{0}").format(e)) if resp.status != httpclient.ACCEPTED: raise ProtocolError((u"Failed to send role properties: " u",{0}: {1}").format(resp.status, resp.read())) def report_health(self, status, substatus, description): goal_state = self.get_goal_state() health_report = _build_health_report(goal_state.incarnation, goal_state.container_id, goal_state.role_instance_id, status, substatus, description) health_report = health_report.encode("utf-8") health_report_uri = HEALTH_REPORT_URI.format(self.get_endpoint()) headers = self.get_header_for_xml_content() try: # 30 retries with 10s sleep gives ~5min for wireserver updates; # this is retried 3 times with 15s sleep before throwing a # ProtocolError, for a total of ~15min. resp = self.call_wireserver(restutil.http_post, health_report_uri, health_report, headers=headers, max_retry=30, retry_delay=15) except HttpError as e: raise ProtocolError((u"Failed to send provision status: " u"{0}").format(e)) if restutil.request_failed(resp): raise ProtocolError((u"Failed to send provision status: " u",{0}: {1}").format(resp.status, resp.read())) def send_encoded_event(self, provider_id, event_str, encoding='utf8'): uri = TELEMETRY_URI.format(self.get_endpoint()) data_format_header = ustr('<?xml version="1.0"?><TelemetryData version="1.0"><Provider id="{0}">').format( provider_id).encode(encoding) data_format_footer = ustr('</Provider></TelemetryData>').encode(encoding) # Event string should already be encoded by the time it gets here, to avoid double encoding, # dividing it into parts. data = data_format_header + event_str + data_format_footer try: header = self.get_header_for_xml_content() # NOTE: The call to wireserver requests utf-8 encoding in the headers, but the body should not # be encoded: some nodes in the telemetry pipeline do not support utf-8 encoding. resp = self.call_wireserver(restutil.http_post, uri, data, header) except HttpError as e: raise ProtocolError("Failed to send events:{0}".format(e)) if restutil.request_failed(resp): logger.verbose(resp.read()) raise ProtocolError( "Failed to send events:{0}".format(resp.status)) def report_event(self, events_iterator): buf = {} debug_info = CollectOrReportEventDebugInfo(operation=CollectOrReportEventDebugInfo.OP_REPORT) events_per_provider = defaultdict(int) def _send_event(provider_id, debug_info): try: self.send_encoded_event(provider_id, buf[provider_id]) except UnicodeError as uni_error: debug_info.update_unicode_error(uni_error) except Exception as error: debug_info.update_op_error(error) # Group events by providerId for event in events_iterator: try: if event.providerId not in buf: buf[event.providerId] = b"" event_str = event_to_v1_encoded(event) if len(event_str) >= MAX_EVENT_BUFFER_SIZE: # Ignore single events that are too large to send out details_of_event = [ustr(x.name) + ":" + ustr(x.value) for x in event.parameters if x.name in [GuestAgentExtensionEventsSchema.Name, GuestAgentExtensionEventsSchema.Version, GuestAgentExtensionEventsSchema.Operation, GuestAgentExtensionEventsSchema.OperationSuccess]] logger.periodic_warn(logger.EVERY_HALF_HOUR, "Single event too large: {0}, with the length: {1} more than the limit({2})" .format(str(details_of_event), len(event_str), MAX_EVENT_BUFFER_SIZE)) continue # If buffer is full, send out the events in buffer and reset buffer if len(buf[event.providerId] + event_str) >= MAX_EVENT_BUFFER_SIZE: logger.verbose("No of events this request = {0}".format(events_per_provider[event.providerId])) _send_event(event.providerId, debug_info) buf[event.providerId] = b"" events_per_provider[event.providerId] = 0 # Add encoded events to the buffer buf[event.providerId] = buf[event.providerId] + event_str events_per_provider[event.providerId] += 1 except Exception as error: logger.warn("Unexpected error when generating Events:{0}", textutil.format_exception(error)) # Send out all events left in buffer. for provider_id in list(buf.keys()): if buf[provider_id]: logger.verbose("No of events this request = {0}".format(events_per_provider[provider_id])) _send_event(provider_id, debug_info) debug_info.report_debug_info() def report_status_event(self, message, is_success): report_event(op=WALAEventOperation.ReportStatus, is_success=is_success, message=message, log_event=not is_success) def get_header(self): return { "x-ms-agent-name": "WALinuxAgent", "x-ms-version": PROTOCOL_VERSION } def get_header_for_xml_content(self): return { "x-ms-agent-name": "WALinuxAgent", "x-ms-version": PROTOCOL_VERSION, "Content-Type": "text/xml;charset=utf-8" } def get_header_for_cert(self): trans_cert_file = os.path.join(conf.get_lib_dir(), TRANSPORT_CERT_FILE_NAME) content = self.fetch_cache(trans_cert_file) cert = get_bytes_from_pem(content) return { "x-ms-agent-name": "WALinuxAgent", "x-ms-version": PROTOCOL_VERSION, "x-ms-cipher-name": "DES_EDE3_CBC", "x-ms-guest-agent-public-x509-cert": cert } def get_host_plugin(self): if self._host_plugin is None: goal_state = GoalState(self) self._set_host_plugin(HostPluginProtocol(self.get_endpoint(), goal_state.container_id, goal_state.role_config_name)) return self._host_plugin def get_on_hold(self): return self.get_extensions_goal_state().on_hold def upload_logs(self, content): host_func = lambda: self._upload_logs_through_host(content) return self._call_hostplugin_with_container_check(host_func) def _upload_logs_through_host(self, content): host = self.get_host_plugin() return host.put_vm_log(content) class VersionInfo(object): def __init__(self, xml_text): """ Query endpoint server for wire protocol version. Fail if our desired protocol version is not seen. """ logger.verbose("Load Version.xml") self.parse(xml_text) def parse(self, xml_text): xml_doc = parse_doc(xml_text) preferred = find(xml_doc, "Preferred") self.preferred = findtext(preferred, "Version") logger.info("Fabric preferred wire protocol version:{0}", self.preferred) self.supported = [] supported = find(xml_doc, "Supported") supported_version = findall(supported, "Version") for node in supported_version: version = gettext(node) logger.verbose("Fabric supported wire protocol version:{0}", version) self.supported.append(version) def get_preferred(self): return self.preferred def get_supported(self): return self.supported class ExtensionManifest(object): def __init__(self, xml_text): if xml_text is None: raise ValueError("ExtensionManifest is None") logger.verbose("Load ExtensionManifest.xml") self.pkg_list = ExtHandlerPackageList() self._parse(xml_text) def _parse(self, xml_text): xml_doc = parse_doc(xml_text) self._handle_packages(findall(find(xml_doc, "Plugins"), "Plugin"), False) self._handle_packages(findall(find(xml_doc, "InternalPlugins"), "Plugin"), True) def _handle_packages(self, packages, isinternal): for package in packages: version = findtext(package, "Version") disallow_major_upgrade = findtext(package, "DisallowMajorVersionUpgrade") if disallow_major_upgrade is None: disallow_major_upgrade = '' disallow_major_upgrade = disallow_major_upgrade.lower() == "true" uris = find(package, "Uris") uri_list = findall(uris, "Uri") uri_list = [gettext(x) for x in uri_list] pkg = ExtHandlerPackage() pkg.version = version pkg.disallow_major_upgrade = disallow_major_upgrade for uri in uri_list: pkg.uris.append(uri) pkg.isinternal = isinternal self.pkg_list.versions.append(pkg) # Do not extend this class class InVMArtifactsProfile(object): """ deserialized json string of InVMArtifactsProfile. It is expected to contain the following fields: * inVMArtifactsProfileBlobSeqNo * profileId (optional) * onHold (optional) * certificateThumbprint (optional) * encryptedHealthChecks (optional) * encryptedApplicationProfile (optional) """ def __init__(self, artifacts_profile): if not textutil.is_str_empty(artifacts_profile): self.__dict__.update(parse_json(artifacts_profile)) def is_on_hold(self): # hasattr() is not available in Python 2.6 if 'onHold' in self.__dict__: return str(self.onHold).lower() == 'true' # pylint: disable=E1101 return False class _VmSettingsError(object): ServerError = 'ServerError' ClientError = 'ClientError' Timeout = 'Timeout' RequestFailed = 'RequestFailed' class _VmSettingsErrorReporter(object): _MaxLogErrors = 1 # Max number of errors by period reported to the local log _MaxTelemetryErrors = 3 # Max number of errors by period reported to telemetry _Period = timedelta(hours=1) # How often to report the summary def __init__(self): self._reset() def _reset(self): self._request_count = 0 # Total number of vmSettings HTTP requests self._error_count = 0 # Total number of errors issuing vmSettings requests (includes all kinds of errors) self._server_error_count = 0 # Count of server side errors (HTTP status in the 500s) self._client_error_count = 0 # Count of client side errors (HTTP status in the 400s) self._timeout_count = 0 # Count of timeouts on vmSettings requests self._request_failure_count = 0 # Total count of requests that could not be issued (does not include timeouts or requests that were actually issued and failed, for example, with 500 or 400 statuses) self._next_period = datetime.now() + _VmSettingsErrorReporter._Period def report_request(self): self._request_count += 1 def report_error(self, error, category=None): self._error_count += 1 if self._error_count <= _VmSettingsErrorReporter._MaxLogErrors: logger.info("[VmSettings] [Informational only, the Agent will continue normal operation] {0}", error) if self._error_count <= _VmSettingsErrorReporter._MaxTelemetryErrors: add_event(op=WALAEventOperation.VmSettings, message=error, is_success=False, log_event=False) if category == _VmSettingsError.ServerError: self._server_error_count += 1 elif category == _VmSettingsError.ClientError: self._client_error_count += 1 elif category == _VmSettingsError.Timeout: self._timeout_count += 1 elif category == _VmSettingsError.RequestFailed: self._request_failure_count += 1 def report_summary(self): if datetime.now() >= self._next_period: summary = { "requests": self._request_count, "errors": self._error_count, "serverErrors": self._server_error_count, "clientErrors": self._client_error_count, "timeouts": self._timeout_count, "failedRequests": self._request_failure_count } # always send telemetry, but log errors only message = json.dumps(summary) add_event(op=WALAEventOperation.VmSettingsSummary, message=message, is_success=False, log_event=False) if self._error_count > 0: logger.info("[VmSettingsSummary] {0}", message) self._reset()
[+]
..
[-] __init__.py
[edit]
[-] hostplugin.py
[edit]
[-] ovfenv.py
[edit]
[-] restapi.py
[edit]
[-] util.py
[edit]
[-] wire.py
[edit]
[-] __init__.pyc
[edit]
[-] __init__.pyo
[edit]
[-] hostplugin.pyc
[edit]
[-] hostplugin.pyo
[edit]
[-] extensions_goal_state_from_vm_settings.pyo
[edit]
[-] ovfenv.pyc
[edit]
[-] ovfenv.pyo
[edit]
[-] restapi.pyc
[edit]
[-] restapi.pyo
[edit]
[-] util.pyc
[edit]
[-] util.pyo
[edit]
[-] wire.pyc
[edit]
[-] wire.pyo
[edit]
[-] healthservice.py
[edit]
[-] imds.py
[edit]
[-] healthservice.pyc
[edit]
[-] healthservice.pyo
[edit]
[-] imds.pyc
[edit]
[-] imds.pyo
[edit]
[-] extensions_goal_state.py
[edit]
[-] goal_state.py
[edit]
[-] metadata_server_migration_util.py
[edit]
[-] extensions_goal_state.pyc
[edit]
[-] extensions_goal_state.pyo
[edit]
[-] goal_state.pyc
[edit]
[-] goal_state.pyo
[edit]
[-] metadata_server_migration_util.pyc
[edit]
[-] metadata_server_migration_util.pyo
[edit]
[-] extensions_goal_state_factory.py
[edit]
[-] extensions_goal_state_from_extensions_config.py
[edit]
[-] extensions_goal_state_from_vm_settings.py
[edit]
[-] extensions_goal_state_factory.pyc
[edit]
[-] extensions_goal_state_factory.pyo
[edit]
[-] extensions_goal_state_from_extensions_config.pyc
[edit]
[-] extensions_goal_state_from_extensions_config.pyo
[edit]
[-] extensions_goal_state_from_vm_settings.pyc
[edit]