Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AwsAttributePropagatingSpanProcessor. #14

Merged
merged 9 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0


class AwsAttributeKeys:
class _AwsAttributeKeys:
"""Utility class holding attribute keys with special meaning to AWS components"""

AWS_SPAN_KIND: str = "aws.span.kind"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Utility module designed to support shared logic across AWS Span Processors."""
from amazon.opentelemetry.distro._aws_attribute_keys import _AwsAttributeKeys
from opentelemetry.sdk.trace import InstrumentationScope, ReadableSpan
from opentelemetry.semconv.trace import MessagingOperationValues, SpanAttributes
from opentelemetry.trace import SpanKind

# Default attribute values if no valid span attribute value is identified
UNKNOWN_SERVICE: str = "UnknownService"
UNKNOWN_OPERATION: str = "UnknownOperation"
UNKNOWN_REMOTE_SERVICE: str = "UnknownRemoteService"
UNKNOWN_REMOTE_OPERATION: str = "UnknownRemoteOperation"
# Operation name returned by get_ingress_operation/get_egress_operation when should_use_internal_operation holds
INTERNAL_OPERATION: str = "InternalOperation"
LOCAL_ROOT: str = "LOCAL_ROOT"

# Useful constants
# Span name compared case-insensitively in _is_sqs_receive_message_consumer_span
_SQS_RECEIVE_MESSAGE_SPAN_NAME: str = "Sqs.ReceiveMessage"
# Instrumentation scope name prefix checked in _is_sqs_receive_message_consumer_span
_AWS_SDK_INSTRUMENTATION_SCOPE_PREFIX: str = "io.opentelemetry.aws-sdk-"


def get_ingress_operation(span: ReadableSpan) -> str:
    """
    Resolve the ingress operation (i.e. the operation for Server and Consumer spans) of a span.

    Spans for which should_use_internal_operation holds report INTERNAL_OPERATION. Otherwise the span name is used,
    unless it is None, UnknownOperation or equal to the http.method value; in that case the operation is generated
    from "http.method + the first API path part of http.target".
    """
    span_name: str = span.name
    if should_use_internal_operation(span):
        return INTERNAL_OPERATION
    if _is_valid_operation(span, span_name):
        return span_name
    return _generate_ingress_operation(span)


def get_egress_operation(span: ReadableSpan) -> str:
    """
    Resolve the egress operation of a span.

    Returns INTERNAL_OPERATION when should_use_internal_operation(span) holds; otherwise returns whatever the span's
    attributes hold under _AwsAttributeKeys.AWS_LOCAL_OPERATION.
    """
    return (
        INTERNAL_OPERATION
        if should_use_internal_operation(span)
        else span.attributes.get(_AwsAttributeKeys.AWS_LOCAL_OPERATION)
    )


def extract_api_path_value(http_target: str) -> str:
    """Return the first part of an API http target, prefixed with "/".

    Args:
        http_target: http request target string value. Eg, /payment/1234
    Returns:
        The first part from the http target. Eg, /payment. "/" is returned when the target is None, empty, or
        contains no "/" separator.
    """
    if not http_target:
        return "/"
    # Element 0 is whatever precedes the first "/" (empty for a rooted target); element 1 is the first path part.
    segments = http_target.split("/")
    return ("/" + segments[1]) if len(segments) > 1 else "/"


def is_key_present(span: ReadableSpan, key: str) -> bool:
    """Return True when the span's attributes hold a non-None value under key."""
    attribute_value = span.attributes.get(key)
    return attribute_value is not None


def is_aws_sdk_span(span: ReadableSpan) -> bool:
    """Return True when the span's SpanAttributes.RPC_SYSTEM attribute equals "aws-api"."""
    # https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/instrumentation/aws-sdk/#common-attributes
    rpc_system = span.attributes.get(SpanAttributes.RPC_SYSTEM)
    return rpc_system == "aws-api"


