Skip to content

feat: Kafka FUP Phase I #718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ dependencies = [
"opentelemetry-api>=1.27.0",
"opentelemetry-semantic-conventions>=0.48b0",
"typing_extensions>=4.12.2",
"pyyaml>=6.0.2",
Copy link
Contributor

@GSVarsha GSVarsha Apr 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is pyyaml a dependency for the tracer?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@GSVarsha, that's because we need to read YAML configuration files now. This is part of the design.

I don't like the idea of having it as one of the main dependencies of our package. Perhaps we can add it as an extra installation option. The users would then execute something like the following command if they want to use the Kafka filtering:

pip install instana[yaml]

]

[project.entry-points."instana"]
Expand Down
40 changes: 29 additions & 11 deletions src/instana/agent/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,18 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
Filters given span list using ignore-endpoint variable and returns the list of filtered spans.
"""
filtered_spans = []
endpoint = ""
for span in spans:
if (hasattr(span, "n") or hasattr(span, "name")) and hasattr(span, "data"):
service = span.n
operation_specifier = get_operation_specifier(service)
endpoint = span.data[service][operation_specifier]
if isinstance(endpoint, str) and self.__is_service_or_endpoint_ignored(
service, endpoint
operation_specifier_key, service_specifier_key = (
get_operation_specifier(service)
)
if service == "kafka":
endpoint = span.data[service][service_specifier_key]
method = span.data[service][operation_specifier_key]
if isinstance(method, str) and self.__is_endpoint_ignored(
service, method, endpoint
):
continue
else:
Expand All @@ -366,15 +371,28 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
filtered_spans.append(span)
return filtered_spans

def __is_service_or_endpoint_ignored(
self, service: str, endpoint: str = ""
def __is_endpoint_ignored(
self,
service: str,
method: str = "",
endpoint: str = "",
) -> bool:
"""Check if the given service and endpoint combination should be ignored."""

return (
service.lower() in self.options.ignore_endpoints
or f"{service.lower()}.{endpoint.lower()}" in self.options.ignore_endpoints
)
service = service.lower()
method = method.lower()
endpoint = endpoint.lower()
filter_rules = [
f"{service}.{method}", # service.method
f"{service}.*", # service.*
]

if service == "kafka" and endpoint:
filter_rules += [
f"{service}.{method}.{endpoint}", # service.method.endpoint
f"{service}.*.{endpoint}", # service.*.endpoint
f"{service}.{method}.*", # service.method.*
]
return any(rule in self.options.ignore_endpoints for rule in filter_rules)

def handle_agent_tasks(self, task: Dict[str, Any]) -> None:
"""
Expand Down
26 changes: 17 additions & 9 deletions src/instana/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
from typing import Any, Dict

from instana.log import logger
from instana.util.config import parse_ignored_endpoints
from instana.util.config import (
parse_ignored_endpoints,
parse_ignored_endpoints_from_yaml,
)
from instana.util.runtime import determine_service_name
from instana.configurator import config

Expand All @@ -44,18 +47,23 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";")
)

