Skip to content

Commit

Permalink
Merge branch 'main' into update-dockerfile-with-scratch
Browse files Browse the repository at this point in the history
  • Loading branch information
harrryr authored Jul 5, 2024
2 parents 6a777ed + 9ff3b90 commit 4488008
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@

# AWS_#_NAME attributes are not supported in python as they are not part of the Semantic Conventions.
# TODO:Move to Semantic Conventions when these attributes are added.
AWS_QUEUE_URL: str = "aws.sqs.queue_url"
AWS_QUEUE_NAME: str = "aws.sqs.queue_name"
AWS_STREAM_NAME: str = "aws.kinesis.stream_name"
AWS_SQS_QUEUE_URL: str = "aws.sqs.queue.url"
AWS_SQS_QUEUE_NAME: str = "aws.sqs.queue.name"
AWS_KINESIS_STREAM_NAME: str = "aws.kinesis.stream.name"
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
from urllib.parse import ParseResult, urlparse

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_KINESIS_STREAM_NAME,
AWS_LOCAL_OPERATION,
AWS_LOCAL_SERVICE,
AWS_QUEUE_NAME,
AWS_QUEUE_URL,
AWS_REMOTE_DB_USER,
AWS_REMOTE_OPERATION,
AWS_REMOTE_RESOURCE_IDENTIFIER,
AWS_REMOTE_RESOURCE_TYPE,
AWS_REMOTE_SERVICE,
AWS_SPAN_KIND,
AWS_STREAM_NAME,
AWS_SQS_QUEUE_NAME,
AWS_SQS_QUEUE_URL,
)
from amazon.opentelemetry.distro._aws_span_processing_util import (
LOCAL_ROOT,
Expand Down Expand Up @@ -361,19 +361,19 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
if is_key_present(span, _AWS_TABLE_NAMES) and len(span.attributes.get(_AWS_TABLE_NAMES)) == 1:
remote_resource_type = _NORMALIZED_DYNAMO_DB_SERVICE_NAME + "::Table"
remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_TABLE_NAMES)[0])
elif is_key_present(span, AWS_STREAM_NAME):
elif is_key_present(span, AWS_KINESIS_STREAM_NAME):
remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream"
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_NAME))
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_KINESIS_STREAM_NAME))
elif is_key_present(span, _AWS_BUCKET_NAME):
remote_resource_type = _NORMALIZED_S3_SERVICE_NAME + "::Bucket"
remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_BUCKET_NAME))
elif is_key_present(span, AWS_QUEUE_NAME):
elif is_key_present(span, AWS_SQS_QUEUE_NAME):
remote_resource_type = _NORMALIZED_SQS_SERVICE_NAME + "::Queue"
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_QUEUE_NAME))
elif is_key_present(span, AWS_QUEUE_URL):
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_SQS_QUEUE_NAME))
elif is_key_present(span, AWS_SQS_QUEUE_URL):
remote_resource_type = _NORMALIZED_SQS_SERVICE_NAME + "::Queue"
remote_resource_identifier = _escape_delimiters(
SqsUrlParser.get_queue_name(span.attributes.get(AWS_QUEUE_URL))
SqsUrlParser.get_queue_name(span.attributes.get(AWS_SQS_QUEUE_URL))
)
elif is_db_span(span):
remote_resource_type = _DB_CONNECTION_STRING_TYPE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
import importlib

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_KINESIS_STREAM_NAME,
AWS_SQS_QUEUE_NAME,
AWS_SQS_QUEUE_URL,
)
from opentelemetry.instrumentation.botocore.extensions import _KNOWN_EXTENSIONS
from opentelemetry.instrumentation.botocore.extensions.sqs import _SqsExtension
from opentelemetry.instrumentation.botocore.extensions.types import _AttributeMapT, _AwsSdkExtension
Expand All @@ -24,7 +29,7 @@ def _apply_botocore_kinesis_patch() -> None:
This patch adds an extension to the upstream's list of known extension for Kinesis. Extensions allow for custom
logic for adding service-specific information to spans, such as attributes. Specifically, we are adding logic to add
the `aws.kinesis.stream_name` attribute, to be used to generate RemoteTarget and achieve parity with the Java
the `aws.kinesis.stream.name` attribute, to be used to generate RemoteTarget and achieve parity with the Java
instrumentation.
"""
_KNOWN_EXTENSIONS["kinesis"] = _lazy_load(".", "_KinesisExtension")
Expand All @@ -47,7 +52,7 @@ def _apply_botocore_sqs_patch() -> None:
This patch extends the existing upstream extension for SQS. Extensions allow for custom logic for adding
service-specific information to spans, such as attributes. Specifically, we are adding logic to add
`aws.sqs.queue_url` and `aws.sqs.queue_name` attributes, to be used to generate RemoteTarget and achieve parity
`aws.sqs.queue.url` and `aws.sqs.queue.name` attributes, to be used to generate RemoteTarget and achieve parity
with the Java instrumentation. Callout that today, the upstream logic adds `aws.queue_url` but we feel that
`aws.sqs` is more in line with existing AWS Semantic Convention attributes like `AWS_S3_BUCKET`, etc.
"""
Expand All @@ -58,9 +63,9 @@ def patch_extract_attributes(self, attributes: _AttributeMapT):
queue_name = self._call_context.params.get("QueueName")
queue_url = self._call_context.params.get("QueueUrl")
if queue_name:
attributes["aws.sqs.queue_name"] = queue_name
attributes[AWS_SQS_QUEUE_NAME] = queue_name
if queue_url:
attributes["aws.sqs.queue_url"] = queue_url
attributes[AWS_SQS_QUEUE_URL] = queue_url