def should_generate_service_metric_attributes(span: ReadableSpan) -> bool:
    """
    Return True for SERVER spans, and for local root spans that are not SQS ReceiveMessage consumer spans
    (as identified by _is_sqs_receive_message_consumer_span).
    """
    if is_local_root(span) and not _is_sqs_receive_message_consumer_span(span):
        return True
    return SpanKind.SERVER == span.kind


def should_generate_dependency_metric_attributes(span: ReadableSpan) -> bool:
    """
    Return True for CLIENT and PRODUCER spans, and for dependency consumer spans (see _is_dependency_consumer_span)
    that are not SQS ReceiveMessage consumer spans.
    """
    if SpanKind.CLIENT == span.kind or SpanKind.PRODUCER == span.kind:
        return True
    return _is_dependency_consumer_span(span) and not _is_sqs_receive_message_consumer_span(span)


def is_consumer_process_span(span: ReadableSpan) -> bool:
    """
    Return True when the span is a CONSUMER span whose messaging operation attribute is "process".

    The attribute read is SpanAttributes.MESSAGING_OPERATION. Both the raw value of
    MessagingOperationValues.PROCESS and the enum member itself are treated as a match.
    """
    messaging_operation = span.attributes.get(SpanAttributes.MESSAGING_OPERATION)
    # MessagingOperationValues is an Enum, and Enum members never compare equal to their raw values. Span attributes
    # carry the raw string, so match on .value; the member itself is still accepted for callers that store it.
    return SpanKind.CONSUMER == span.kind and messaging_operation in (
        MessagingOperationValues.PROCESS,
        MessagingOperationValues.PROCESS.value,
    )


def should_use_internal_operation(span: ReadableSpan) -> bool:
    """
    Any spans that are Local Roots and also not SERVER should have aws.local.operation renamed to
    InternalOperation; this predicate identifies those spans.
    """
    local_root = is_local_root(span)
    return local_root and SpanKind.SERVER != span.kind


def is_local_root(span: ReadableSpan) -> bool:
    """
    Check the parent context of the span and return true if the span is a local root, i.e. it has no parent, its
    parent context is not valid, or its parent is remote.
    """
    parent_context = span.parent
    if parent_context is None:
        return True
    return not parent_context.is_valid or parent_context.is_remote


def _is_sqs_receive_message_consumer_span(span: ReadableSpan) -> bool:
    """
    To identify the SQS consumer spans produced by AWS SDK instrumentation.

    A span matches when all of the following hold:
      - its name equals _SQS_RECEIVE_MESSAGE_SPAN_NAME, ignoring case (a None name never matches);
      - it is a CONSUMER span;
      - its instrumentation scope name starts with _AWS_SDK_INSTRUMENTATION_SCOPE_PREFIX;
      - its messaging operation attribute is unset or is "process".
    """
    messaging_operation = span.attributes.get(SpanAttributes.MESSAGING_OPERATION)
    instrumentation_scope: InstrumentationScope = span.instrumentation_scope
    span_name = span.name

    return (
        # Other helpers in this module tolerate a None span name, so do not crash on it here either.
        span_name is not None
        and _SQS_RECEIVE_MESSAGE_SPAN_NAME.casefold() == span_name.casefold()
        and SpanKind.CONSUMER == span.kind
        and instrumentation_scope is not None
        and instrumentation_scope.name.startswith(_AWS_SDK_INSTRUMENTATION_SCOPE_PREFIX)
        and (
            messaging_operation is None
            # MessagingOperationValues is an Enum whose members never compare equal to their raw string values,
            # which is what span attributes carry; accept the raw value as well as the member.
            or messaging_operation in (MessagingOperationValues.PROCESS, MessagingOperationValues.PROCESS.value)
        )
    )