if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
self.ignore_endpoints = parse_ignored_endpoints(
os.environ["INSTANA_IGNORE_ENDPOINTS"]
if "INSTANA_IGNORE_ENDPOINTS_PATH" in os.environ:
self.ignore_endpoints = parse_ignored_endpoints_from_yaml(
os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"]
)
else:
if (
isinstance(config.get("tracing"), dict)
and "ignore_endpoints" in config["tracing"]
):
if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
self.ignore_endpoints = parse_ignored_endpoints(
config["tracing"]["ignore_endpoints"],
os.environ["INSTANA_IGNORE_ENDPOINTS"]
)
else:
if (
isinstance(config.get("tracing"), dict)
and "ignore_endpoints" in config["tracing"]
):
self.ignore_endpoints = parse_ignored_endpoints(
config["tracing"]["ignore_endpoints"],
)

if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == "1":
self.allow_exit_as_root = True
Expand Down
95 changes: 76 additions & 19 deletions src/instana/util/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import itertools
from typing import Any, Dict, List, Union

import yaml

from instana.log import logger


Expand All @@ -7,19 +11,19 @@ def parse_service_pair(pair: str) -> List[str]:
Parses a pair string to prepare a list of ignored endpoints.

@param pair: String format:
- "service1:endpoint1,endpoint2" or "service1:endpoint1" or "service1"
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
- "service1:method1,method2" or "service1:method1" or "service1"
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
"""
pair_list = []
if ":" in pair:
service, endpoints = pair.split(":", 1)
service, methods = pair.split(":", 1)
service = service.strip()
endpoint_list = [ep.strip() for ep in endpoints.split(",") if ep.strip()]
method_list = [ep.strip() for ep in methods.split(",") if ep.strip()]

for endpoint in endpoint_list:
pair_list.append(f"{service}.{endpoint}")
for method in method_list:
pair_list.append(f"{service}.{method}")
else:
pair_list.append(pair)
pair_list.append(f"{pair}.*")
return pair_list


Expand All @@ -28,8 +32,8 @@ def parse_ignored_endpoints_string(params: str) -> List[str]:
Parses a string to prepare a list of ignored endpoints.

@param params: String format:
- "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2"
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
- "service1:method1,method2;service2:method3" or "service1;service2"
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
"""
ignore_endpoints = []
if params:
Expand All @@ -46,18 +50,45 @@ def parse_ignored_endpoints_dict(params: Dict[str, Any]) -> List[str]:
Parses a dictionary to prepare a list of ignored endpoints.

@param params: Dict format:
- {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]}
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
- {"service1": ["method1", "method2"], "service2": ["method3"]}
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
"""
ignore_endpoints = []

for service, endpoints in params.items():
if not endpoints: # filtering all service
ignore_endpoints.append(service.lower())
for service, methods in params.items():
if not methods: # filtering all service
ignore_endpoints.append(f"{service.lower()}.*")
else: # filtering specific endpoints
for endpoint in endpoints:
ignore_endpoints.append(f"{service.lower()}.{endpoint.lower()}")
ignore_endpoints = parse_endpoints_of_service(
ignore_endpoints, service, methods
)

return ignore_endpoints


def parse_endpoints_of_service(
    ignore_endpoints: List[str],
    service: str,
    methods: Union[str, List[str]],
) -> List[str]:
    """
    Appends the ignore rules of a single service to the given list of rules.

    @param ignore_endpoints: List of rules collected so far. It is extended in place.
    @param service: The name of the service to be filtered (case-insensitive).
    @param methods: Can be either:
        - String: a single method name, e.g. "get"
        - List of strings: ["method1", "method2"]
        - List of dicts (Kafka only): [{"methods": ["consume"], "endpoints": ["topic1"]}]
    @return: The same list object that was passed in, extended with
        "service.method" rules and, for Kafka dict rules, one
        "kafka.method.endpoint" rule per (method, endpoint) combination.
    """
    service_name = service.lower()

    # A bare string is one method name; iterating over it directly would
    # produce one bogus rule per character.
    rules = [methods] if isinstance(methods, str) else methods

    for rule in rules:
        if service_name == "kafka" and isinstance(rule, dict):
            # Kafka rules pair methods with topics: emit every combination.
            ignore_endpoints.extend(
                f"{service_name}.{method.lower()}.{endpoint.lower()}"
                for method, endpoint in itertools.product(
                    rule["methods"], rule["endpoints"]
                )
            )
        else:
            # Plain method name; this also covers Kafka rules given as strings.
            ignore_endpoints.append(f"{service_name}.{rule.lower()}")
    return ignore_endpoints


Expand All @@ -66,9 +97,9 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]:
Parses input to prepare a list for ignored endpoints.

@param params: Can be either:
- String: "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2"
- Dict: {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]}
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
- String: "service1:method1,method2;service2:method3" or "service1;service2"
- Dict: {"service1": ["method1", "method2"], "service2": ["method3"]}
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
"""
try:
if isinstance(params, str):
Expand All @@ -80,3 +111,29 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]:
except Exception as e:
logger.debug("Error parsing ignored endpoints: %s", str(e))
return []


