Skip to content

Commit

Permalink
Add AwsAttributePropagatingSpanProcessor. (#14)
Browse files Browse the repository at this point in the history
In this commit, we are implementing AwsAttributePropagatingSpanProcessor
and AwsAttributePropagatingSpanProcessorBuilder. As much as possible, we
are attempting to mirror the implementation of these class found in
https://github.com/aws-observability/aws-otel-java-instrumentation


By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.
  • Loading branch information
zzhlogin authored Jan 12, 2024
1 parent 73b4f68 commit 6d73154
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0


class AwsAttributeKeys:
class _AwsAttributeKeys:
"""Utility class holding attribute keys with special meaning to AWS components"""

AWS_SPAN_KIND: str = "aws.span.kind"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Utility module designed to support shared logic across AWS Span Processors."""
from amazon.opentelemetry.distro._aws_attribute_keys import AwsAttributeKeys
from amazon.opentelemetry.distro._aws_attribute_keys import _AwsAttributeKeys
from opentelemetry.sdk.trace import InstrumentationScope, ReadableSpan
from opentelemetry.semconv.trace import MessagingOperationValues, SpanAttributes
from opentelemetry.trace import SpanKind
Expand Down Expand Up @@ -35,7 +35,7 @@ def get_ingress_operation(__, span: ReadableSpan) -> str:
def get_egress_operation(span: ReadableSpan) -> str:
if should_use_internal_operation(span):
return INTERNAL_OPERATION
return span.attributes.get(AwsAttributeKeys.AWS_LOCAL_OPERATION)
return span.attributes.get(_AwsAttributeKeys.AWS_LOCAL_OPERATION)


def extract_api_path_value(http_target: str) -> str:
Expand Down Expand Up @@ -117,7 +117,7 @@ def _is_dependency_consumer_span(span: ReadableSpan) -> bool:
if is_consumer_process_span(span):
if is_local_root(span):
return True
parent_span_kind: str = span.attributes.get(AwsAttributeKeys.AWS_CONSUMER_PARENT_SPAN_KIND)
parent_span_kind: str = span.attributes.get(_AwsAttributeKeys.AWS_CONSUMER_PARENT_SPAN_KIND)
return SpanKind.CONSUMER != parent_span_kind

return True
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Callable, Optional, Tuple

from typing_extensions import override

from amazon.opentelemetry.distro._aws_attribute_keys import _AwsAttributeKeys
from amazon.opentelemetry.distro._aws_span_processing_util import is_aws_sdk_span, is_local_root
from opentelemetry.context import Context
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.trace import SpanKind
from opentelemetry.trace.propagation import get_current_span


class AttributePropagatingSpanProcessor(SpanProcessor):
"""AwsAttributePropagatingSpanProcessor is SpanProcessor that propagates attributes from parent to child spans
AwsAttributePropagatingSpanProcessor handles the propagation of attributes from parent spans to child spans,
specified in self._attribute_keys_to_propagate. AwsAttributePropagatingSpanProcessor also propagates
configurable data from parent spans to child spans, as a new attribute specified by self._propagation_data_key.
Propagated data can be configured via the self._propagation_data_extractor.
Span data propagation only starts from local root server/consumer spans,
but from there will be propagated to any descendant spans. If the span is a CONSUMER
PROCESS with the parent also a CONSUMER, it will set attribute AWS_CONSUMER_PARENT_SPAN_KIND as CONSUMER
to indicate that dependency metrics should not be generated for this span.
"""

_propagation_data_extractor: Callable[[ReadableSpan], str]
_propagation_data_key: str
_attribute_keys_to_propagate: Tuple[str, ...]

def __init__(
self,
propagation_data_extractor: Callable[[ReadableSpan], str],
propagation_data_key: str,
attribute_keys_to_propagate: Tuple[str, ...],
):
self._propagation_data_extractor = propagation_data_extractor
self._propagation_data_key = propagation_data_key
self._attribute_keys_to_propagate = attribute_keys_to_propagate

@override
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
parent_span: ReadableSpan = get_current_span(parent_context)
if isinstance(parent_span, ReadableSpan):
# Add the AWS_SDK_DESCENDANT attribute to the immediate child spans of AWS SDK span.
# This attribute helps the backend differentiate between SDK spans and their immediate children.
# It's assumed that the HTTP spans are immediate children of the AWS SDK span
# TODO: we should have a contract test to check the immediate children are HTTP span
if is_aws_sdk_span(parent_span):
span.set_attribute(_AwsAttributeKeys.AWS_SDK_DESCENDANT, "true")

if SpanKind.INTERNAL == parent_span.kind:
for key_to_propagate in self._attribute_keys_to_propagate:
value_to_propagate: str = parent_span.attributes.get(key_to_propagate)
if value_to_propagate is not None:
span.set_attribute(key_to_propagate, value_to_propagate)

# We cannot guarantee that messaging.operation is set onStart, it could be set after the fact.
# To work around this, add the AWS_CONSUMER_PARENT_SPAN_KIND attribute if parent and child are both CONSUMER
# then check later if a metric should be generated.
if _is_consumer_kind(span) and _is_consumer_kind(parent_span):
span.set_attribute(_AwsAttributeKeys.AWS_CONSUMER_PARENT_SPAN_KIND, parent_span.kind.name)

propagation_data: str = None
if is_local_root(span):
if not _is_server_kind(span):
propagation_data = self._propagation_data_extractor(span)
elif _is_server_kind(parent_span):
propagation_data = self._propagation_data_extractor(parent_span)
else:
propagation_data = parent_span.attributes.get(self._propagation_data_key)

if propagation_data is not None:
span.set_attribute(self._propagation_data_key, propagation_data)

# pylint: disable=no-self-use
@override
def on_end(self, span: ReadableSpan) -> None:
return

@override
def shutdown(self) -> None:
self.force_flush()

# pylint: disable=no-self-use
@override
def force_flush(self, timeout_millis: int = None) -> bool:
return True


def _is_consumer_kind(span: ReadableSpan) -> bool:
return SpanKind.CONSUMER == span.kind


def _is_server_kind(span: ReadableSpan) -> bool:
return SpanKind.SERVER == span.kind
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Callable, List, Tuple

from amazon.opentelemetry.distro._aws_attribute_keys import _AwsAttributeKeys
from amazon.opentelemetry.distro._aws_span_processing_util import get_ingress_operation
from amazon.opentelemetry.distro.attribute_propagating_span_processor import AttributePropagatingSpanProcessor
from opentelemetry.sdk.trace import ReadableSpan


class AttributePropagatingSpanProcessorBuilder:
"""
AttributePropagatingSpanProcessorBuilder is used to construct a AttributePropagatingSpanProcessor.
If set_propagation_data_extractor, set_propagation_data_key or set_attributes_keys_to_propagate are not invoked,
the builder defaults to using specific propagation targets.
"""

_propagation_data_extractor: str = get_ingress_operation
_propagation_data_key: str = _AwsAttributeKeys.AWS_LOCAL_OPERATION
_attributes_keys_to_propagate: Tuple[str, ...] = (
_AwsAttributeKeys.AWS_REMOTE_SERVICE,
_AwsAttributeKeys.AWS_REMOTE_OPERATION,
)

def set_propagation_data_extractor(
self, propagation_data_extractor: Callable[[ReadableSpan], str]
) -> "AttributePropagatingSpanProcessorBuilder":
if propagation_data_extractor is None:
raise ValueError("propagation_data_extractor must not be None")
self._propagation_data_extractor = propagation_data_extractor
return self

def set_propagation_data_key(self, propagation_data_key: str) -> "AttributePropagatingSpanProcessorBuilder":
if propagation_data_key is None:
raise ValueError("propagation_data_key must not be None")
self._propagation_data_key = propagation_data_key
return self

def set_attributes_keys_to_propagate(
self, attributes_keys_to_propagate: List[str]
) -> "AttributePropagatingSpanProcessorBuilder":
if attributes_keys_to_propagate is None:
raise ValueError("attributes_keys_to_propagate must not be None")
self._attributes_keys_to_propagate = tuple(attributes_keys_to_propagate)
return self

def build(self) -> AttributePropagatingSpanProcessor:
return AttributePropagatingSpanProcessor(
self._propagation_data_extractor, self._propagation_data_key, self._attributes_keys_to_propagate
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from unittest import TestCase

from amazon.opentelemetry.distro.attribute_propagating_span_processor import AttributePropagatingSpanProcessor


class TestAttributePropagatingSpanProcessor(TestCase):
def test_basic(self):
processor: AttributePropagatingSpanProcessor = AttributePropagatingSpanProcessor(None, None, None)
self.assertTrue(processor.force_flush)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from unittest import TestCase

from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
AttributePropagatingSpanProcessorBuilder,
)


class TestAttributePropagatingSpanProcessorBuilder(TestCase):
def test_basic(self):
builder: AttributePropagatingSpanProcessorBuilder = AttributePropagatingSpanProcessorBuilder()
self.assertIs(builder.set_propagation_data_key("test"), builder)

0 comments on commit 6d73154

Please sign in to comment.