diff --git a/src/instana/agent/host.py b/src/instana/agent/host.py index b4943cdbb..6c921902e 100644 --- a/src/instana/agent/host.py +++ b/src/instana/agent/host.py @@ -6,25 +6,30 @@ monitoring state and reporting that data. """ -import os import json +import os from datetime import datetime +from typing import Any, Dict, Optional, Union +from requests import Response -import urllib3 import requests +import urllib3 -from ..log import logger -from .base import BaseAgent -from ..fsm import TheMachine -from ..version import VERSION -from ..options import StandardOptions -from ..collector.host import HostCollector -from ..util import to_json -from ..util.runtime import get_py_source +from instana.agent.base import BaseAgent +from instana.collector.host import HostCollector +from instana.configurator import config +from instana.fsm import Discovery, TheMachine +from instana.log import logger +from instana.options import StandardOptions +from instana.util import to_json +from instana.util.config import parse_ignored_endpoints +from instana.util.runtime import get_py_source +from instana.version import VERSION class AnnounceData(object): - """ The Announce Payload """ + """The Announce Payload""" + pid = 0 agentUuid = "" @@ -38,10 +43,11 @@ class HostAgent(BaseAgent): parts it handles are the announce state and the collection and reporting of metrics and spans to the Instana Host agent. """ + AGENT_DISCOVERY_PATH = "com.instana.plugin.python.discovery" AGENT_DATA_PATH = "com.instana.plugin.python.%d" - def __init__(self): + def __init__(self) -> None: super(HostAgent, self).__init__() self.announce_data = None @@ -54,12 +60,14 @@ def __init__(self): # Update log level from what Options detected self.update_log_level() - logger.info("Stan is on the scene. Starting Instana instrumentation version: %s", VERSION) + logger.info( + f"Stan is on the scene. Starting Instana instrumentation version: {VERSION}" + ) self.collector = HostCollector(self) self.machine = TheMachine(self) - def start(self): + def start(self) -> None: """ Starts the agent and required threads @@ -68,14 +76,14 @@ def start(self): logger.debug("Starting Host Collector") self.collector.start() - def handle_fork(self): + def handle_fork(self) -> None: """ Forks happen. Here we handle them. """ # Reset the Agent self.reset() - def reset(self): + def reset(self) -> None: """ This will reset the agent to a fresh unannounced state. :return: None @@ -87,7 +95,7 @@ def reset(self): # Will schedule a restart of the announce cycle in the future self.machine.reset() - def is_timed_out(self): + def is_timed_out(self) -> bool: """ If we haven't heard from the Instana host agent in 60 seconds, this method will return True. @@ -99,7 +107,7 @@ def is_timed_out(self): return True return False - def can_send(self): + def can_send(self) -> bool: """ Are we in a state where we can send data? @return: Boolean @@ -117,73 +125,103 @@ def can_send(self): return False - def set_from(self, res_data): + def set_from( + self, + res_data: Dict[str, Any], + ) -> None: """ Sets the source identifiers given to use by the Instana Host agent. @param res_data: source identifiers provided as announce response @return: None """ if "secrets" in res_data: - self.options.secrets_matcher = res_data['secrets']['matcher'] - self.options.secrets_list = res_data['secrets']['list'] + self.options.secrets_matcher = res_data["secrets"]["matcher"] + self.options.secrets_list = res_data["secrets"]["list"] if "extraHeaders" in res_data: if self.options.extra_http_headers is None: - self.options.extra_http_headers = res_data['extraHeaders'] + self.options.extra_http_headers = res_data["extraHeaders"] else: - self.options.extra_http_headers.extend(res_data['extraHeaders']) - logger.info("Will also capture these custom headers: %s", self.options.extra_http_headers) - - self.announce_data = AnnounceData(pid=res_data['pid'], agentUuid=res_data['agentUuid']) - - def get_from_structure(self): + self.options.extra_http_headers.extend(res_data["extraHeaders"]) + logger.info( + f"Will also capture these custom headers: {self.options.extra_http_headers}" + ) + + if "tracing" in res_data: + if ( + "ignore-endpoints" in res_data["tracing"] + and "INSTANA_IGNORE_ENDPOINTS" not in os.environ + and "tracing" not in config + ): + self.options.ignore_endpoints = parse_ignored_endpoints( + res_data["tracing"]["ignore-endpoints"] + ) + + self.announce_data = AnnounceData( + pid=res_data["pid"], + agentUuid=res_data["agentUuid"], + ) + + def get_from_structure(self) -> Dict[str, str]: """ Retrieves the From data that is reported alongside monitoring data. @return: dict() """ - return {'e': self.announce_data.pid, 'h': self.announce_data.agentUuid} + return {"e": self.announce_data.pid, "h": self.announce_data.agentUuid} - def is_agent_listening(self, host, port): + def is_agent_listening( + self, + host: str, + port: Union[str, int], + ) -> bool: """ Check if the Instana Agent is listening on and . @return: Boolean """ result = False try: - url = "http://%s:%s/" % (host, port) + url = f"http://{host}:{port}/" response = self.client.get(url, timeout=5) if 200 <= response.status_code < 300: - logger.debug("Instana host agent found on %s:%d", host, port) + logger.debug(f"Instana host agent found on {host}:{port}") result = True else: - logger.debug("The attempt to connect to the Instana host "\ - "agent on %s:%d has failed with an unexpected " \ - "status code. Expected HTTP 200 but received: %d", - host, port, response.status_code) + logger.debug( + "The attempt to connect to the Instana host " + f"agent on {host}:{port} has failed with an unexpected " + f"status code. Expected HTTP 200 but received: {response.status_code}" + ) except Exception: - logger.debug("Instana Host Agent not found on %s:%d", host, port) + logger.debug(f"Instana Host Agent not found on {host}:{port}") return result - def announce(self, discovery): + def announce( + self, + discovery: Discovery, + ) -> Optional[Dict[str, Any]]: """ With the passed in Discovery class, attempt to announce to the host agent. """ try: url = self.__discovery_url() - response = self.client.put(url, - data=to_json(discovery), - headers={"Content-Type": "application/json"}, - timeout=0.8) + response = self.client.put( + url, + data=to_json(discovery), + headers={"Content-Type": "application/json"}, + timeout=0.8, + ) except Exception as exc: - logger.debug("announce: connection error (%s)", type(exc)) + logger.debug(f"announce: connection error ({type(exc)})") return None if 200 <= response.status_code <= 204: self.last_seen = datetime.now() if response.status_code != 200: - logger.debug("announce: response status code (%s) is NOT 200", response.status_code) + logger.debug( + f"announce: response status code ({response.status_code}) is NOT 200" + ) return None if isinstance(response.content, bytes): @@ -193,25 +231,28 @@ def announce(self, discovery): try: payload = json.loads(raw_json) - except json.JSONDecodeError as e: - logger.debug("announce: response is not JSON: (%s)", raw_json) + except json.JSONDecodeError: + logger.debug(f"announce: response is not JSON: ({raw_json})") return None - if not hasattr(payload, 'get'): - logger.debug("announce: response payload has no fields: (%s)", payload) + if not hasattr(payload, "get"): + logger.debug(f"announce: response payload has no fields: ({payload})") return None - if not payload.get('pid'): - logger.debug("announce: response payload has no pid: (%s)", payload) + if not payload.get("pid"): + logger.debug(f"announce: response payload has no pid: ({payload})") return None - if not payload.get('agentUuid'): - logger.debug("announce: response payload has no agentUuid: (%s)", payload) + if not payload.get("agentUuid"): + logger.debug(f"announce: response payload has no agentUuid: ({payload})") return None return payload - def log_message_to_host_agent(self, message): + def log_message_to_host_agent( + self, + message: str, + ) -> Optional[Response]: """ Log a message to the discovered host agent """ @@ -221,18 +262,19 @@ def log_message_to_host_agent(self, message): payload["m"] = message url = self.__agent_logger_url() - response = self.client.post(url, - data=to_json(payload), - headers={"Content-Type": "application/json", - "X-Log-Level": "INFO"}, - timeout=0.8) + response = self.client.post( + url, + data=to_json(payload), + headers={"Content-Type": "application/json", "X-Log-Level": "INFO"}, + timeout=0.8, + ) if 200 <= response.status_code <= 204: self.last_seen = datetime.now() except Exception as exc: - logger.debug("agent logging: connection error (%s)", type(exc)) + logger.debug(f"agent logging: connection error ({type(exc)})") - def is_agent_ready(self): + def is_agent_ready(self) -> bool: """ Used after making a successful announce to test when the agent is ready to accept data. """ @@ -243,47 +285,56 @@ def is_agent_ready(self): if response.status_code == 200: ready = True except Exception as exc: - logger.debug("is_agent_ready: connection error (%s)", type(exc)) + logger.debug(f"is_agent_ready: connection error ({type(exc)})") return ready - def report_data_payload(self, payload): + def report_data_payload( + self, + payload: Dict[str, Any], + ) -> Optional[Response]: """ Used to report collection payload to the host agent. This can be metrics, spans and snapshot data. """ response = None try: # Report spans (if any) - span_count = len(payload['spans']) + span_count = len(payload["spans"]) if span_count > 0: - logger.debug("Reporting %d spans", span_count) - response = self.client.post(self.__traces_url(), - data=to_json(payload['spans']), - headers={"Content-Type": "application/json"}, - timeout=0.8) + logger.debug(f"Reporting {span_count} spans") + response = self.client.post( + self.__traces_url(), + data=to_json(payload["spans"]), + headers={"Content-Type": "application/json"}, + timeout=0.8, + ) if response is not None and 200 <= response.status_code <= 204: self.last_seen = datetime.now() # Report profiles (if any) - profile_count = len(payload['profiles']) + profile_count = len(payload["profiles"]) if profile_count > 0: - logger.debug("Reporting %d profiles", profile_count) - response = self.client.post(self.__profiles_url(), - data=to_json(payload['profiles']), - headers={"Content-Type": "application/json"}, - timeout=0.8) + logger.debug(f"Reporting {profile_count} profiles") + response = self.client.post( + self.__profiles_url(), + data=to_json(payload["profiles"]), + headers={"Content-Type": "application/json"}, + timeout=0.8, + ) if response is not None and 200 <= response.status_code <= 204: self.last_seen = datetime.now() # Report metrics - metric_count = len(payload['metrics']) + metric_count = len(payload["metrics"]) if metric_count > 0: metric_bundle = payload["metrics"]["plugins"][0]["data"] - response = self.client.post(self.__data_url(), - data=to_json(metric_bundle), - headers={"Content-Type": "application/json"}, - timeout=0.8) + response = self.client.post( + self.__data_url(), + data=to_json(metric_bundle), + headers={"Content-Type": "application/json"}, + timeout=0.8, + ) if response is not None and 200 <= response.status_code <= 204: self.last_seen = datetime.now() @@ -297,73 +348,90 @@ def report_data_payload(self, payload): except urllib3.exceptions.MaxRetryError: pass except Exception as exc: - logger.debug("report_data_payload: Instana host agent connection error (%s)", type(exc), exc_info=True) + logger.debug( + f"report_data_payload: Instana host agent connection error ({type(exc)})", + exc_info=True, + ) return response - def handle_agent_tasks(self, task): + def handle_agent_tasks(self, task: Dict[str, Any]) -> None: """ When request(s) are received by the host agent, it is sent here for handling & processing. """ - logger.debug("Received agent request with messageId: %s", task["messageId"]) + logger.debug(f"Received agent request with messageId: {task['messageId']}") if "action" in task: if task["action"] == "python.source": payload = get_py_source(task["args"]["file"]) else: - message = "Unrecognized action: %s. An newer Instana package may be required " \ - "for this. Current version: %s" % (task["action"], VERSION) + message = ( + f"Unrecognized action: {task['action']}. An newer Instana package may be required " + f"for this. Current version: {VERSION}" + ) payload = {"error": message} else: payload = {"error": "Instana Python: No action specified in request."} self.__task_response(task["messageId"], payload) - - def diagnostics(self): + def diagnostics(self) -> None: """ Helper function to dump out state. """ try: import threading + dt_format = "%Y-%m-%d %H:%M:%S" logger.warning("====> Instana Python Language Agent Diagnostics <====") logger.warning("----> Agent <----") - logger.warning("is_agent_ready: %s", self.is_agent_ready()) - logger.warning("is_timed_out: %s", self.is_timed_out()) + logger.warning(f"is_agent_ready: {self.is_agent_ready()}") + logger.warning(f"is_timed_out: {self.is_timed_out()}") if self.last_seen is None: logger.warning("last_seen: None") else: - logger.warning("last_seen: %s", self.last_seen.strftime(dt_format)) + logger.warning(f"last_seen: {self.last_seen.strftime(dt_format)}") if self.announce_data is not None: - logger.warning("announce_data: %s", self.announce_data.__dict__) + logger.warning(f"announce_data: {self.announce_data.__dict__}") else: logger.warning("announce_data: None") - logger.warning("Options: %s", self.options.__dict__) + logger.warning(f"Options: {self.options.__dict__}") logger.warning("----> StateMachine <----") - logger.warning("State: %s", self.machine.fsm.current) + logger.warning(f"State: {self.machine.fsm.current}") logger.warning("----> Collector <----") - logger.warning("Collector: %s", self.collector) - logger.warning("is_collector_thread_running?: %s", self.collector.is_reporting_thread_running()) - logger.warning("background_report_lock.locked?: %s", self.collector.background_report_lock.locked()) - logger.warning("ready_to_start: %s", self.collector.ready_to_start) - logger.warning("reporting_thread: %s", self.collector.reporting_thread) - logger.warning("report_interval: %s", self.collector.report_interval) - logger.warning("should_send_snapshot_data: %s", self.collector.should_send_snapshot_data()) - logger.warning("spans in queue: %s", self.collector.span_queue.qsize()) - logger.warning("thread_shutdown is_set: %s", self.collector.thread_shutdown.is_set()) + logger.warning(f"Collector: {self.collector}") + logger.warning( + f"is_collector_thread_running?: {self.collector.is_reporting_thread_running()}" + ) + logger.warning( + f"background_report_lock.locked?: {self.collector.background_report_lock.locked()}" + ) + logger.warning(f"ready_to_start: {self.collector.ready_to_start}") + logger.warning(f"reporting_thread: {self.collector.reporting_thread}") + logger.warning(f"report_interval: {self.collector.report_interval}") + logger.warning( + f"should_send_snapshot_data: {self.collector.should_send_snapshot_data()}" + ) + logger.warning(f"spans in queue: {self.collector.span_queue.qsize()}") + logger.warning( + f"thread_shutdown is_set: {self.collector.thread_shutdown.is_set()}" + ) logger.warning("----> Threads <----") - logger.warning("Threads: %s", threading.enumerate()) + logger.warning(f"Threads: {threading.enumerate()}") except Exception: logger.warning("Non-fatal diagnostics exception: ", exc_info=True) - def __task_response(self, message_id, data): + def __task_response( + self, + message_id: str, + data: Dict[str, Any], + ) -> Optional[Response]: """ When the host agent passes us a task and we do it, this function is used to respond with the results of the task. @@ -372,52 +440,58 @@ def __task_response(self, message_id, data): try: payload = json.dumps(data) - logger.debug("Task response is %s: %s", self.__response_url(message_id), payload) + logger.debug( + f"Task response is {self.__response_url(message_id)}: {payload}" + ) - response = self.client.post(self.__response_url(message_id), - data=payload, - headers={"Content-Type": "application/json"}, - timeout=0.8) + response = self.client.post( + self.__response_url(message_id), + data=payload, + headers={"Content-Type": "application/json"}, + timeout=0.8, + ) except Exception as exc: - logger.debug("__task_response: Instana host agent connection error (%s)", type(exc)) + logger.debug( + f"__task_response: Instana host agent connection error ({type(exc)})" + ) return response - def __discovery_url(self): + def __discovery_url(self) -> str: """ URL for announcing to the host agent """ - return "http://%s:%s/%s" % (self.options.agent_host, self.options.agent_port, self.AGENT_DISCOVERY_PATH) + return f"http://{self.options.agent_host}:{self.options.agent_port}/{self.AGENT_DISCOVERY_PATH}" - def __data_url(self): + def __data_url(self) -> str: """ URL for posting metrics to the host agent. Only valid when announced. """ path = self.AGENT_DATA_PATH % self.announce_data.pid - return "http://%s:%s/%s" % (self.options.agent_host, self.options.agent_port, path) + return f"http://{self.options.agent_host}:{self.options.agent_port}/{path}" - def __traces_url(self): + def __traces_url(self) -> str: """ URL for posting traces to the host agent. Only valid when announced. """ - path = "com.instana.plugin.python/traces.%d" % self.announce_data.pid - return "http://%s:%s/%s" % (self.options.agent_host, self.options.agent_port, path) + path = f"com.instana.plugin.python/traces.{self.announce_data.pid}" + return f"http://{self.options.agent_host}:{self.options.agent_port}/{path}" - def __profiles_url(self): + def __profiles_url(self) -> str: """ URL for posting profiles to the host agent. Only valid when announced. """ - path = "com.instana.plugin.python/profiles.%d" % self.announce_data.pid - return "http://%s:%s/%s" % (self.options.agent_host, self.options.agent_port, path) + path = f"com.instana.plugin.python/profiles.{self.announce_data.pid}" + return f"http://{self.options.agent_host}:{self.options.agent_port}/{path}" - def __response_url(self, message_id): + def __response_url(self, message_id: str) -> str: """ URL for responding to agent requests. """ - path = "com.instana.plugin.python/response.%d?messageId=%s" % (int(self.announce_data.pid), message_id) - return "http://%s:%s/%s" % (self.options.agent_host, self.options.agent_port, path) + path = f"com.instana.plugin.python/response.{int(self.announce_data.pid)}?messageId={message_id}" + return f"http://{self.options.agent_host}:{self.options.agent_port}/{path}" - def __agent_logger_url(self): + def __agent_logger_url(self) -> str: """ URL for logging messages to the discovered host agent. """ - return "http://%s:%s/%s" % (self.options.agent_host, self.options.agent_port, "com.instana.agent.logger") + return f"http://{self.options.agent_host}:{self.options.agent_port}/com.instana.agent.logger" diff --git a/src/instana/collector/helpers/runtime.py b/src/instana/collector/helpers/runtime.py index 8aef48e35..7ccf6d8ab 100644 --- a/src/instana/collector/helpers/runtime.py +++ b/src/instana/collector/helpers/runtime.py @@ -1,7 +1,8 @@ # (c) Copyright IBM Corp. 2021 # (c) Copyright Instana Inc. 2020 -""" Collection helper for the Python runtime """ +"""Collection helper for the Python runtime""" + import gc import importlib.metadata import os @@ -10,29 +11,35 @@ import sys import threading from types import ModuleType +from typing import Any, Dict, List, Union, Callable from instana.collector.helpers.base import BaseHelper from instana.log import logger from instana.util import DictionaryOfStan from instana.util.runtime import determine_service_name from instana.version import VERSION +from instana.collector.base import BaseCollector PATH_OF_DEPRECATED_INSTALLATION_VIA_HOST_AGENT = "/tmp/.instana/python" -PATH_OF_AUTOTRACE_WEBHOOK_SITEDIR = '/opt/instana/instrumentation/python/' +PATH_OF_AUTOTRACE_WEBHOOK_SITEDIR = "/opt/instana/instrumentation/python/" + -def is_autowrapt_instrumented(): - return 'instana' in os.environ.get('AUTOWRAPT_BOOTSTRAP', ()) +def is_autowrapt_instrumented() -> bool: + return "instana" in os.environ.get("AUTOWRAPT_BOOTSTRAP", ()) -def is_webhook_instrumented(): +def is_webhook_instrumented() -> bool: return any(map(lambda p: PATH_OF_AUTOTRACE_WEBHOOK_SITEDIR in p, sys.path)) class RuntimeHelper(BaseHelper): """Helper class to collect snapshot and metrics for this Python runtime""" - def __init__(self, collector): + def __init__( + self, + collector: BaseCollector, + ) -> None: super(RuntimeHelper, self).__init__(collector) self.previous = DictionaryOfStan() self.previous_rusage = resource.getrusage(resource.RUSAGE_SELF) @@ -42,7 +49,7 @@ def __init__(self, collector): else: self.previous_gc_count = None - def collect_metrics(self, **kwargs): + def collect_metrics(self, **kwargs: Dict[str, Any]) -> List[Dict[str, Any]]: plugin_data = dict() try: plugin_data["name"] = "com.instana.plugin.python" @@ -66,7 +73,11 @@ def collect_metrics(self, **kwargs): logger.debug("_collect_metrics: ", exc_info=True) return [plugin_data] - def _collect_runtime_metrics(self, plugin_data, with_snapshot): + def _collect_runtime_metrics( + self, + plugin_data: Dict[str, Any], + with_snapshot: bool, + ) -> None: if os.environ.get("INSTANA_DISABLE_METRICS_COLLECTION", False): return @@ -270,7 +281,11 @@ def _collect_gc_metrics(self, plugin_data, with_snapshot): except Exception: logger.debug("_collect_gc_metrics", exc_info=True) - def _collect_thread_metrics(self, plugin_data, with_snapshot): + def _collect_thread_metrics( + self, + plugin_data: Dict[str, Any], + with_snapshot: bool, + ) -> None: try: threads = threading.enumerate() daemon_threads = [thread.daemon is True for thread in threads].count(True) @@ -304,7 +319,10 @@ def _collect_thread_metrics(self, plugin_data, with_snapshot): except Exception: logger.debug("_collect_thread_metrics", exc_info=True) - def _collect_runtime_snapshot(self, plugin_data): + def _collect_runtime_snapshot( + self, + plugin_data: Dict[str, Any], + ) -> None: """Gathers Python specific Snapshot information for this process""" snapshot_payload = {} try: @@ -316,9 +334,9 @@ def _collect_runtime_snapshot(self, plugin_data): snapshot_payload["iv"] = VERSION if is_autowrapt_instrumented(): - snapshot_payload['m'] = 'Autowrapt' + snapshot_payload["m"] = "Autowrapt" elif is_webhook_instrumented(): - snapshot_payload['m'] = 'AutoTrace' + snapshot_payload["m"] = "AutoTrace" else: snapshot_payload["m"] = "Manual" @@ -341,7 +359,7 @@ def _collect_runtime_snapshot(self, plugin_data): plugin_data["data"]["snapshot"] = snapshot_payload - def gather_python_packages(self): + def gather_python_packages(self) -> None: """Collect up the list of modules in use""" if os.environ.get("INSTANA_DISABLE_PYTHON_PACKAGE_COLLECTION"): return {"instana": VERSION} @@ -378,8 +396,7 @@ def gather_python_packages(self): pass except Exception: logger.debug( - "gather_python_packages: could not process module: %s", - pkg_name, + f"gather_python_packages: could not process module: {pkg_name}", ) # Manually set our package version @@ -389,7 +406,10 @@ def gather_python_packages(self): return versions - def jsonable(self, value): + def jsonable( + self, + value: Union[Callable[[], Any], ModuleType, Any], + ) -> str: try: if callable(value): try: diff --git a/src/instana/instrumentation/redis.py b/src/instana/instrumentation/redis.py index 621bca266..1c2eb0d0d 100644 --- a/src/instana/instrumentation/redis.py +++ b/src/instana/instrumentation/redis.py @@ -7,7 +7,11 @@ from instana.log import logger from instana.span.span import InstanaSpan -from instana.util.traceutils import get_tracer_tuple, tracing_is_off +from instana.util.traceutils import ( + is_service_or_endpoint_ignored, + get_tracer_tuple, + tracing_is_off, +) try: import redis @@ -43,6 +47,8 @@ def execute_command_with_instana( args: Tuple[object, ...], kwargs: Dict[str, Any], ) -> object: + if is_service_or_endpoint_ignored("redis", args[0]): + return tracer, parent_span, operation_name = get_tracer_tuple() parent_context = parent_span.get_span_context() if parent_span else None diff --git a/src/instana/options.py b/src/instana/options.py index 0f90e62b3..9ea843cd7 100644 --- a/src/instana/options.py +++ b/src/instana/options.py @@ -13,57 +13,80 @@ - AWSFargateOptions - Options class for AWS Fargate. Holds settings specific to AWS Fargate. - GCROptions - Options class for Google cloud Run. Holds settings specific to GCR. """ + import os import logging +from typing import Any, Dict -from .log import logger -from .util.runtime import determine_service_name +from instana.log import logger +from instana.util.config import parse_ignored_endpoints +from instana.util.runtime import determine_service_name +from instana.configurator import config class BaseOptions(object): - """ Base class for all option classes. Holds items common to all """ + """Base class for all option classes. Holds items common to all""" - def __init__(self, **kwds): + def __init__(self, **kwds: Dict[str, Any]) -> None: self.debug = False self.log_level = logging.WARN self.service_name = determine_service_name() self.extra_http_headers = None self.allow_exit_as_root = False + self.ignore_endpoints = {} if "INSTANA_DEBUG" in os.environ: self.log_level = logging.DEBUG self.debug = True if "INSTANA_EXTRA_HTTP_HEADERS" in os.environ: - self.extra_http_headers = str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(';') - - if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == '1': + self.extra_http_headers = ( + str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";") + ) + + if "INSTANA_IGNORE_ENDPOINTS" in os.environ: + self.ignore_endpoints = parse_ignored_endpoints( + os.environ["INSTANA_IGNORE_ENDPOINTS"] + ) + else: + if ( + isinstance(config.get("tracing"), dict) + and "ignore_endpoints" in config["tracing"] + ): + self.ignore_endpoints = parse_ignored_endpoints( + config["tracing"]["ignore_endpoints"], + ) + + if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == "1": self.allow_exit_as_root = True # Defaults - self.secrets_matcher = 'contains-ignore-case' - self.secrets_list = ['key', 'pass', 'secret'] + self.secrets_matcher = "contains-ignore-case" + self.secrets_list = ["key", "pass", "secret"] # Env var format: :[,] self.secrets = os.environ.get("INSTANA_SECRETS", None) if self.secrets is not None: - parts = self.secrets.split(':') + parts = self.secrets.split(":") if len(parts) == 2: self.secrets_matcher = parts[0] - self.secrets_list = parts[1].split(',') + self.secrets_list = parts[1].split(",") else: - logger.warning("Couldn't parse INSTANA_SECRETS env var: %s", self.secrets) + logger.warning( + f"Couldn't parse INSTANA_SECRETS env var: {self.secrets}" + ) self.__dict__.update(kwds) class StandardOptions(BaseOptions): - """ The options class used when running directly on a host/node with an Instana agent """ + """The options class used when running directly on a host/node with an Instana agent""" + AGENT_DEFAULT_HOST = "localhost" AGENT_DEFAULT_PORT = 42699 - def __init__(self, **kwds): + def __init__(self, **kwds: Dict[str, Any]) -> None: super(StandardOptions, self).__init__() self.agent_host = os.environ.get("INSTANA_AGENT_HOST", self.AGENT_DEFAULT_HOST) @@ -74,9 +97,9 @@ def __init__(self, **kwds): class ServerlessOptions(BaseOptions): - """ Base class for serverless environments. Holds settings common to all serverless environments. """ + """Base class for serverless environments. Holds settings common to all serverless environments.""" - def __init__(self, **kwds): + def __init__(self, **kwds: Dict[str, Any]) -> None: super(ServerlessOptions, self).__init__() self.agent_key = os.environ.get("INSTANA_AGENT_KEY", None) @@ -86,7 +109,7 @@ def __init__(self, **kwds): if self.endpoint_url is not None and self.endpoint_url[-1] == "/": self.endpoint_url = self.endpoint_url[:-1] - if 'INSTANA_DISABLE_CA_CHECK' in os.environ: + if "INSTANA_DISABLE_CA_CHECK" in os.environ: self.ssl_verify = False else: self.ssl_verify = True @@ -95,7 +118,7 @@ def __init__(self, **kwds): if proxy is None: self.endpoint_proxy = {} else: - self.endpoint_proxy = {'https': proxy} + self.endpoint_proxy = {"https": proxy} timeout_in_ms = os.environ.get("INSTANA_TIMEOUT", None) if timeout_in_ms is None: @@ -105,9 +128,13 @@ def __init__(self, **kwds): try: self.timeout = int(timeout_in_ms) / 1000 except ValueError: - logger.warning("Likely invalid INSTANA_TIMEOUT=%s value. Using default.", timeout_in_ms) - logger.warning("INSTANA_TIMEOUT should specify timeout in milliseconds. See " - "https://www.instana.com/docs/reference/environment_variables/#serverless-monitoring") + logger.warning( + f"Likely invalid INSTANA_TIMEOUT={timeout_in_ms} value. Using default." + ) + logger.warning( + "INSTANA_TIMEOUT should specify timeout in milliseconds. See " + "https://www.instana.com/docs/reference/environment_variables/#serverless-monitoring" + ) self.timeout = 0.8 value = os.environ.get("INSTANA_LOG_LEVEL", None) @@ -123,22 +150,22 @@ def __init__(self, **kwds): elif value == "error": self.log_level = logging.ERROR else: - logger.warning("Unknown INSTANA_LOG_LEVEL specified: %s", value) + logger.warning(f"Unknown INSTANA_LOG_LEVEL specified: {value}") except Exception: logger.debug("BaseAgent.update_log_level: ", exc_info=True) class AWSLambdaOptions(ServerlessOptions): - """ Options class for AWS Lambda. Holds settings specific to AWS Lambda. """ + """Options class for AWS Lambda. Holds settings specific to AWS Lambda.""" - def __init__(self, **kwds): + def __init__(self, **kwds: Dict[str, Any]) -> None: super(AWSLambdaOptions, self).__init__() class AWSFargateOptions(ServerlessOptions): - """ Options class for AWS Fargate. Holds settings specific to AWS Fargate. """ + """Options class for AWS Fargate. Holds settings specific to AWS Fargate.""" - def __init__(self, **kwds): + def __init__(self, **kwds: Dict[str, Any]) -> None: super(AWSFargateOptions, self).__init__() self.tags = None @@ -146,26 +173,29 @@ def __init__(self, **kwds): if tag_list is not None: try: self.tags = dict() - tags = tag_list.split(',') + tags = tag_list.split(",") for tag_and_value in tags: - parts = tag_and_value.split('=') + parts = tag_and_value.split("=") length = len(parts) if length == 1: self.tags[parts[0]] = None elif length == 2: self.tags[parts[0]] = parts[1] except Exception: - logger.debug("Error parsing INSTANA_TAGS env var: %s", tag_list) + logger.debug(f"Error parsing INSTANA_TAGS env var: {tag_list}") self.zone = os.environ.get("INSTANA_ZONE", None) + class EKSFargateOptions(AWSFargateOptions): - """ Options class for EKS Pods on AWS Fargate. Holds settings specific to EKS Pods on AWS Fargate. """ - def __init__(self, **kwds): + """Options class for EKS Pods on AWS Fargate. Holds settings specific to EKS Pods on AWS Fargate.""" + + def __init__(self, **kwds: Dict[str, Any]) -> None: super(EKSFargateOptions, self).__init__() + class GCROptions(ServerlessOptions): - """ Options class for Google Cloud Run. Holds settings specific to Google Cloud Run. """ + """Options class for Google Cloud Run. Holds settings specific to Google Cloud Run.""" - def __init__(self, **kwds): + def __init__(self, **kwds: Dict[str, Any]) -> None: super(GCROptions, self).__init__() diff --git a/src/instana/util/config.py b/src/instana/util/config.py new file mode 100644 index 000000000..770c8f4f0 --- /dev/null +++ b/src/instana/util/config.py @@ -0,0 +1,82 @@ +from typing import Any, Dict, List, Union +from instana.log import logger + + +def parse_service_pair(pair: str) -> List[str]: + """ + Parses a pair string to prepare a list of ignored endpoints. + + @param pair: String format: + - "service1:endpoint1,endpoint2" or "service1:endpoint1" or "service1" + @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + """ + pair_list = [] + if ":" in pair: + service, endpoints = pair.split(":", 1) + service = service.strip() + endpoint_list = [ep.strip() for ep in endpoints.split(",") if ep.strip()] + + for endpoint in endpoint_list: + pair_list.append(f"{service}.{endpoint}") + else: + pair_list.append(pair) + return pair_list + + +def parse_ignored_endpoints_string(params: str) -> List[str]: + """ + Parses a string to prepare a list of ignored endpoints. + + @param params: String format: + - "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2" + @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + """ + ignore_endpoints = [] + if params: + service_pairs = params.split(";") + + for pair in service_pairs: + if pair.strip(): + ignore_endpoints += parse_service_pair(pair) + return ignore_endpoints + + +def parse_ignored_endpoints_dict(params: Dict[str, Any]) -> List[str]: + """ + Parses a dictionary to prepare a list of ignored endpoints. + + @param params: Dict format: + - {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]} + @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + """ + ignore_endpoints = [] + + for service, endpoints in params.items(): + if not endpoints: # filtering all service + ignore_endpoints.append(service) + else: # filtering specific endpoints + for endpoint in endpoints: + ignore_endpoints.append(f"{service}.{endpoint}") + + return ignore_endpoints + + +def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]: + """ + Parses input to prepare a list for ignored endpoints. + + @param params: Can be either: + - String: "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2" + - Dict: {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]} + @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + """ + try: + if isinstance(params, str): + return parse_ignored_endpoints_string(params) + elif isinstance(params, dict): + return parse_ignored_endpoints_dict(params) + else: + return [] + except Exception as e: + logger.debug("Error parsing ignored endpoints: %s", str(e)) + return [] diff --git a/src/instana/util/traceutils.py b/src/instana/util/traceutils.py index ad944163f..aa1aaf7dd 100644 --- a/src/instana/util/traceutils.py +++ b/src/instana/util/traceutils.py @@ -1,7 +1,16 @@ # (c) Copyright IBM Corp. 2021 # (c) Copyright Instana Inc. 2021 -from typing import Optional, Tuple, TYPE_CHECKING, Union, Dict, List, Any, Iterable +from typing import ( + Optional, + Tuple, + TYPE_CHECKING, + Union, + Dict, + List, + Any, + Iterable, +) from instana.log import logger from instana.singletons import agent, tracer @@ -11,7 +20,12 @@ if TYPE_CHECKING: from instana.span.span import InstanaSpan -def extract_custom_headers(span: "InstanaSpan", headers: Optional[Union[Dict[str, Any], List[Tuple[object, ...]], Iterable]] = None, format: Optional[bool] = False) -> None: + +def extract_custom_headers( + span: "InstanaSpan", + headers: Optional[Union[Dict[str, Any], List[Tuple[object, ...]], Iterable]] = None, + format: Optional[bool] = False, +) -> None: if not (agent.options.extra_http_headers and headers): return try: @@ -24,19 +38,31 @@ def extract_custom_headers(span: "InstanaSpan", headers: Optional[Union[Dict[str ) for header in headers: if isinstance(header, tuple): - header_key = header[0].decode("utf-8") if isinstance(header[0], bytes) else header[0] - header_val = header[1].decode("utf-8") if isinstance(header[1], bytes) else header[1] + header_key = ( + header[0].decode("utf-8") + if isinstance(header[0], bytes) + else header[0] + ) + header_val = ( + header[1].decode("utf-8") + if isinstance(header[1], bytes) + else header[1] + ) if header_key.lower() == expected_header.lower(): span.set_attribute( - f"http.header.{custom_header}", header_val, - ) + f"http.header.{custom_header}", + header_val, + ) elif header.lower() == expected_header.lower(): - span.set_attribute(f"http.header.{custom_header}", headers[expected_header]) + span.set_attribute( + f"http.header.{custom_header}", headers[expected_header] + ) except Exception: logger.debug("extract_custom_headers: ", exc_info=True) def get_active_tracer() -> Optional[InstanaTracer]: + """Get the currently active tracer if one exists.""" try: current_span = get_current_span() if current_span: @@ -54,6 +80,7 @@ def get_active_tracer() -> Optional[InstanaTracer]: def get_tracer_tuple() -> ( Tuple[Optional[InstanaTracer], Optional["InstanaSpan"], Optional[str]] ): + """Get a tuple of (tracer, span, span_name) for the current context.""" active_tracer = get_active_tracer() current_span = get_current_span() if active_tracer: @@ -64,4 +91,16 @@ def get_tracer_tuple() -> ( def tracing_is_off() -> bool: + """Check if tracing is currently disabled.""" return not (bool(get_active_tracer()) or agent.options.allow_exit_as_root) + + +def is_service_or_endpoint_ignored( + service: str, + endpoint: str, +) -> bool: + """Check if the given service and endpoint combination should be ignored.""" + return ( + service in agent.options.ignore_endpoints + or f"{service}.{endpoint.lower()}" in agent.options.ignore_endpoints + ) diff --git a/tests/clients/test_redis.py b/tests/clients/test_redis.py index 4fa93e5c0..546deb4a4 100644 --- a/tests/clients/test_redis.py +++ b/tests/clients/test_redis.py @@ -3,14 +3,17 @@ import logging +import os from typing import Generator from unittest.mock import patch + import pytest import redis +from instana.options import StandardOptions +from instana.singletons import agent, tracer from instana.span.span import get_current_span from tests.helpers import testenv -from instana.singletons import agent, tracer class TestRedis: @@ -21,6 +24,8 @@ def _resource(self) -> Generator[None, None, None]: self.recorder.clear_spans() self.client = redis.Redis(host=testenv["redis_host"], db=testenv["redis_db"]) yield + if os.environ.get("INSTANA_IGNORE_ENDPOINTS"): + del os.environ["INSTANA_IGNORE_ENDPOINTS"] agent.options.allow_exit_as_root = False def test_set_get(self) -> None: @@ -454,3 +459,68 @@ def test_execute_with_instana_exception( pipe.get("foox") pipe.execute() assert "Error collecting pipeline commands" in caplog.messages + + def test_ignore_redis(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "redis" + agent.options = StandardOptions() + + with tracer.start_as_current_span("test"): + self.client.set("foox", "barX") + self.client.get("foox") + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + sdk_span = spans[0] + + assert sdk_span.n == "sdk" + + def test_ignore_redis_single_command(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "redis:set" + agent.options = StandardOptions() + with tracer.start_as_current_span("test"): + self.client.set("foox", "barX") + self.client.get("foox") + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + redis_get_span = spans[0] + sdk_span = spans[1] + + assert redis_get_span.n == "redis" + assert redis_get_span.data["redis"]["command"] == "GET" + + assert sdk_span.n == "sdk" + + def test_ignore_redis_multiple_commands(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "redis:set,get" + agent.options = StandardOptions() + with tracer.start_as_current_span("test"): + self.client.set("foox", "barX") + self.client.get("foox") + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + sdk_span = spans[0] + + assert sdk_span.n == "sdk" + + def test_ignore_redis_with_another_instrumentation(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "redis:set;something_else:something" + agent.options = StandardOptions() + with tracer.start_as_current_span("test"): + self.client.set("foox", "barX") + self.client.get("foox") + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + redis_get_span = spans[0] + sdk_span = spans[1] + + assert redis_get_span.n == "redis" + assert redis_get_span.data["redis"]["command"] == "GET" + + assert sdk_span.n == "sdk" diff --git a/tests/util/test_config.py b/tests/util/test_config.py new file mode 100644 index 000000000..b58172d28 --- /dev/null +++ b/tests/util/test_config.py @@ -0,0 +1,39 @@ +from typing import Generator + +import pytest +from instana.util.config import parse_ignored_endpoints + + +class TestConfig: + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + yield + + def test_parse_ignored_endpoints(self) -> None: + test_pair = "service1:endpoint1,endpoint2" + response = parse_ignored_endpoints(test_pair) + assert response == ["service1.endpoint1", "service1.endpoint2"] + + test_pair = "service1;service2" + response = parse_ignored_endpoints(test_pair) + assert response == ["service1", "service2"] + + test_pair = "service1" + response = parse_ignored_endpoints(test_pair) + assert response == ["service1"] + + test_pair = ";" + response = parse_ignored_endpoints(test_pair) + assert response == [] + + test_pair = "service1:endpoint1,endpoint2;;;service2:endpoint1;;" + response = parse_ignored_endpoints(test_pair) + assert response == [ + "service1.endpoint1", + "service1.endpoint2", + "service2.endpoint1", + ] + + test_pair = "" + response = parse_ignored_endpoints(test_pair) + assert response == []