def parse_ignored_endpoints_from_yaml(configuration: str) -> List[str]:
    """
    Parses configuration yaml file and prepares a list of ignored endpoints.

    The rules are read from the "ignore-endpoints" entry of the "tracing"
    section; the legacy "com.instana.tracing" section is used when "tracing"
    is absent.

    @param configuration: Path of the file as a string
    @return: List of strings in format ["service1.method1", "service1.method2", "service2.*", "kafka.method.topic", "kafka.*.topic", "kafka.method.*"].
        An empty list is returned when the file cannot be read or parsed, or
        when it holds no "ignore-endpoints" entry.
    """
    try:
        # Binary mode lets the YAML reader pick the encoding itself instead of
        # depending on the locale's default text encoding.
        with open(configuration, "rb") as configuration_file:
            yaml_configuration = yaml.safe_load(configuration_file)
    except (OSError, yaml.YAMLError) as e:
        # Same best-effort policy as parse_ignored_endpoints: log, don't crash.
        logger.debug(
            "Error reading ignored endpoints from %s: %s", configuration, str(e)
        )
        return []

    # An empty file loads as None, and a scalar or list document has no sections.
    if not isinstance(yaml_configuration, dict):
        return []

    configuration_key = (
        "tracing" if "tracing" in yaml_configuration else "com.instana.tracing"
    )
    tracing_section = yaml_configuration.get(configuration_key)
    # An empty section (e.g. "tracing:" with nothing under it) loads as None.
    if (
        not isinstance(tracing_section, dict)
        or "ignore-endpoints" not in tracing_section
    ):
        return []

    if configuration_key == "com.instana.tracing":
        logger.debug('Please use "tracing" instead of "com.instana.tracing"')
    return parse_ignored_endpoints(tracing_section["ignore-endpoints"])
16 changes: 10 additions & 6 deletions src/instana/util/span_utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
# (c) Copyright IBM Corp. 2025

from typing import Optional
from typing import Tuple


def get_operation_specifier(span_name: str) -> Optional[str]:
def get_operation_specifier(span_name: str) -> Tuple[str, str]:
Copy link
Contributor

@GSVarsha GSVarsha Apr 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change this method's name since now it returns both operation_specifier_key and service_specifier_key?

"""Get the specific operation specifier for the given span."""
operation_specifier = ""
operation_specifier_key = ""
service_specifier_key = ""
if span_name == "redis":
operation_specifier = "command"
operation_specifier_key = "command"
elif span_name == "dynamodb":
operation_specifier = "op"
return operation_specifier
operation_specifier_key = "op"
elif span_name == "kafka":
operation_specifier_key = "access"
service_specifier_key = "service"
return operation_specifier_key, service_specifier_key
30 changes: 10 additions & 20 deletions tests/agent/test_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,31 +692,21 @@ def test_diagnostics(self, caplog: pytest.LogCaptureFixture) -> None:
assert "should_send_snapshot_data: True" in caplog.messages

def test_is_service_or_endpoint_ignored(self) -> None:
self.agent.options.ignore_endpoints.append("service1")
self.agent.options.ignore_endpoints.append("service2.endpoint1")
self.agent.options.ignore_endpoints.append("service1.*")
self.agent.options.ignore_endpoints.append("service2.method1")

# ignore all endpoints of service1
assert self.agent._HostAgent__is_service_or_endpoint_ignored("service1")
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
"service1", "endpoint1"
)
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
"service1", "endpoint2"
)
assert self.agent._HostAgent__is_endpoint_ignored("service1")
assert self.agent._HostAgent__is_endpoint_ignored("service1", "method1")
assert self.agent._HostAgent__is_endpoint_ignored("service1", "method2")

# case-insensitive
assert self.agent._HostAgent__is_service_or_endpoint_ignored("SERVICE1")
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
"service1", "ENDPOINT1"
)
assert self.agent._HostAgent__is_endpoint_ignored("SERVICE1")
assert self.agent._HostAgent__is_endpoint_ignored("service1", "METHOD1")

# ignore only endpoint1 of service2
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
"service2", "endpoint1"
)
assert not self.agent._HostAgent__is_service_or_endpoint_ignored(
"service2", "endpoint2"
)
assert self.agent._HostAgent__is_endpoint_ignored("service2", "method1")
assert not self.agent._HostAgent__is_endpoint_ignored("service2", "method2")

# don't ignore other services
assert not self.agent._HostAgent__is_service_or_endpoint_ignored("service3")
assert not self.agent._HostAgent__is_endpoint_ignored("service3")
2 changes: 1 addition & 1 deletion tests/clients/boto3/test_boto3_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_ignore_dynamodb(self) -> None:
assert dynamodb_span not in filtered_spans

def test_ignore_create_table(self) -> None:
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb.createtable"
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb:createtable"
agent.options = StandardOptions()

with tracer.start_as_current_span("test"):
Expand Down
Loading
Loading