Skip to content

Commit

Permalink
redis: added endpoint filtering
Browse files Browse the repository at this point in the history
Signed-off-by: Cagri Yonca <[email protected]>
  • Loading branch information
CagriYonca committed Jan 30, 2025
1 parent a2b70b6 commit 272639c
Show file tree
Hide file tree
Showing 8 changed files with 542 additions and 182 deletions.
322 changes: 198 additions & 124 deletions src/instana/agent/host.py

Large diffs are not rendered by default.

52 changes: 36 additions & 16 deletions src/instana/collector/helpers/runtime.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# (c) Copyright IBM Corp. 2021
# (c) Copyright Instana Inc. 2020

""" Collection helper for the Python runtime """
"""Collection helper for the Python runtime"""

import gc
import importlib.metadata
import os
Expand All @@ -10,29 +11,35 @@
import sys
import threading
from types import ModuleType
from typing import Any, Dict, List, Union, Callable

from instana.collector.helpers.base import BaseHelper
from instana.log import logger
from instana.util import DictionaryOfStan
from instana.util.runtime import determine_service_name
from instana.version import VERSION
from instana.collector.base import BaseCollector

PATH_OF_DEPRECATED_INSTALLATION_VIA_HOST_AGENT = "/tmp/.instana/python"

PATH_OF_AUTOTRACE_WEBHOOK_SITEDIR = '/opt/instana/instrumentation/python/'
PATH_OF_AUTOTRACE_WEBHOOK_SITEDIR = "/opt/instana/instrumentation/python/"


def is_autowrapt_instrumented():
return 'instana' in os.environ.get('AUTOWRAPT_BOOTSTRAP', ())
def is_autowrapt_instrumented() -> bool:
return "instana" in os.environ.get("AUTOWRAPT_BOOTSTRAP", ())


def is_webhook_instrumented():
def is_webhook_instrumented() -> bool:
return any(map(lambda p: PATH_OF_AUTOTRACE_WEBHOOK_SITEDIR in p, sys.path))


class RuntimeHelper(BaseHelper):
"""Helper class to collect snapshot and metrics for this Python runtime"""

def __init__(self, collector):
def __init__(
self,
collector: BaseCollector,
) -> None:
super(RuntimeHelper, self).__init__(collector)
self.previous = DictionaryOfStan()
self.previous_rusage = resource.getrusage(resource.RUSAGE_SELF)
Expand All @@ -42,7 +49,7 @@ def __init__(self, collector):
else:
self.previous_gc_count = None

def collect_metrics(self, **kwargs):
def collect_metrics(self, **kwargs: Dict[str, Any]) -> List[Dict[str, Any]]:
plugin_data = dict()
try:
plugin_data["name"] = "com.instana.plugin.python"
Expand All @@ -66,7 +73,11 @@ def collect_metrics(self, **kwargs):
logger.debug("_collect_metrics: ", exc_info=True)
return [plugin_data]

def _collect_runtime_metrics(self, plugin_data, with_snapshot):
def _collect_runtime_metrics(
self,
plugin_data: Dict[str, Any],
with_snapshot: bool,
) -> None:
if os.environ.get("INSTANA_DISABLE_METRICS_COLLECTION", False):
return

Expand Down Expand Up @@ -270,7 +281,11 @@ def _collect_gc_metrics(self, plugin_data, with_snapshot):
except Exception:
logger.debug("_collect_gc_metrics", exc_info=True)

def _collect_thread_metrics(self, plugin_data, with_snapshot):
def _collect_thread_metrics(
self,
plugin_data: Dict[str, Any],
with_snapshot: bool,
) -> None:
try:
threads = threading.enumerate()
daemon_threads = [thread.daemon is True for thread in threads].count(True)
Expand Down Expand Up @@ -304,7 +319,10 @@ def _collect_thread_metrics(self, plugin_data, with_snapshot):
except Exception:
logger.debug("_collect_thread_metrics", exc_info=True)

def _collect_runtime_snapshot(self, plugin_data):
def _collect_runtime_snapshot(
self,
plugin_data: Dict[str, Any],
) -> None:
"""Gathers Python specific Snapshot information for this process"""
snapshot_payload = {}
try:
Expand All @@ -316,9 +334,9 @@ def _collect_runtime_snapshot(self, plugin_data):
snapshot_payload["iv"] = VERSION

if is_autowrapt_instrumented():
snapshot_payload['m'] = 'Autowrapt'
snapshot_payload["m"] = "Autowrapt"
elif is_webhook_instrumented():
snapshot_payload['m'] = 'AutoTrace'
snapshot_payload["m"] = "AutoTrace"
else:
snapshot_payload["m"] = "Manual"