_SqsExtension.extract_attributes = patch_extract_attributes

Expand Down Expand Up @@ -93,4 +98,4 @@ class _KinesisExtension(_AwsSdkExtension):
def extract_attributes(self, attributes: _AttributeMapT):
stream_name = self._call_context.params.get("StreamName")
if stream_name:
attributes["aws.kinesis.stream_name"] = stream_name
attributes[AWS_KINESIS_STREAM_NAME] = stream_name
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_CONSUMER_PARENT_SPAN_KIND,
AWS_KINESIS_STREAM_NAME,
AWS_LOCAL_OPERATION,
AWS_LOCAL_SERVICE,
AWS_QUEUE_NAME,
AWS_QUEUE_URL,
AWS_REMOTE_DB_USER,
AWS_REMOTE_OPERATION,
AWS_REMOTE_RESOURCE_IDENTIFIER,
AWS_REMOTE_RESOURCE_TYPE,
AWS_REMOTE_SERVICE,
AWS_SPAN_KIND,
AWS_STREAM_NAME,
AWS_SQS_QUEUE_NAME,
AWS_SQS_QUEUE_URL,
)
from amazon.opentelemetry.distro._aws_metric_attribute_generator import _AwsMetricAttributeGenerator
from amazon.opentelemetry.distro.metric_attribute_generator import DEPENDENCY_METRIC, SERVICE_METRIC
Expand Down Expand Up @@ -971,31 +971,31 @@ def test_sdk_client_span_with_remote_resource_attributes(self):
self._validate_remote_resource_attributes("AWS::S3::Bucket", "aws_s3_bucket_name")
self._mock_attribute([SpanAttributes.AWS_S3_BUCKET], [None])

# Validate behaviour of AWS_QUEUE_NAME attribute, then remove it
self._mock_attribute([AWS_QUEUE_NAME], ["aws_queue_name"], keys, values)
# Validate behaviour of AWS_SQS_QUEUE_NAME attribute, then remove it
self._mock_attribute([AWS_SQS_QUEUE_NAME], ["aws_queue_name"], keys, values)
self._validate_remote_resource_attributes("AWS::SQS::Queue", "aws_queue_name")
self._mock_attribute([AWS_QUEUE_NAME], [None])
self._mock_attribute([AWS_SQS_QUEUE_NAME], [None])