def _is_dependency_consumer_span(span: ReadableSpan) -> bool:
    """
    Decide whether a CONSUMER span should be treated as a dependency span.

    Non-CONSUMER spans are never dependency consumer spans. A consumer "process" span qualifies when it is a local
    root, or when its recorded parent span kind (attribute _AwsAttributeKeys.AWS_CONSUMER_PARENT_SPAN_KIND) is not
    CONSUMER. Every other CONSUMER span qualifies.
    """
    if SpanKind.CONSUMER != span.kind:
        return False

    if is_consumer_process_span(span):
        if is_local_root(span):
            return True
        parent_span_kind = span.attributes.get(_AwsAttributeKeys.AWS_CONSUMER_PARENT_SPAN_KIND)
        # The attribute is written as the parent's kind *name* (a str). SpanKind is an Enum whose members never
        # compare equal to strings, so match on the name; the member itself is still accepted if a caller stores it.
        return parent_span_kind not in (SpanKind.CONSUMER, SpanKind.CONSUMER.name)

    return True


def _is_valid_operation(span: ReadableSpan, operation: str) -> bool:
    """
    Tell whether operation is usable as the local operation value as-is.

    When the operation is None, UnknownOperation or equal to the span's HttpMethod value, it is treated as an invalid
    local operation value that needs to be further processed.
    """
    if operation is None or operation == UNKNOWN_OPERATION:
        return False

    http_method = span.attributes.get(SpanAttributes.HTTP_METHOD)
    if http_method is None:
        # No http method recorded on the span: nothing left that could invalidate the operation.
        return True
    return operation != http_method


def _generate_ingress_operation(span: ReadableSpan) -> str:
    """
    Build an operation name from the http attributes of a span whose name is not meaningful (null, unknown or the
    http_method value) as an operation name.

    Returns UNKNOWN_OPERATION when the span has no http target. Otherwise returns the first part of the http target,
    prefixed with the http method and a space when the method is present.
    """
    http_target = span.attributes.get(SpanAttributes.HTTP_TARGET)
    if http_target is None:
        return UNKNOWN_OPERATION

    # get the first part from API path string as operation value
    # the more levels/parts we get from API path the higher chance for getting high cardinality data
    api_path: str = extract_api_path_value(http_target)

    http_method = span.attributes.get(SpanAttributes.HTTP_METHOD)
    if http_method is None:
        return api_path
    return http_method + " " + api_path
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Callable, List, Optional

from typing_extensions import override

from amazon.opentelemetry.distro._aws_attribute_keys import _AwsAttributeKeys
from amazon.opentelemetry.distro._aws_span_processing_util import is_aws_sdk_span, is_local_root
from opentelemetry.context import Context
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.trace import SpanKind
from opentelemetry.trace.propagation import get_current_span