Expand All @@ -341,7 +359,7 @@ def _collect_runtime_snapshot(self, plugin_data):

plugin_data["data"]["snapshot"] = snapshot_payload

def gather_python_packages(self):
def gather_python_packages(self) -> None:
"""Collect up the list of modules in use"""
if os.environ.get("INSTANA_DISABLE_PYTHON_PACKAGE_COLLECTION"):
return {"instana": VERSION}
Expand Down Expand Up @@ -378,8 +396,7 @@ def gather_python_packages(self):
pass
except Exception:
logger.debug(
"gather_python_packages: could not process module: %s",
pkg_name,
f"gather_python_packages: could not process module: {pkg_name}",
)

# Manually set our package version
Expand All @@ -389,7 +406,10 @@ def gather_python_packages(self):

return versions

def jsonable(self, value):
def jsonable(
self,
value: Union[Callable[[], Any], ModuleType, Any],
) -> str:
try:
if callable(value):
try:
Expand Down
8 changes: 7 additions & 1 deletion src/instana/instrumentation/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@

from instana.log import logger
from instana.span.span import InstanaSpan
from instana.util.traceutils import get_tracer_tuple, tracing_is_off
from instana.util.traceutils import (
is_service_or_endpoint_ignored,
get_tracer_tuple,
tracing_is_off,
)

try:
import redis
Expand Down Expand Up @@ -43,6 +47,8 @@ def execute_command_with_instana(
args: Tuple[object, ...],
kwargs: Dict[str, Any],
) -> object:
if is_service_or_endpoint_ignored("redis", args[0]):
return
tracer, parent_span, operation_name = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None