# Validate behaviour of having both AWS_QUEUE_NAME and AWS_QUEUE_URL attribute, then remove them. Queue name is
# more reliable than queue URL, so we prefer to use name over URL.
# Validate behaviour of having both AWS_SQS_QUEUE_NAME and AWS_SQS_QUEUE_URL attribute, then remove them.
# Queue name is more reliable than queue URL, so we prefer to use name over URL.
self._mock_attribute(
[AWS_QUEUE_URL, AWS_QUEUE_NAME],
[AWS_SQS_QUEUE_URL, AWS_SQS_QUEUE_NAME],
["https://sqs.us-east-2.amazonaws.com/123456789012/Queue", "aws_queue_name"],
keys,
values,
)
self._validate_remote_resource_attributes("AWS::SQS::Queue", "aws_queue_name")
self._mock_attribute([AWS_QUEUE_URL, AWS_QUEUE_NAME], [None, None])
self._mock_attribute([AWS_SQS_QUEUE_URL, AWS_SQS_QUEUE_NAME], [None, None])

# Valid queue name with invalid queue URL, we should default to using the queue name.
self._mock_attribute([AWS_QUEUE_URL, AWS_QUEUE_NAME], ["invalidUrl", "aws_queue_name"], keys, values)
self._mock_attribute([AWS_SQS_QUEUE_URL, AWS_SQS_QUEUE_NAME], ["invalidUrl", "aws_queue_name"], keys, values)
self._validate_remote_resource_attributes("AWS::SQS::Queue", "aws_queue_name")
self._mock_attribute([AWS_QUEUE_URL, AWS_QUEUE_NAME], [None, None])
self._mock_attribute([AWS_SQS_QUEUE_URL, AWS_SQS_QUEUE_NAME], [None, None])

# Validate behaviour of AWS_STREAM_NAME attribute, then remove it.
self._mock_attribute([AWS_STREAM_NAME], ["aws_stream_name"], keys, values)
# Validate behaviour of AWS_KINESIS_STREAM_NAME attribute, then remove it.
self._mock_attribute([AWS_KINESIS_STREAM_NAME], ["aws_stream_name"], keys, values)
self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name")
self._mock_attribute([AWS_STREAM_NAME], [None])
self._mock_attribute([AWS_KINESIS_STREAM_NAME], [None])

# Validate behaviour of SpanAttributes.AWS_DYNAMODB_TABLE_NAMES attribute with one table name, then remove it.
self._mock_attribute([SpanAttributes.AWS_DYNAMODB_TABLE_NAMES], [["aws_table_name"]], keys, values)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ def _test_unpatched_botocore_instrumentation(self):
self.assertTrue("sqs" in _KNOWN_EXTENSIONS, "Upstream has removed the SQS extension")
attributes: Dict[str, str] = _do_extract_sqs_attributes()
self.assertTrue("aws.queue_url" in attributes)
self.assertFalse("aws.sqs.queue_url" in attributes)
self.assertFalse("aws.sqs.queue_name" in attributes)
self.assertFalse("aws.sqs.queue.url" in attributes)
self.assertFalse("aws.sqs.queue.name" in attributes)

def _test_patched_botocore_instrumentation(self):
# Kinesis
self.assertTrue("kinesis" in _KNOWN_EXTENSIONS)
kinesis_attributes: Dict[str, str] = _do_extract_kinesis_attributes()
self.assertTrue("aws.kinesis.stream_name" in kinesis_attributes)
self.assertEqual(kinesis_attributes["aws.kinesis.stream_name"], _STREAM_NAME)
self.assertTrue("aws.kinesis.stream.name" in kinesis_attributes)
self.assertEqual(kinesis_attributes["aws.kinesis.stream.name"], _STREAM_NAME)

# S3
self.assertTrue("s3" in _KNOWN_EXTENSIONS)
Expand All @@ -110,10 +110,10 @@ def _test_patched_botocore_instrumentation(self):
self.assertTrue("sqs" in _KNOWN_EXTENSIONS)
sqs_attributes: Dict[str, str] = _do_extract_sqs_attributes()
self.assertTrue("aws.queue_url" in sqs_attributes)
self.assertTrue("aws.sqs.queue_url" in sqs_attributes)
self.assertEqual(sqs_attributes["aws.sqs.queue_url"], _QUEUE_URL)
self.assertTrue("aws.sqs.queue_name" in sqs_attributes)
self.assertEqual(sqs_attributes["aws.sqs.queue_name"], _QUEUE_NAME)
self.assertTrue("aws.sqs.queue.url" in sqs_attributes)
self.assertEqual(sqs_attributes["aws.sqs.queue.url"], _QUEUE_URL)
self.assertTrue("aws.sqs.queue.name" in sqs_attributes)
self.assertEqual(sqs_attributes["aws.sqs.queue.name"], _QUEUE_NAME)

