From 6d73154768882d47c95a12eb919879813453e4d4 Mon Sep 17 00:00:00 2001 From: zzhlogin Date: Fri, 12 Jan 2024 09:55:03 -0800 Subject: [PATCH] Add AwsAttributePropagatingSpanProcessor. (#14) In this commit, we are implementing AwsAttributePropagatingSpanProcessor and AwsAttributePropagatingSpanProcessorBuilder. As much as possible, we are attempting to mirror the implementation of these class found in https://github.com/aws-observability/aws-otel-java-instrumentation By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --- .../distro/_aws_attribute_keys.py | 2 +- .../distro/_aws_span_processing_util.py | 6 +- .../attribute_propagating_span_processor.py | 97 +++++++++++++++++++ ...bute_propagating_span_processor_builder.py | 50 ++++++++++ ...st_attribute_propagating_span_processor.py | 11 +++ ...bute_propagating_span_processor_builder.py | 13 +++ 6 files changed, 175 insertions(+), 4 deletions(-) create mode 100644 aws-opentelemetry-distro/src/amazon/opentelemetry/distro/attribute_propagating_span_processor.py create mode 100644 aws-opentelemetry-distro/src/amazon/opentelemetry/distro/attribute_propagating_span_processor_builder.py create mode 100644 aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_attribute_propagating_span_processor.py create mode 100644 aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_attribute_propagating_span_processor_builder.py diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py index b0ada931d..0d075f6d1 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 -class AwsAttributeKeys: +class _AwsAttributeKeys: """Utility class holding attribute keys with special meaning to AWS components""" AWS_SPAN_KIND: str = "aws.span.kind" diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_span_processing_util.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_span_processing_util.py index 00be704c5..139890b96 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_span_processing_util.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_span_processing_util.py @@ -1,7 +1,7 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 """Utility module designed to support shared logic across AWS Span Processors.""" -from amazon.opentelemetry.distro._aws_attribute_keys import AwsAttributeKeys +from amazon.opentelemetry.distro._aws_attribute_keys import _AwsAttributeKeys from opentelemetry.sdk.trace import InstrumentationScope, ReadableSpan from opentelemetry.semconv.trace import MessagingOperationValues, SpanAttributes from opentelemetry.trace import SpanKind @@ -35,7 +35,7 @@ def get_ingress_operation(__, span: ReadableSpan) -> str: def get_egress_operation(span: ReadableSpan) -> str: if should_use_internal_operation(span): return INTERNAL_OPERATION - return span.attributes.get(AwsAttributeKeys.AWS_LOCAL_OPERATION) + return span.attributes.get(_AwsAttributeKeys.AWS_LOCAL_OPERATION) def extract_api_path_value(http_target: str) -> str: @@ -117,7 +117,7 @@ def _is_dependency_consumer_span(span: ReadableSpan) -> bool: if is_consumer_process_span(span): if is_local_root(span): return True - parent_span_kind: str = span.attributes.get(AwsAttributeKeys.AWS_CONSUMER_PARENT_SPAN_KIND) + parent_span_kind: str = span.attributes.get(_AwsAttributeKeys.AWS_CONSUMER_PARENT_SPAN_KIND) return SpanKind.CONSUMER != parent_span_kind return True diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/attribute_propagating_span_processor.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/attribute_propagating_span_processor.py new file mode 100644 index 000000000..21b5d43cf --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/attribute_propagating_span_processor.py @@ -0,0 +1,97 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +from typing import Callable, Optional, Tuple + +from typing_extensions import override + +from amazon.opentelemetry.distro._aws_attribute_keys import _AwsAttributeKeys +from amazon.opentelemetry.distro._aws_span_processing_util import is_aws_sdk_span, is_local_root +from opentelemetry.context import Context +from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor +from opentelemetry.trace import SpanKind +from opentelemetry.trace.propagation import get_current_span + + +class AttributePropagatingSpanProcessor(SpanProcessor): + """AwsAttributePropagatingSpanProcessor is SpanProcessor that propagates attributes from parent to child spans + + AwsAttributePropagatingSpanProcessor handles the propagation of attributes from parent spans to child spans, + specified in self._attribute_keys_to_propagate. AwsAttributePropagatingSpanProcessor also propagates + configurable data from parent spans to child spans, as a new attribute specified by self._propagation_data_key. + Propagated data can be configured via the self._propagation_data_extractor. + Span data propagation only starts from local root server/consumer spans, + but from there will be propagated to any descendant spans. If the span is a CONSUMER + PROCESS with the parent also a CONSUMER, it will set attribute AWS_CONSUMER_PARENT_SPAN_KIND as CONSUMER + to indicate that dependency metrics should not be generated for this span. + """ + + _propagation_data_extractor: Callable[[ReadableSpan], str] + _propagation_data_key: str + _attribute_keys_to_propagate: Tuple[str, ...] + + def __init__( + self, + propagation_data_extractor: Callable[[ReadableSpan], str], + propagation_data_key: str, + attribute_keys_to_propagate: Tuple[str, ...], + ): + self._propagation_data_extractor = propagation_data_extractor + self._propagation_data_key = propagation_data_key + self._attribute_keys_to_propagate = attribute_keys_to_propagate + + @override + def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: + parent_span: ReadableSpan = get_current_span(parent_context) + if isinstance(parent_span, ReadableSpan): + # Add the AWS_SDK_DESCENDANT attribute to the immediate child spans of AWS SDK span. + # This attribute helps the backend differentiate between SDK spans and their immediate children. + # It's assumed that the HTTP spans are immediate children of the AWS SDK span + # TODO: we should have a contract test to check the immediate children are HTTP span + if is_aws_sdk_span(parent_span): + span.set_attribute(_AwsAttributeKeys.AWS_SDK_DESCENDANT, "true") + + if SpanKind.INTERNAL == parent_span.kind: + for key_to_propagate in self._attribute_keys_to_propagate: + value_to_propagate: str = parent_span.attributes.get(key_to_propagate) + if value_to_propagate is not None: + span.set_attribute(key_to_propagate, value_to_propagate) + + # We cannot guarantee that messaging.operation is set onStart, it could be set after the fact. + # To work around this, add the AWS_CONSUMER_PARENT_SPAN_KIND attribute if parent and child are both CONSUMER + # then check later if a metric should be generated. + if _is_consumer_kind(span) and _is_consumer_kind(parent_span): + span.set_attribute(_AwsAttributeKeys.AWS_CONSUMER_PARENT_SPAN_KIND, parent_span.kind.name) + + propagation_data: str = None + if is_local_root(span): + if not _is_server_kind(span): + propagation_data = self._propagation_data_extractor(span) + elif _is_server_kind(parent_span): + propagation_data = self._propagation_data_extractor(parent_span) + else: + propagation_data = parent_span.attributes.get(self._propagation_data_key) + + if propagation_data is not None: + span.set_attribute(self._propagation_data_key, propagation_data) + + # pylint: disable=no-self-use + @override + def on_end(self, span: ReadableSpan) -> None: + return + + @override + def shutdown(self) -> None: + self.force_flush() + + # pylint: disable=no-self-use + @override + def force_flush(self, timeout_millis: int = None) -> bool: + return True + + +def _is_consumer_kind(span: ReadableSpan) -> bool: + return SpanKind.CONSUMER == span.kind + + +def _is_server_kind(span: ReadableSpan) -> bool: + return SpanKind.SERVER == span.kind diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/attribute_propagating_span_processor_builder.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/attribute_propagating_span_processor_builder.py new file mode 100644 index 000000000..64e775529 --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/attribute_propagating_span_processor_builder.py @@ -0,0 +1,50 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +from typing import Callable, List, Tuple + +from amazon.opentelemetry.distro._aws_attribute_keys import _AwsAttributeKeys +from amazon.opentelemetry.distro._aws_span_processing_util import get_ingress_operation +from amazon.opentelemetry.distro.attribute_propagating_span_processor import AttributePropagatingSpanProcessor +from opentelemetry.sdk.trace import ReadableSpan + + +class AttributePropagatingSpanProcessorBuilder: + """ + AttributePropagatingSpanProcessorBuilder is used to construct a AttributePropagatingSpanProcessor. + If set_propagation_data_extractor, set_propagation_data_key or set_attributes_keys_to_propagate are not invoked, + the builder defaults to using specific propagation targets. + """ + + _propagation_data_extractor: str = get_ingress_operation + _propagation_data_key: str = _AwsAttributeKeys.AWS_LOCAL_OPERATION + _attributes_keys_to_propagate: Tuple[str, ...] = ( + _AwsAttributeKeys.AWS_REMOTE_SERVICE, + _AwsAttributeKeys.AWS_REMOTE_OPERATION, + ) + + def set_propagation_data_extractor( + self, propagation_data_extractor: Callable[[ReadableSpan], str] + ) -> "AttributePropagatingSpanProcessorBuilder": + if propagation_data_extractor is None: + raise ValueError("propagation_data_extractor must not be None") + self._propagation_data_extractor = propagation_data_extractor + return self + + def set_propagation_data_key(self, propagation_data_key: str) -> "AttributePropagatingSpanProcessorBuilder": + if propagation_data_key is None: + raise ValueError("propagation_data_key must not be None") + self._propagation_data_key = propagation_data_key + return self + + def set_attributes_keys_to_propagate( + self, attributes_keys_to_propagate: List[str] + ) -> "AttributePropagatingSpanProcessorBuilder": + if attributes_keys_to_propagate is None: + raise ValueError("attributes_keys_to_propagate must not be None") + self._attributes_keys_to_propagate = tuple(attributes_keys_to_propagate) + return self + + def build(self) -> AttributePropagatingSpanProcessor: + return AttributePropagatingSpanProcessor( + self._propagation_data_extractor, self._propagation_data_key, self._attributes_keys_to_propagate + ) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_attribute_propagating_span_processor.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_attribute_propagating_span_processor.py new file mode 100644 index 000000000..a29cee5a4 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_attribute_propagating_span_processor.py @@ -0,0 +1,11 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +from unittest import TestCase + +from amazon.opentelemetry.distro.attribute_propagating_span_processor import AttributePropagatingSpanProcessor + + +class TestAttributePropagatingSpanProcessor(TestCase): + def test_basic(self): + processor: AttributePropagatingSpanProcessor = AttributePropagatingSpanProcessor(None, None, None) + self.assertTrue(processor.force_flush) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_attribute_propagating_span_processor_builder.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_attribute_propagating_span_processor_builder.py new file mode 100644 index 000000000..dbd30969b --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_attribute_propagating_span_processor_builder.py @@ -0,0 +1,13 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +from unittest import TestCase + +from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import ( + AttributePropagatingSpanProcessorBuilder, +) + + +class TestAttributePropagatingSpanProcessorBuilder(TestCase): + def test_basic(self): + builder: AttributePropagatingSpanProcessorBuilder = AttributePropagatingSpanProcessorBuilder() + self.assertIs(builder.set_propagation_data_key("test"), builder)