class AttributePropagatingSpanProcessor(SpanProcessor):
    """AttributePropagatingSpanProcessor is SpanProcessor that propagates attributes from parent to child spans

    AttributePropagatingSpanProcessor handles the propagation of attributes from parent spans to child spans,
    specified in self._attribute_keys_to_propagate. AttributePropagatingSpanProcessor also propagates
    configurable data from parent spans to child spans, as a new attribute specified by self._propagation_data_key.
    Propagated data can be configured via the self._propagation_data_extractor.
    Span data propagation only starts from local root server/consumer spans,
    but from there will be propagated to any descendant spans. If the span is a CONSUMER
    PROCESS with the parent also a CONSUMER, it will set attribute AWS_CONSUMER_PARENT_SPAN_KIND as CONSUMER
    to indicate that dependency metrics should not be generated for this span.
    """

    _propagation_data_extractor: Callable[[ReadableSpan], str]
    _propagation_data_key: str
    _attribute_keys_to_propagate: List[str]

    def __init__(
        self,
        propagation_data_extractor: Callable[[ReadableSpan], str],
        propagation_data_key: str,
        attribute_keys_to_propagate: List[str],
    ):
        """
        Args:
            propagation_data_extractor: callable that derives the data to propagate from a span
            propagation_data_key: attribute key under which the propagated data is stored on spans
            attribute_keys_to_propagate: attribute keys copied from an INTERNAL parent span to its child spans
        """
        self._propagation_data_extractor = propagation_data_extractor
        self._propagation_data_key = propagation_data_key
        self._attribute_keys_to_propagate = attribute_keys_to_propagate

    @override
    def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
        """Copy the configured attributes and propagation data onto a span that is starting.

        Args:
            span: the span being started
            parent_context: context used to look up the parent span of span
        """
        current_span = get_current_span(parent_context)
        # Only a ReadableSpan exposes kind/attributes. Any other parent span is treated as carrying no data instead
        # of being dereferenced.
        parent_span: Optional[ReadableSpan] = current_span if isinstance(current_span, ReadableSpan) else None

        if parent_span is not None:
            # Add the AWS_SDK_DESCENDANT attribute to the immediate child spans of AWS SDK span.
            # This attribute helps the backend differentiate between SDK spans and their immediate children.
            # It's assumed that the HTTP spans are immediate children of the AWS SDK span
            # TODO: we should have a contract test to check the immediate children are HTTP span
            if is_aws_sdk_span(parent_span):
                span.set_attribute(_AwsAttributeKeys.AWS_SDK_DESCENDANT, "true")

            if SpanKind.INTERNAL == parent_span.kind:
                for key_to_propagate in self._attribute_keys_to_propagate:
                    value_to_propagate = parent_span.attributes.get(key_to_propagate)
                    if value_to_propagate is not None:
                        span.set_attribute(key_to_propagate, value_to_propagate)

            # We cannot guarantee that messaging.operation is set onStart, it could be set after the fact.
            # To work around this, add the AWS_CONSUMER_PARENT_SPAN_KIND attribute if parent and child are both
            # CONSUMER then check later if a metric should be generated.
            if _is_consumer_kind(span) and _is_consumer_kind(parent_span):
                span.set_attribute(_AwsAttributeKeys.AWS_CONSUMER_PARENT_SPAN_KIND, parent_span.kind.name)

        propagation_data: Optional[str] = None
        if is_local_root(span):
            # Propagation starts at non-SERVER local roots; a SERVER local root is handled when its children start.
            if not _is_server_kind(span):
                propagation_data = self._propagation_data_extractor(span)
        elif parent_span is not None:
            if _is_server_kind(parent_span):
                propagation_data = self._propagation_data_extractor(parent_span)
            else:
                propagation_data = parent_span.attributes.get(self._propagation_data_key)

        if propagation_data is not None:
            span.set_attribute(self._propagation_data_key, propagation_data)

    # pylint: disable=no-self-use
    @override
    def on_end(self, span: ReadableSpan) -> None:
        """No-op: all work happens in on_start."""
        return

    @override
    def shutdown(self) -> None:
        """Shut down the processor by delegating to force_flush."""
        self.force_flush()

    # pylint: disable=no-self-use
    @override
    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
        """Always returns True; timeout_millis is ignored."""
        return True


def _is_consumer_kind(span: ReadableSpan) -> bool:
    """Return True when the span's kind is SpanKind.CONSUMER."""
    kind = span.kind
    return SpanKind.CONSUMER == kind


def _is_server_kind(span: ReadableSpan) -> bool:
    """Return True when the span's kind is SpanKind.SERVER."""
    kind = span.kind
    return SpanKind.SERVER == kind
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Callable, List

from amazon.opentelemetry.distro._aws_attribute_keys import _AwsAttributeKeys
from amazon.opentelemetry.distro._aws_span_processing_util import get_ingress_operation
from amazon.opentelemetry.distro.attribute_propagating_span_processor import AttributePropagatingSpanProcessor
from opentelemetry.sdk.trace import ReadableSpan