def _test_botocore_installed_flag(self):
with patch(
Expand Down
22 changes: 11 additions & 11 deletions contract-tests/tests/test/amazon/botocore/botocore_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
_logger: Logger = getLogger(__name__)
_logger.setLevel(INFO)

_AWS_QUEUE_URL: str = "aws.sqs.queue_url"
_AWS_QUEUE_NAME: str = "aws.sqs.queue_name"
_AWS_STREAM_NAME: str = "aws.kinesis.stream_name"
_AWS_SQS_QUEUE_URL: str = "aws.sqs.queue.url"
_AWS_SQS_QUEUE_NAME: str = "aws.sqs.queue.name"
_AWS_KINESIS_STREAM_NAME: str = "aws.kinesis.stream.name"


# pylint: disable=too-many-public-methods
Expand Down Expand Up @@ -248,7 +248,7 @@ def test_sqs_create_queue(self):
remote_resource_type="AWS::SQS::Queue",
remote_resource_identifier="test_queue",
request_specific_attributes={
_AWS_QUEUE_NAME: "test_queue",
_AWS_SQS_QUEUE_NAME: "test_queue",
},
span_name="SQS.CreateQueue",
)
Expand All @@ -265,7 +265,7 @@ def test_sqs_send_message(self):
remote_resource_type="AWS::SQS::Queue",
remote_resource_identifier="test_put_get_queue",
request_specific_attributes={
_AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue",
_AWS_SQS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue",
},
span_name="SQS.SendMessage",
)
Expand All @@ -282,7 +282,7 @@ def test_sqs_receive_message(self):
remote_resource_type="AWS::SQS::Queue",
remote_resource_identifier="test_put_get_queue",
request_specific_attributes={
_AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue",
_AWS_SQS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue",
},
span_name="SQS.ReceiveMessage",
)
Expand All @@ -299,7 +299,7 @@ def test_sqs_error(self):
remote_resource_type="AWS::SQS::Queue",
remote_resource_identifier="sqserror",
request_specific_attributes={
_AWS_QUEUE_URL: "http://error.test:8080/000000000000/sqserror",
_AWS_SQS_QUEUE_URL: "http://error.test:8080/000000000000/sqserror",
},
span_name="SQS.SendMessage",
)
Expand All @@ -316,7 +316,7 @@ def test_sqs_fault(self):
remote_resource_type="AWS::SQS::Queue",
remote_resource_identifier="invalid_test",
request_specific_attributes={
_AWS_QUEUE_NAME: "invalid_test",
_AWS_SQS_QUEUE_NAME: "invalid_test",
},
span_name="SQS.CreateQueue",
)
Expand All @@ -333,7 +333,7 @@ def test_kinesis_put_record(self):
remote_resource_type="AWS::Kinesis::Stream",
remote_resource_identifier="test_stream",
request_specific_attributes={
_AWS_STREAM_NAME: "test_stream",
_AWS_KINESIS_STREAM_NAME: "test_stream",
},
span_name="Kinesis.PutRecord",
)
Expand All @@ -350,7 +350,7 @@ def test_kinesis_error(self):
remote_resource_type="AWS::Kinesis::Stream",
remote_resource_identifier="invalid_stream",
request_specific_attributes={
_AWS_STREAM_NAME: "invalid_stream",
_AWS_KINESIS_STREAM_NAME: "invalid_stream",
},
span_name="Kinesis.PutRecord",
)
Expand All @@ -367,7 +367,7 @@ def test_kinesis_fault(self):
remote_resource_type="AWS::Kinesis::Stream",
remote_resource_identifier="test_stream",
request_specific_attributes={
_AWS_STREAM_NAME: "test_stream",
_AWS_KINESIS_STREAM_NAME: "test_stream",
},
span_name="Kinesis.PutRecord",
)
Expand Down

0 comments on commit 4488008

Please sign in to comment.