From b737e1977baf4ad331119c716019c8f5a1909a3f Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Thu, 10 Apr 2025 05:53:06 -0700 Subject: [PATCH 1/2] feat: added span filtering for kafka-python module Signed-off-by: Cagri Yonca --- pyproject.toml | 1 + src/instana/agent/host.py | 40 ++++-- src/instana/options.py | 26 ++-- src/instana/util/config.py | 95 +++++++++++--- src/instana/util/span_utils.py | 16 ++- tests/agent/test_host.py | 30 ++--- tests/clients/boto3/test_boto3_dynamodb.py | 2 +- tests/clients/kafka/test_kafka_python.py | 124 ++++++++++++++++++ tests/test_options.py | 50 +++---- tests/util/test_config.py | 143 ++++++++++++++++----- tests/util/test_configuration-1.yaml | 17 +++ tests/util/test_configuration-2.yaml | 17 +++ tests/util/test_span_utils.py | 17 ++- 13 files changed, 448 insertions(+), 130 deletions(-) create mode 100644 tests/util/test_configuration-1.yaml create mode 100644 tests/util/test_configuration-2.yaml diff --git a/pyproject.toml b/pyproject.toml index 1f12cb6e..87abacfb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ dependencies = [ "opentelemetry-api>=1.27.0", "opentelemetry-semantic-conventions>=0.48b0", "typing_extensions>=4.12.2", + "pyyaml>=6.0.2", ] [project.entry-points."instana"] diff --git a/src/instana/agent/host.py b/src/instana/agent/host.py index 8e03833b..fb70a2f2 100644 --- a/src/instana/agent/host.py +++ b/src/instana/agent/host.py @@ -351,13 +351,18 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]: Filters given span list using ignore-endpoint variable and returns the list of filtered spans. """ filtered_spans = [] + endpoint = "" for span in spans: if (hasattr(span, "n") or hasattr(span, "name")) and hasattr(span, "data"): service = span.n - operation_specifier = get_operation_specifier(service) - endpoint = span.data[service][operation_specifier] - if isinstance(endpoint, str) and self.__is_service_or_endpoint_ignored( - service, endpoint + operation_specifier_key, service_specifier_key = ( + get_operation_specifier(service) + ) + if service == "kafka": + endpoint = span.data[service][service_specifier_key] + method = span.data[service][operation_specifier_key] + if isinstance(method, str) and self.__is_endpoint_ignored( + service, method, endpoint ): continue else: @@ -366,15 +371,28 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]: filtered_spans.append(span) return filtered_spans - def __is_service_or_endpoint_ignored( - self, service: str, endpoint: str = "" + def __is_endpoint_ignored( + self, + service: str, + method: str = "", + endpoint: str = "", ) -> bool: """Check if the given service and endpoint combination should be ignored.""" - - return ( - service.lower() in self.options.ignore_endpoints - or f"{service.lower()}.{endpoint.lower()}" in self.options.ignore_endpoints - ) + service = service.lower() + method = method.lower() + endpoint = endpoint.lower() + filter_rules = [ + f"{service}.{method}", # service.method + f"{service}.*", # service.* + ] + + if service == "kafka" and endpoint: + filter_rules += [ + f"{service}.{method}.{endpoint}", # service.method.endpoint + f"{service}.*.{endpoint}", # service.*.endpoint + f"{service}.{method}.*", # service.method.* + ] + return any(rule in self.options.ignore_endpoints for rule in filter_rules) def handle_agent_tasks(self, task: Dict[str, Any]) -> None: """ diff --git a/src/instana/options.py b/src/instana/options.py index 2bd2f4e0..20fee29b 100644 --- a/src/instana/options.py +++ b/src/instana/options.py @@ -19,7 +19,10 @@ from typing import Any, Dict from instana.log import logger -from instana.util.config import parse_ignored_endpoints +from instana.util.config import ( + parse_ignored_endpoints, + parse_ignored_endpoints_from_yaml, +) from instana.util.runtime import determine_service_name from instana.configurator import config @@ -44,18 +47,23 @@ def __init__(self, **kwds: Dict[str, Any]) -> None: str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";") ) - if "INSTANA_IGNORE_ENDPOINTS" in os.environ: - self.ignore_endpoints = parse_ignored_endpoints( - os.environ["INSTANA_IGNORE_ENDPOINTS"] + if "INSTANA_IGNORE_ENDPOINTS_PATH" in os.environ: + self.ignore_endpoints = parse_ignored_endpoints_from_yaml( + os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"] ) else: - if ( - isinstance(config.get("tracing"), dict) - and "ignore_endpoints" in config["tracing"] - ): + if "INSTANA_IGNORE_ENDPOINTS" in os.environ: self.ignore_endpoints = parse_ignored_endpoints( - config["tracing"]["ignore_endpoints"], + os.environ["INSTANA_IGNORE_ENDPOINTS"] ) + else: + if ( + isinstance(config.get("tracing"), dict) + and "ignore_endpoints" in config["tracing"] + ): + self.ignore_endpoints = parse_ignored_endpoints( + config["tracing"]["ignore_endpoints"], + ) if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == "1": self.allow_exit_as_root = True diff --git a/src/instana/util/config.py b/src/instana/util/config.py index c8f6d1f9..ed91e988 100644 --- a/src/instana/util/config.py +++ b/src/instana/util/config.py @@ -1,4 +1,8 @@ +import itertools from typing import Any, Dict, List, Union + +import yaml + from instana.log import logger @@ -7,19 +11,19 @@ def parse_service_pair(pair: str) -> List[str]: Parses a pair string to prepare a list of ignored endpoints. @param pair: String format: - - "service1:endpoint1,endpoint2" or "service1:endpoint1" or "service1" - @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + - "service1:method1,method2" or "service1:method1" or "service1" + @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] """ pair_list = [] if ":" in pair: - service, endpoints = pair.split(":", 1) + service, methods = pair.split(":", 1) service = service.strip() - endpoint_list = [ep.strip() for ep in endpoints.split(",") if ep.strip()] + method_list = [ep.strip() for ep in methods.split(",") if ep.strip()] - for endpoint in endpoint_list: - pair_list.append(f"{service}.{endpoint}") + for method in method_list: + pair_list.append(f"{service}.{method}") else: - pair_list.append(pair) + pair_list.append(f"{pair}.*") return pair_list @@ -28,8 +32,8 @@ def parse_ignored_endpoints_string(params: str) -> List[str]: Parses a string to prepare a list of ignored endpoints. @param params: String format: - - "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2" - @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + - "service1:method1,method2;service2:method3" or "service1;service2" + @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] """ ignore_endpoints = [] if params: @@ -46,18 +50,45 @@ def parse_ignored_endpoints_dict(params: Dict[str, Any]) -> List[str]: Parses a dictionary to prepare a list of ignored endpoints. @param params: Dict format: - - {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]} - @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + - {"service1": ["method1", "method2"], "service2": ["method3"]} + @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] """ ignore_endpoints = [] - for service, endpoints in params.items(): - if not endpoints: # filtering all service - ignore_endpoints.append(service.lower()) + for service, methods in params.items(): + if not methods: # filtering all service + ignore_endpoints.append(f"{service.lower()}.*") else: # filtering specific endpoints - for endpoint in endpoints: - ignore_endpoints.append(f"{service.lower()}.{endpoint.lower()}") + ignore_endpoints = parse_endpoints_of_service( + ignore_endpoints, service, methods + ) + + return ignore_endpoints + + +def parse_endpoints_of_service( + ignore_endpoints: List[str], + service: str, + methods: Union[str, List[str]], +) -> List[str]: + """ + Parses endpoints of each service. + @param ignore_endpoints: A list of rules for endpoints to be filtered. + @param service: The name of the service to be filtered. + @param methods: A list of specific endpoints of the service to be filtered. + """ + if service == "kafka" and isinstance(methods, list): + for rule in methods: + for method, endpoint in itertools.product( + rule["methods"], rule["endpoints"] + ): + ignore_endpoints.append( + f"{service.lower()}.{method.lower()}.{endpoint.lower()}" + ) + else: + for method in methods: + ignore_endpoints.append(f"{service.lower()}.{method.lower()}") return ignore_endpoints @@ -66,9 +97,9 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]: Parses input to prepare a list for ignored endpoints. @param params: Can be either: - - String: "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2" - - Dict: {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]} - @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + - String: "service1:method1,method2;service2:method3" or "service1;service2" + - Dict: {"service1": ["method1", "method2"], "service2": ["method3"]} + @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] """ try: if isinstance(params, str): @@ -80,3 +111,29 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]: except Exception as e: logger.debug("Error parsing ignored endpoints: %s", str(e)) return [] + + +def parse_ignored_endpoints_from_yaml(configuration: str) -> List[str]: + """ + Parses configuration yaml file and prepares a list of ignored endpoints. + + @param configuration: Path of the file as a string + @param is_yaml: True if the given configuration is yaml string. False if it's the path of the file. + @return: List of strings in format ["service1.method1", "service1.method2", "service2.*", "kafka.method.topic", "kafka.*.topic", "kafka.method.*"] + """ + ignored_endpoints = [] + with open(configuration, "r") as configuration_file: + yaml_configuration = yaml.safe_load(configuration_file) + configuration_key = ( + "tracing" if "tracing" in yaml_configuration else "com.instana.tracing" + ) + if ( + configuration_key in yaml_configuration + and "ignore-endpoints" in yaml_configuration[configuration_key] + ): + ignored_endpoints = parse_ignored_endpoints( + yaml_configuration[configuration_key]["ignore-endpoints"] + ) + if configuration_key == "com.instana.tracing": + logger.debug('Please use "tracing" instead of "com.instana.tracing"') + return ignored_endpoints diff --git a/src/instana/util/span_utils.py b/src/instana/util/span_utils.py index 34049759..45dbc0d1 100644 --- a/src/instana/util/span_utils.py +++ b/src/instana/util/span_utils.py @@ -1,13 +1,17 @@ # (c) Copyright IBM Corp. 2025 -from typing import Optional +from typing import Tuple -def get_operation_specifier(span_name: str) -> Optional[str]: +def get_operation_specifier(span_name: str) -> Tuple[str, str]: """Get the specific operation specifier for the given span.""" - operation_specifier = "" + operation_specifier_key = "" + service_specifier_key = "" if span_name == "redis": - operation_specifier = "command" + operation_specifier_key = "command" elif span_name == "dynamodb": - operation_specifier = "op" - return operation_specifier + operation_specifier_key = "op" + elif span_name == "kafka": + operation_specifier_key = "access" + service_specifier_key = "service" + return operation_specifier_key, service_specifier_key diff --git a/tests/agent/test_host.py b/tests/agent/test_host.py index 4ec2647d..93b89c0a 100644 --- a/tests/agent/test_host.py +++ b/tests/agent/test_host.py @@ -692,31 +692,21 @@ def test_diagnostics(self, caplog: pytest.LogCaptureFixture) -> None: assert "should_send_snapshot_data: True" in caplog.messages def test_is_service_or_endpoint_ignored(self) -> None: - self.agent.options.ignore_endpoints.append("service1") - self.agent.options.ignore_endpoints.append("service2.endpoint1") + self.agent.options.ignore_endpoints.append("service1.*") + self.agent.options.ignore_endpoints.append("service2.method1") # ignore all endpoints of service1 - assert self.agent._HostAgent__is_service_or_endpoint_ignored("service1") - assert self.agent._HostAgent__is_service_or_endpoint_ignored( - "service1", "endpoint1" - ) - assert self.agent._HostAgent__is_service_or_endpoint_ignored( - "service1", "endpoint2" - ) + assert self.agent._HostAgent__is_endpoint_ignored("service1") + assert self.agent._HostAgent__is_endpoint_ignored("service1", "method1") + assert self.agent._HostAgent__is_endpoint_ignored("service1", "method2") # case-insensitive - assert self.agent._HostAgent__is_service_or_endpoint_ignored("SERVICE1") - assert self.agent._HostAgent__is_service_or_endpoint_ignored( - "service1", "ENDPOINT1" - ) + assert self.agent._HostAgent__is_endpoint_ignored("SERVICE1") + assert self.agent._HostAgent__is_endpoint_ignored("service1", "METHOD1") # ignore only endpoint1 of service2 - assert self.agent._HostAgent__is_service_or_endpoint_ignored( - "service2", "endpoint1" - ) - assert not self.agent._HostAgent__is_service_or_endpoint_ignored( - "service2", "endpoint2" - ) + assert self.agent._HostAgent__is_endpoint_ignored("service2", "method1") + assert not self.agent._HostAgent__is_endpoint_ignored("service2", "method2") # don't ignore other services - assert not self.agent._HostAgent__is_service_or_endpoint_ignored("service3") + assert not self.agent._HostAgent__is_endpoint_ignored("service3") diff --git a/tests/clients/boto3/test_boto3_dynamodb.py b/tests/clients/boto3/test_boto3_dynamodb.py index bb427e64..55f09df6 100644 --- a/tests/clients/boto3/test_boto3_dynamodb.py +++ b/tests/clients/boto3/test_boto3_dynamodb.py @@ -94,7 +94,7 @@ def test_ignore_dynamodb(self) -> None: assert dynamodb_span not in filtered_spans def test_ignore_create_table(self) -> None: - os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb.createtable" + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb:createtable" agent.options = StandardOptions() with tracer.start_as_current_span("test"): diff --git a/tests/clients/kafka/test_kafka_python.py b/tests/clients/kafka/test_kafka_python.py index 3a0ecfde..e2e02e03 100644 --- a/tests/clients/kafka/test_kafka_python.py +++ b/tests/clients/kafka/test_kafka_python.py @@ -1,5 +1,6 @@ # (c) Copyright IBM Corp. 2025 +import os from typing import Generator import pytest @@ -8,7 +9,9 @@ from kafka.errors import TopicAlreadyExistsError from opentelemetry.trace import SpanKind +from instana.options import StandardOptions from instana.singletons import agent, tracer +from instana.util.config import parse_ignored_endpoints_from_yaml from tests.helpers import testenv @@ -202,3 +205,124 @@ def test_trace_kafka_python_error(self) -> None: assert kafka_span.data["kafka"]["service"] == "inexistent_kafka_topic" assert kafka_span.data["kafka"]["access"] == "consume" assert kafka_span.data["kafka"]["error"] == "StopIteration()" + + def consume_from_topic(self, topic_name: str) -> None: + consumer = KafkaConsumer( + topic_name, + bootstrap_servers=testenv["kafka_bootstrap_servers"], + auto_offset_reset="earliest", + enable_auto_commit=False, + consumer_timeout_ms=1000, + ) + with tracer.start_as_current_span("test"): + for msg in consumer: + if msg is None: + break + + consumer.close() + + def test_ignore_kafka(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka" + + agent.options = StandardOptions() + + with tracer.start_as_current_span("test"): + self.producer.send(testenv["kafka_topic"], b"raw_bytes") + self.producer.flush() + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + def test_ignore_kafka_producer(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:send" + + agent.options = StandardOptions() + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.send(testenv["kafka_topic"], b"raw_bytes1") + self.producer.send(testenv["kafka_topic"], b"raw_bytes2") + self.producer.flush() + + # Consume the events + consumer = KafkaConsumer( + testenv["kafka_topic"], + bootstrap_servers=testenv["kafka_bootstrap_servers"], + auto_offset_reset="earliest", + enable_auto_commit=False, + consumer_timeout_ms=1000, + ) + for msg in consumer: + if msg is None: + break + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + @pytest.mark.flaky(reruns=3) + def test_ignore_kafka_consumer(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:consume" + agent.options = StandardOptions() + + # Produce some events + self.producer.send(testenv["kafka_topic"], b"raw_bytes1") + self.producer.send(testenv["kafka_topic"], b"raw_bytes2") + self.producer.flush() + + # Consume the events + self.consume_from_topic(testenv["kafka_topic"]) + + spans = self.recorder.queued_spans() + assert len(spans) == 4 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + @pytest.mark.flaky(reruns=5) + def test_ignore_specific_topic(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:consume" + os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"] = ( + "tests/util/test_configuration-1.yaml" + ) + + agent.options = StandardOptions() + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.send(testenv["kafka_topic"], b"raw_bytes1") + self.producer.flush() + + # Consume the events + self.consume_from_topic(testenv["kafka_topic"]) + + spans = self.recorder.queued_spans() + assert len(spans) == 6 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 3 + + def test_ignore_specific_topic_with_config_file(self) -> None: + agent.options.ignore_endpoints = parse_ignored_endpoints_from_yaml( + "tests/util/test_configuration-1.yaml" + ) + + # Produce some events + self.producer.send(testenv["kafka_topic"], b"raw_bytes1") + self.producer.flush() + + # Consume the events + self.consume_from_topic(testenv["kafka_topic"]) + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 diff --git a/tests/test_options.py b/tests/test_options.py index 747f348f..c126b5c5 100644 --- a/tests/test_options.py +++ b/tests/test_options.py @@ -58,18 +58,18 @@ def test_base_options(self) -> None: assert not test_base_options.secrets def test_base_options_with_config(self) -> None: - config["tracing"]["ignore_endpoints"] = "service1;service3:endpoint1,endpoint2" + config["tracing"]["ignore_endpoints"] = "service1;service3:method1,method2" test_base_options = BaseOptions() assert test_base_options.ignore_endpoints == [ - "service1", - "service3.endpoint1", - "service3.endpoint2", + "service1.*", + "service3.method1", + "service3.method2", ] def test_base_options_with_env_vars(self) -> None: os.environ["INSTANA_DEBUG"] = "true" os.environ["INSTANA_EXTRA_HTTP_HEADERS"] = "SOMETHING;HERE" - os.environ["INSTANA_IGNORE_ENDPOINTS"] = "service1;service2:endpoint1,endpoint2" + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "service1;service2:method1,method2" os.environ["INSTANA_SECRETS"] = "secret1:username,password" test_base_options = BaseOptions() @@ -79,9 +79,9 @@ def test_base_options_with_env_vars(self) -> None: assert test_base_options.extra_http_headers == ["something", "here"] assert test_base_options.ignore_endpoints == [ - "service1", - "service2.endpoint1", - "service2.endpoint2", + "service1.*", + "service2.method1", + "service2.method2", ] assert test_base_options.secrets_matcher == "secret1" @@ -120,13 +120,13 @@ def test_set_extra_headers(self) -> None: def test_set_tracing(self) -> None: test_standard_options = StandardOptions() - test_tracing = {"ignore-endpoints": "service1;service2:endpoint1,endpoint2"} + test_tracing = {"ignore-endpoints": "service1;service2:method1,method2"} test_standard_options.set_tracing(test_tracing) assert test_standard_options.ignore_endpoints == [ - "service1", - "service2.endpoint1", - "service2.endpoint2", + "service1.*", + "service2.method1", + "service2.method2", ] assert not test_standard_options.extra_http_headers @@ -134,20 +134,20 @@ def test_set_tracing_priority(self) -> None: # Environment variables > In-code Configuration > Agent Configuration # First test when all attributes given os.environ["INSTANA_IGNORE_ENDPOINTS"] = ( - "env_service1;env_service2:endpoint1,endpoint2" + "env_service1;env_service2:method1,method2" ) config["tracing"]["ignore_endpoints"] = ( - "config_service1;config_service2:endpoint1,endpoint2" + "config_service1;config_service2:method1,method2" ) - test_tracing = {"ignore-endpoints": "service1;service2:endpoint1,endpoint2"} + test_tracing = {"ignore-endpoints": "service1;service2:method1,method2"} test_standard_options = StandardOptions() test_standard_options.set_tracing(test_tracing) assert test_standard_options.ignore_endpoints == [ - "env_service1", - "env_service2.endpoint1", - "env_service2.endpoint2", + "env_service1.*", + "env_service2.method1", + "env_service2.method2", ] # Second test when In-code configuration and Agent configuration given @@ -158,16 +158,16 @@ def test_set_tracing_priority(self) -> None: test_standard_options.set_tracing(test_tracing) assert test_standard_options.ignore_endpoints == [ - "config_service1", - "config_service2.endpoint1", - "config_service2.endpoint2", + "config_service1.*", + "config_service2.method1", + "config_service2.method2", ] def test_set_from(self) -> None: test_standard_options = StandardOptions() test_res_data = { "secrets": {"matcher": "sample-match", "list": ["sample", "list"]}, - "tracing": {"ignore-endpoints": "service1;service2:endpoint1,endpoint2"}, + "tracing": {"ignore-endpoints": "service1;service2:method1,method2"}, } test_standard_options.set_from(test_res_data) @@ -176,9 +176,9 @@ def test_set_from(self) -> None: ) assert test_standard_options.secrets_list == test_res_data["secrets"]["list"] assert test_standard_options.ignore_endpoints == [ - "service1", - "service2.endpoint1", - "service2.endpoint2", + "service1.*", + "service2.method1", + "service2.method2", ] test_res_data = { diff --git a/tests/util/test_config.py b/tests/util/test_config.py index 891007e8..90ebe410 100644 --- a/tests/util/test_config.py +++ b/tests/util/test_config.py @@ -1,10 +1,13 @@ +import logging from typing import Generator import pytest from instana.util.config import ( + parse_endpoints_of_service, parse_ignored_endpoints, parse_ignored_endpoints_dict, + parse_ignored_endpoints_from_yaml, parse_service_pair, ) @@ -15,28 +18,28 @@ def _resource(self) -> Generator[None, None, None]: yield def test_parse_service_pair(self) -> None: - test_string = "service1:endpoint1,endpoint2" + test_string = "service1:method1,method2" response = parse_service_pair(test_string) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_string = "service1;service2" response = parse_ignored_endpoints(test_string) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_string = "service1" response = parse_ignored_endpoints(test_string) - assert response == ["service1"] + assert response == ["service1.*"] test_string = ";" response = parse_ignored_endpoints(test_string) assert response == [] - test_string = "service1:endpoint1,endpoint2;;;service2:endpoint1;;" + test_string = "service1:method1,method2;;;service2:method1;;" response = parse_ignored_endpoints(test_string) assert response == [ - "service1.endpoint1", - "service1.endpoint2", - "service2.endpoint1", + "service1.method1", + "service1.method2", + "service2.method1", ] test_string = "" @@ -44,28 +47,28 @@ def test_parse_service_pair(self) -> None: assert response == [] def test_parse_ignored_endpoints_string(self) -> None: - test_string = "service1:endpoint1,endpoint2" + test_string = "service1:method1,method2" response = parse_service_pair(test_string) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_string = "service1;service2" response = parse_ignored_endpoints(test_string) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_string = "service1" response = parse_ignored_endpoints(test_string) - assert response == ["service1"] + assert response == ["service1.*"] test_string = ";" response = parse_ignored_endpoints(test_string) assert response == [] - test_string = "service1:endpoint1,endpoint2;;;service2:endpoint1;;" + test_string = "service1:method1,method2;;;service2:method1;;" response = parse_ignored_endpoints(test_string) assert response == [ - "service1.endpoint1", - "service1.endpoint2", - "service2.endpoint1", + "service1.method1", + "service1.method2", + "service2.method1", ] test_string = "" @@ -73,67 +76,139 @@ def test_parse_ignored_endpoints_string(self) -> None: assert response == [] def test_parse_ignored_endpoints_dict(self) -> None: - test_dict = {"service1": ["endpoint1", "endpoint2"]} + test_dict = {"service1": ["method1", "method2"]} response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] - test_dict = {"SERVICE1": ["ENDPOINT1", "ENDPOINT2"]} + test_dict = {"SERVICE1": ["method1", "method2"]} response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_dict = {"service1": [], "service2": []} response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_dict = {"service1": []} response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1"] + assert response == ["service1.*"] test_dict = {} response = parse_ignored_endpoints_dict(test_dict) assert response == [] def test_parse_ignored_endpoints(self) -> None: - test_pair = "service1:endpoint1,endpoint2" + test_pair = "service1:method1,method2" response = parse_ignored_endpoints(test_pair) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_pair = "service1;service2" response = parse_ignored_endpoints(test_pair) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_pair = "service1" response = parse_ignored_endpoints(test_pair) - assert response == ["service1"] + assert response == ["service1.*"] test_pair = ";" response = parse_ignored_endpoints(test_pair) assert response == [] - test_pair = "service1:endpoint1,endpoint2;;;service2:endpoint1;;" + test_pair = "service1:method1,method2;;;service2:method1;;" response = parse_ignored_endpoints(test_pair) assert response == [ - "service1.endpoint1", - "service1.endpoint2", - "service2.endpoint1", + "service1.method1", + "service1.method2", + "service2.method1", ] test_pair = "" response = parse_ignored_endpoints(test_pair) assert response == [] - test_dict = {"service1": ["endpoint1", "endpoint2"]} + test_dict = {"service1": ["method1", "method2"]} response = parse_ignored_endpoints(test_dict) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_dict = {"service1": [], "service2": []} response = parse_ignored_endpoints(test_dict) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_dict = {"service1": []} response = parse_ignored_endpoints(test_dict) - assert response == ["service1"] + assert response == ["service1.*"] test_dict = {} response = parse_ignored_endpoints(test_dict) assert response == [] + + def test_parse_endpoints_of_service(self) -> None: + test_ignore_endpoints = { + "service1": ["method1", "method2"], + "service2": ["method3", "method4"], + "kafka": [ + { + "methods": ["method5", "method6"], + "endpoints": ["endpoint1", "endpoint2"], + } + ], + } + ignore_endpoints = [] + for service, methods in test_ignore_endpoints.items(): + ignore_endpoints.extend(parse_endpoints_of_service([], service, methods)) + assert ignore_endpoints == [ + "service1.method1", + "service1.method2", + "service2.method3", + "service2.method4", + "kafka.method5.endpoint1", + "kafka.method5.endpoint2", + "kafka.method6.endpoint1", + "kafka.method6.endpoint2", + ] + + def test_parse_ignored_endpoints_from_yaml( + self, + caplog: pytest.LogCaptureFixture, + ) -> None: + caplog.set_level(logging.DEBUG, logger="instana") + + # test with tracing + ignored_endpoints = parse_ignored_endpoints_from_yaml( + "tests/util/test_configuration-1.yaml" + ) + assert ignored_endpoints == [ + "redis.get", + "redis.type", + "dynamodb.query", + "kafka.consume.span-topic", + "kafka.consume.topic1", + "kafka.consume.topic2", + "kafka.send.span-topic", + "kafka.send.topic1", + "kafka.send.topic2", + "kafka.consume.topic3", + "kafka.*.span-topic", + "kafka.*.topic4", + ] + + # test with com.instana.tracing + ignored_endpoints = parse_ignored_endpoints_from_yaml( + "tests/util/test_configuration-2.yaml" + ) + assert ignored_endpoints == [ + "redis.get", + "redis.type", + "dynamodb.query", + "kafka.consume.span-topic", + "kafka.consume.topic1", + "kafka.consume.topic2", + "kafka.send.span-topic", + "kafka.send.topic1", + "kafka.send.topic2", + "kafka.consume.topic3", + "kafka.*.span-topic", + "kafka.*.topic4", + ] + assert ( + 'Please use "tracing" instead of "com.instana.tracing"' in caplog.messages + ) diff --git a/tests/util/test_configuration-1.yaml b/tests/util/test_configuration-1.yaml new file mode 100644 index 00000000..2c1157ba --- /dev/null +++ b/tests/util/test_configuration-1.yaml @@ -0,0 +1,17 @@ +# service-level configuration, aligning with in-code settings +tracing: + ignore-endpoints: + redis: + - get + - type + dynamodb: + - query + kafka: + - methods: ["consume", "send"] + endpoints: ["span-topic", "topic1", "topic2"] + - methods: ["consume"] + endpoints: ["topic3"] + - methods: ["*"] # Applied to all methods + endpoints: ["span-topic", "topic4"] + # - methods: ["consume", "send"] + # endpoints: ["*"] # Applied to all topics \ No newline at end of file diff --git a/tests/util/test_configuration-2.yaml b/tests/util/test_configuration-2.yaml new file mode 100644 index 00000000..7a319fab --- /dev/null +++ b/tests/util/test_configuration-2.yaml @@ -0,0 +1,17 @@ +# service-level configuration, aligning with in-code settings +com.instana.tracing: + ignore-endpoints: + redis: + - get + - type + dynamodb: + - query + kafka: + - methods: ["consume", "send"] + endpoints: ["span-topic", "topic1", "topic2"] + - methods: ["consume"] + endpoints: ["topic3"] + - methods: ["*"] # Applied to all methods + endpoints: ["span-topic", "topic4"] + # - methods: ["consume", "send"] + # endpoints: ["*"] # Applied to all topics \ No newline at end of file diff --git a/tests/util/test_span_utils.py b/tests/util/test_span_utils.py index 32f623c6..dca536e4 100644 --- a/tests/util/test_span_utils.py +++ b/tests/util/test_span_utils.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import List, Optional import pytest from instana.util.span_utils import get_operation_specifier @@ -6,10 +6,17 @@ @pytest.mark.parametrize( "span_name, expected_result", - [("something", ""), ("redis", "command"), ("dynamodb", "op")], + [ + ("something", ["", ""]), + ("redis", ["command", ""]), + ("dynamodb", ["op", ""]), + ("kafka", ["access", "service"]), + ], ) def test_get_operation_specifier( - span_name: str, expected_result: Optional[str] + span_name: str, + expected_result: Optional[List[str]], ) -> None: - response_redis = get_operation_specifier(span_name) - assert response_redis == expected_result + operation_specifier, service_specifier = get_operation_specifier(span_name) + assert operation_specifier == expected_result[0] + assert service_specifier == expected_result[1] From 7dd42d8c4676011a19179b4eb994605d3d32c4fa Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Thu, 10 Apr 2025 05:53:13 -0700 Subject: [PATCH 2/2] feat: added span filtering for confluent-kafka module Signed-off-by: Cagri Yonca --- tests/clients/kafka/test_confluent_kafka.py | 127 ++++++++++++++++++++ 1 file changed, 127 insertions(+) diff --git a/tests/clients/kafka/test_confluent_kafka.py b/tests/clients/kafka/test_confluent_kafka.py index 827a8c95..c983775d 100644 --- a/tests/clients/kafka/test_confluent_kafka.py +++ b/tests/clients/kafka/test_confluent_kafka.py @@ -1,5 +1,6 @@ # (c) Copyright IBM Corp. 2025 +import os from typing import Generator import pytest @@ -11,7 +12,9 @@ from confluent_kafka.admin import AdminClient, NewTopic from opentelemetry.trace import SpanKind +from instana.options import StandardOptions from instana.singletons import agent, tracer +from instana.util.config import parse_ignored_endpoints_from_yaml from tests.helpers import testenv @@ -186,3 +189,127 @@ def test_trace_confluent_kafka_error(self) -> None: kafka_span.data["kafka"]["error"] == "num_messages must be between 0 and 1000000 (1M)" ) + + def test_ignore_confluent_kafka(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka" + agent.options = StandardOptions() + + with tracer.start_as_current_span("test"): + self.producer.produce(testenv["kafka_topic"], b"raw_bytes") + self.producer.flush(timeout=10) + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + def test_ignore_confluent_kafka_producer(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:produce" + agent.options = StandardOptions() + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.produce(testenv["kafka_topic"], b"raw_bytes2") + self.producer.flush() + + # Consume the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe([testenv["kafka_topic"]]) + consumer.consume(num_messages=2, timeout=60) + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 4 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 2 + + def test_ignore_confluent_kafka_consumer(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:consume" + agent.options = StandardOptions() + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.produce(testenv["kafka_topic"], b"raw_bytes2") + self.producer.flush() + + # Consume the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe([testenv["kafka_topic"]]) + consumer.consume(num_messages=2, timeout=60) + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 3 + + def test_ignore_confluent_specific_topic(self) -> None: + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:consume" + os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"] = ( + "tests/util/test_configuration-1.yaml" + ) + + agent.options = StandardOptions() + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.flush() + + # Consume the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe([testenv["kafka_topic"]]) + consumer.consume(num_messages=1, timeout=60) + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + def test_ignore_confluent_specific_topic_with_config_file(self) -> None: + agent.options.ignore_endpoints = parse_ignored_endpoints_from_yaml( + "tests/util/test_configuration-1.yaml" + ) + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.flush() + + # Consume the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe([testenv["kafka_topic"]]) + consumer.consume(num_messages=1, timeout=60) + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1