class AttributePropagatingSpanProcessorBuilder:
    """
    AttributePropagatingSpanProcessorBuilder is used to construct a AttributePropagatingSpanProcessor.
    If set_propagation_data_extractor, set_propagation_data_key or set_attributes_keys_to_propagate are not invoked,
    the builder defaults to using specific propagation targets.
    """

    _propagation_data_extractor: Callable[[ReadableSpan], str]
    _propagation_data_key: str
    _attributes_keys_to_propagate: List[str]

    def __init__(self) -> None:
        # Defaults are set per instance rather than on the class. A plain function stored as a class attribute is
        # bound as a method when read through an instance, so the default extractor would be called with the builder
        # as an extra first argument. This also gives every builder its own key list.
        self._propagation_data_extractor = get_ingress_operation
        self._propagation_data_key = _AwsAttributeKeys.AWS_LOCAL_OPERATION
        self._attributes_keys_to_propagate = [
            _AwsAttributeKeys.AWS_REMOTE_SERVICE,
            _AwsAttributeKeys.AWS_REMOTE_OPERATION,
        ]

    def set_propagation_data_extractor(
        self, propagation_data_extractor: Callable[[ReadableSpan], str]
    ) -> "AttributePropagatingSpanProcessorBuilder":
        """Set the callable that derives the propagated data from a span; returns this builder.

        Raises:
            ValueError: if propagation_data_extractor is None
        """
        if propagation_data_extractor is None:
            raise ValueError("propagation_data_extractor must not be None")
        self._propagation_data_extractor = propagation_data_extractor
        return self

    def set_propagation_data_key(self, propagation_data_key: str) -> "AttributePropagatingSpanProcessorBuilder":
        """Set the attribute key under which the propagated data is stored; returns this builder.

        Raises:
            ValueError: if propagation_data_key is None
        """
        if propagation_data_key is None:
            raise ValueError("propagation_data_key must not be None")
        self._propagation_data_key = propagation_data_key
        return self

    def set_attributes_keys_to_propagate(
        self, attributes_keys_to_propagate: List[str]
    ) -> "AttributePropagatingSpanProcessorBuilder":
        """Set the attribute keys to propagate; returns this builder.

        Raises:
            ValueError: if attributes_keys_to_propagate is None
        """
        if attributes_keys_to_propagate is None:
            raise ValueError("attributes_keys_to_propagate must not be None")
        # Stored as a tuple so later mutation of the caller's list does not affect the builder.
        self._attributes_keys_to_propagate = tuple(attributes_keys_to_propagate)
        return self

    def build(self) -> AttributePropagatingSpanProcessor:
        """Create an AttributePropagatingSpanProcessor from the current builder settings."""
        return AttributePropagatingSpanProcessor(
            self._propagation_data_extractor, self._propagation_data_key, self._attributes_keys_to_propagate
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from unittest import TestCase

from amazon.opentelemetry.distro.attribute_propagating_span_processor import AttributePropagatingSpanProcessor


class TestAttributePropagatingSpanProcessor(TestCase):
    """Smoke test for AttributePropagatingSpanProcessor."""

    def test_basic(self):
        processor: AttributePropagatingSpanProcessor = AttributePropagatingSpanProcessor(None, None, None)
        # Call force_flush: a bound method object is always truthy, so asserting on it verifies nothing.
        self.assertTrue(processor.force_flush())
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from unittest import TestCase

from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
AttributePropagatingSpanProcessorBuilder,
)


class TestAttributePropagatingSpanProcessorBuilder(TestCase):
    """Smoke test for the fluent interface of AttributePropagatingSpanProcessorBuilder."""

    def test_basic(self):
        # The setter must hand back the very builder it was called on so calls can be chained.
        processor_builder: AttributePropagatingSpanProcessorBuilder = AttributePropagatingSpanProcessorBuilder()
        returned = processor_builder.set_propagation_data_key("test")
        self.assertIs(returned, processor_builder)
Loading