Expand Down
96 changes: 63 additions & 33 deletions src/instana/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,57 +13,80 @@
- AWSFargateOptions - Options class for AWS Fargate. Holds settings specific to AWS Fargate.
- GCROptions - Options class for Google cloud Run. Holds settings specific to GCR.
"""

import os
import logging
from typing import Any, Dict

from .log import logger
from .util.runtime import determine_service_name
from instana.log import logger
from instana.util.config import parse_ignored_endpoints
from instana.util.runtime import determine_service_name
from instana.configurator import config


class BaseOptions(object):
""" Base class for all option classes. Holds items common to all """
"""Base class for all option classes. Holds items common to all"""

def __init__(self, **kwds):
def __init__(self, **kwds: Dict[str, Any]) -> None:
self.debug = False
self.log_level = logging.WARN
self.service_name = determine_service_name()
self.extra_http_headers = None
self.allow_exit_as_root = False
self.ignore_endpoints = {}

if "INSTANA_DEBUG" in os.environ:
self.log_level = logging.DEBUG
self.debug = True

if "INSTANA_EXTRA_HTTP_HEADERS" in os.environ:
self.extra_http_headers = str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(';')

if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == '1':
self.extra_http_headers = (
str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";")
)

if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
self.ignore_endpoints = parse_ignored_endpoints(
os.environ["INSTANA_IGNORE_ENDPOINTS"]
)
else:
if (
isinstance(config.get("tracing"), dict)
and "ignore_endpoints" in config["tracing"]
):
self.ignore_endpoints = parse_ignored_endpoints(
config["tracing"]["ignore_endpoints"],
)

if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == "1":
self.allow_exit_as_root = True

# Defaults
self.secrets_matcher = 'contains-ignore-case'
self.secrets_list = ['key', 'pass', 'secret']
self.secrets_matcher = "contains-ignore-case"
self.secrets_list = ["key", "pass", "secret"]

# Env var format: <matcher>:<secret>[,<secret>]
self.secrets = os.environ.get("INSTANA_SECRETS", None)

if self.secrets is not None:
parts = self.secrets.split(':')
parts = self.secrets.split(":")
if len(parts) == 2:
self.secrets_matcher = parts[0]
self.secrets_list = parts[1].split(',')
self.secrets_list = parts[1].split(",")
else:
logger.warning("Couldn't parse INSTANA_SECRETS env var: %s", self.secrets)
logger.warning(
f"Couldn't parse INSTANA_SECRETS env var: {self.secrets}"
)

self.__dict__.update(kwds)


class StandardOptions(BaseOptions):
""" The options class used when running directly on a host/node with an Instana agent """
"""The options class used when running directly on a host/node with an Instana agent"""

AGENT_DEFAULT_HOST = "localhost"
AGENT_DEFAULT_PORT = 42699

def __init__(self, **kwds):
def __init__(self, **kwds: Dict[str, Any]) -> None:
super(StandardOptions, self).__init__()

self.agent_host = os.environ.get("INSTANA_AGENT_HOST", self.AGENT_DEFAULT_HOST)
Expand All @@ -74,9 +97,9 @@ def __init__(self, **kwds):


class ServerlessOptions(BaseOptions):
""" Base class for serverless environments. Holds settings common to all serverless environments. """
"""Base class for serverless environments. Holds settings common to all serverless environments."""

def __init__(self, **kwds):
def __init__(self, **kwds: Dict[str, Any]) -> None:
super(ServerlessOptions, self).__init__()

self.agent_key = os.environ.get("INSTANA_AGENT_KEY", None)
Expand All @@ -86,7 +109,7 @@ def __init__(self, **kwds):
if self.endpoint_url is not None and self.endpoint_url[-1] == "/":
self.endpoint_url = self.endpoint_url[:-1]

if 'INSTANA_DISABLE_CA_CHECK' in os.environ:
if "INSTANA_DISABLE_CA_CHECK" in os.environ:
self.ssl_verify = False
else:
self.ssl_verify = True
Expand All @@ -95,7 +118,7 @@ def __init__(self, **kwds):
if proxy is None:
self.endpoint_proxy = {}
else:
self.endpoint_proxy = {'https': proxy}
self.endpoint_proxy = {"https": proxy}

timeout_in_ms = os.environ.get("INSTANA_TIMEOUT", None)
if timeout_in_ms is None:
Expand All @@ -105,9 +128,13 @@ def __init__(self, **kwds):
try:
self.timeout = int(timeout_in_ms) / 1000
except ValueError:
logger.warning("Likely invalid INSTANA_TIMEOUT=%s value. Using default.", timeout_in_ms)
logger.warning("INSTANA_TIMEOUT should specify timeout in milliseconds. See "
"https://www.instana.com/docs/reference/environment_variables/#serverless-monitoring")
logger.warning(
f"Likely invalid INSTANA_TIMEOUT={timeout_in_ms} value. Using default."
)
logger.warning(
"INSTANA_TIMEOUT should specify timeout in milliseconds. See "
"https://www.instana.com/docs/reference/environment_variables/#serverless-monitoring"
)
self.timeout = 0.8

value = os.environ.get("INSTANA_LOG_LEVEL", None)
Expand All @@ -123,49 +150,52 @@ def __init__(self, **kwds):
elif value == "error":
self.log_level = logging.ERROR
else:
logger.warning("Unknown INSTANA_LOG_LEVEL specified: %s", value)
logger.warning(f"Unknown INSTANA_LOG_LEVEL specified: {value}")
except Exception:
logger.debug("BaseAgent.update_log_level: ", exc_info=True)


class AWSLambdaOptions(ServerlessOptions):
""" Options class for AWS Lambda. Holds settings specific to AWS Lambda. """
"""Options class for AWS Lambda. Holds settings specific to AWS Lambda."""

def __init__(self, **kwds):
def __init__(self, **kwds: Dict[str, Any]) -> None:
super(AWSLambdaOptions, self).__init__()


class AWSFargateOptions(ServerlessOptions):
""" Options class for AWS Fargate. Holds settings specific to AWS Fargate. """
"""Options class for AWS Fargate. Holds settings specific to AWS Fargate."""

def __init__(self, **kwds):
def __init__(self, **kwds: Dict[str, Any]) -> None:
super(AWSFargateOptions, self).__init__()

self.tags = None
tag_list = os.environ.get("INSTANA_TAGS", None)
if tag_list is not None:
try:
self.tags = dict()
tags = tag_list.split(',')
tags = tag_list.split(",")
for tag_and_value in tags:
parts = tag_and_value.split('=')
parts = tag_and_value.split("=")
length = len(parts)
if length == 1:
self.tags[parts[0]] = None
elif length == 2:
self.tags[parts[0]] = parts[1]
except Exception:
logger.debug("Error parsing INSTANA_TAGS env var: %s", tag_list)
logger.debug(f"Error parsing INSTANA_TAGS env var: {tag_list}")

self.zone = os.environ.get("INSTANA_ZONE", None)


class EKSFargateOptions(AWSFargateOptions):
""" Options class for EKS Pods on AWS Fargate. Holds settings specific to EKS Pods on AWS Fargate. """
def __init__(self, **kwds):
"""Options class for EKS Pods on AWS Fargate. Holds settings specific to EKS Pods on AWS Fargate."""

def __init__(self, **kwds: Dict[str, Any]) -> None:
super(EKSFargateOptions, self).__init__()


class GCROptions(ServerlessOptions):
""" Options class for Google Cloud Run. Holds settings specific to Google Cloud Run. """
"""Options class for Google Cloud Run. Holds settings specific to Google Cloud Run."""

def __init__(self, **kwds):
def __init__(self, **kwds: Dict[str, Any]) -> None:
super(GCROptions, self).__init__()
Loading

0 comments on commit 272639c

Please sign in to comment.