From 9ff3b90b2ad6443ba55ebf5d358d7ab808f62243 Mon Sep 17 00:00:00 2001 From: Prashant Srivastava <50466688+srprash@users.noreply.github.com> Date: Fri, 5 Jul 2024 15:10:54 -0700 Subject: [PATCH] replace "_" with "." in aws attributes for SQS and Kinesis (#223) `aws.sqs.queue_url` -> `aws.sqs.queue.url` `aws.sqs.queue_name` -> `aws.sqs.queue.name` `aws.kinesis.stream_name` -> `aws.kinesis.stream.name` The above attributes are being changed to treat the `queue` and `stream` as their own namespaces. **NOTE:** checked that Application Signals test framework is NOT using these attributes: https://github.com/search?q=repo%3Aaws-observability%2Faws-application-signals-test-framework+aws.sqs.queue_url&type=code By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --------- Co-authored-by: Thomas Pierce --- .../distro/_aws_attribute_keys.py | 6 ++-- .../distro/_aws_metric_attribute_generator.py | 18 +++++------ .../distro/patches/_botocore_patches.py | 15 ++++++---- .../test_aws_metric_attribute_generator.py | 30 +++++++++---------- .../distro/test_instrumentation_patch.py | 16 +++++----- .../test/amazon/botocore/botocore_test.py | 22 +++++++------- 6 files changed, 56 insertions(+), 51 deletions(-) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py index bd7e195ef..f7208485b 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py @@ -14,6 +14,6 @@ # AWS_#_NAME attributes are not supported in python as they are not part of the Semantic Conventions. # TODOļ¼šMove to Semantic Conventions when these attributes are added. -AWS_QUEUE_URL: str = "aws.sqs.queue_url" -AWS_QUEUE_NAME: str = "aws.sqs.queue_name" -AWS_STREAM_NAME: str = "aws.kinesis.stream_name" +AWS_SQS_QUEUE_URL: str = "aws.sqs.queue.url" +AWS_SQS_QUEUE_NAME: str = "aws.sqs.queue.name" +AWS_KINESIS_STREAM_NAME: str = "aws.kinesis.stream.name" diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py index 060d902a0..9f7e63c7d 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py @@ -6,17 +6,17 @@ from urllib.parse import ParseResult, urlparse from amazon.opentelemetry.distro._aws_attribute_keys import ( + AWS_KINESIS_STREAM_NAME, AWS_LOCAL_OPERATION, AWS_LOCAL_SERVICE, - AWS_QUEUE_NAME, - AWS_QUEUE_URL, AWS_REMOTE_DB_USER, AWS_REMOTE_OPERATION, AWS_REMOTE_RESOURCE_IDENTIFIER, AWS_REMOTE_RESOURCE_TYPE, AWS_REMOTE_SERVICE, AWS_SPAN_KIND, - AWS_STREAM_NAME, + AWS_SQS_QUEUE_NAME, + AWS_SQS_QUEUE_URL, ) from amazon.opentelemetry.distro._aws_span_processing_util import ( LOCAL_ROOT, @@ -361,19 +361,19 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri if is_key_present(span, _AWS_TABLE_NAMES) and len(span.attributes.get(_AWS_TABLE_NAMES)) == 1: remote_resource_type = _NORMALIZED_DYNAMO_DB_SERVICE_NAME + "::Table" remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_TABLE_NAMES)[0]) - elif is_key_present(span, AWS_STREAM_NAME): + elif is_key_present(span, AWS_KINESIS_STREAM_NAME): remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream" - remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_NAME)) + remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_KINESIS_STREAM_NAME)) elif is_key_present(span, _AWS_BUCKET_NAME): remote_resource_type = _NORMALIZED_S3_SERVICE_NAME + "::Bucket" remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_BUCKET_NAME)) - elif is_key_present(span, AWS_QUEUE_NAME): + elif is_key_present(span, AWS_SQS_QUEUE_NAME): remote_resource_type = _NORMALIZED_SQS_SERVICE_NAME + "::Queue" - remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_QUEUE_NAME)) - elif is_key_present(span, AWS_QUEUE_URL): + remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_SQS_QUEUE_NAME)) + elif is_key_present(span, AWS_SQS_QUEUE_URL): remote_resource_type = _NORMALIZED_SQS_SERVICE_NAME + "::Queue" remote_resource_identifier = _escape_delimiters( - SqsUrlParser.get_queue_name(span.attributes.get(AWS_QUEUE_URL)) + SqsUrlParser.get_queue_name(span.attributes.get(AWS_SQS_QUEUE_URL)) ) elif is_db_span(span): remote_resource_type = _DB_CONNECTION_STRING_TYPE diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py index cf73fb345..953637288 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py @@ -3,6 +3,11 @@ # Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License. import importlib +from amazon.opentelemetry.distro._aws_attribute_keys import ( + AWS_KINESIS_STREAM_NAME, + AWS_SQS_QUEUE_NAME, + AWS_SQS_QUEUE_URL, +) from opentelemetry.instrumentation.botocore.extensions import _KNOWN_EXTENSIONS from opentelemetry.instrumentation.botocore.extensions.sqs import _SqsExtension from opentelemetry.instrumentation.botocore.extensions.types import _AttributeMapT, _AwsSdkExtension @@ -24,7 +29,7 @@ def _apply_botocore_kinesis_patch() -> None: This patch adds an extension to the upstream's list of known extension for Kinesis. Extensions allow for custom logic for adding service-specific information to spans, such as attributes. Specifically, we are adding logic to add - the `aws.kinesis.stream_name` attribute, to be used to generate RemoteTarget and achieve parity with the Java + the `aws.kinesis.stream.name` attribute, to be used to generate RemoteTarget and achieve parity with the Java instrumentation. """ _KNOWN_EXTENSIONS["kinesis"] = _lazy_load(".", "_KinesisExtension") @@ -47,7 +52,7 @@ def _apply_botocore_sqs_patch() -> None: This patch extends the existing upstream extension for SQS. Extensions allow for custom logic for adding service-specific information to spans, such as attributes. Specifically, we are adding logic to add - `aws.sqs.queue_url` and `aws.sqs.queue_name` attributes, to be used to generate RemoteTarget and achieve parity + `aws.sqs.queue.url` and `aws.sqs.queue.name` attributes, to be used to generate RemoteTarget and achieve parity with the Java instrumentation. Callout that today, the upstream logic adds `aws.queue_url` but we feel that `aws.sqs` is more in line with existing AWS Semantic Convention attributes like `AWS_S3_BUCKET`, etc. """ @@ -58,9 +63,9 @@ def patch_extract_attributes(self, attributes: _AttributeMapT): queue_name = self._call_context.params.get("QueueName") queue_url = self._call_context.params.get("QueueUrl") if queue_name: - attributes["aws.sqs.queue_name"] = queue_name + attributes[AWS_SQS_QUEUE_NAME] = queue_name if queue_url: - attributes["aws.sqs.queue_url"] = queue_url + attributes[AWS_SQS_QUEUE_URL] = queue_url _SqsExtension.extract_attributes = patch_extract_attributes @@ -93,4 +98,4 @@ class _KinesisExtension(_AwsSdkExtension): def extract_attributes(self, attributes: _AttributeMapT): stream_name = self._call_context.params.get("StreamName") if stream_name: - attributes["aws.kinesis.stream_name"] = stream_name + attributes[AWS_KINESIS_STREAM_NAME] = stream_name diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py index e14d30be7..bc2dd0cae 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py @@ -9,17 +9,17 @@ from amazon.opentelemetry.distro._aws_attribute_keys import ( AWS_CONSUMER_PARENT_SPAN_KIND, + AWS_KINESIS_STREAM_NAME, AWS_LOCAL_OPERATION, AWS_LOCAL_SERVICE, - AWS_QUEUE_NAME, - AWS_QUEUE_URL, AWS_REMOTE_DB_USER, AWS_REMOTE_OPERATION, AWS_REMOTE_RESOURCE_IDENTIFIER, AWS_REMOTE_RESOURCE_TYPE, AWS_REMOTE_SERVICE, AWS_SPAN_KIND, - AWS_STREAM_NAME, + AWS_SQS_QUEUE_NAME, + AWS_SQS_QUEUE_URL, ) from amazon.opentelemetry.distro._aws_metric_attribute_generator import _AwsMetricAttributeGenerator from amazon.opentelemetry.distro.metric_attribute_generator import DEPENDENCY_METRIC, SERVICE_METRIC @@ -971,31 +971,31 @@ def test_sdk_client_span_with_remote_resource_attributes(self): self._validate_remote_resource_attributes("AWS::S3::Bucket", "aws_s3_bucket_name") self._mock_attribute([SpanAttributes.AWS_S3_BUCKET], [None]) - # Validate behaviour of AWS_QUEUE_NAME attribute, then remove it - self._mock_attribute([AWS_QUEUE_NAME], ["aws_queue_name"], keys, values) + # Validate behaviour of AWS_SQS_QUEUE_NAME attribute, then remove it + self._mock_attribute([AWS_SQS_QUEUE_NAME], ["aws_queue_name"], keys, values) self._validate_remote_resource_attributes("AWS::SQS::Queue", "aws_queue_name") - self._mock_attribute([AWS_QUEUE_NAME], [None]) + self._mock_attribute([AWS_SQS_QUEUE_NAME], [None]) - # Validate behaviour of having both AWS_QUEUE_NAME and AWS_QUEUE_URL attribute, then remove them. Queue name is - # more reliable than queue URL, so we prefer to use name over URL. + # Validate behaviour of having both AWS_SQS_QUEUE_NAME and AWS_SQS_QUEUE_URL attribute, then remove them. + # Queue name is more reliable than queue URL, so we prefer to use name over URL. self._mock_attribute( - [AWS_QUEUE_URL, AWS_QUEUE_NAME], + [AWS_SQS_QUEUE_URL, AWS_SQS_QUEUE_NAME], ["https://sqs.us-east-2.amazonaws.com/123456789012/Queue", "aws_queue_name"], keys, values, ) self._validate_remote_resource_attributes("AWS::SQS::Queue", "aws_queue_name") - self._mock_attribute([AWS_QUEUE_URL, AWS_QUEUE_NAME], [None, None]) + self._mock_attribute([AWS_SQS_QUEUE_URL, AWS_SQS_QUEUE_NAME], [None, None]) # Valid queue name with invalid queue URL, we should default to using the queue name. - self._mock_attribute([AWS_QUEUE_URL, AWS_QUEUE_NAME], ["invalidUrl", "aws_queue_name"], keys, values) + self._mock_attribute([AWS_SQS_QUEUE_URL, AWS_SQS_QUEUE_NAME], ["invalidUrl", "aws_queue_name"], keys, values) self._validate_remote_resource_attributes("AWS::SQS::Queue", "aws_queue_name") - self._mock_attribute([AWS_QUEUE_URL, AWS_QUEUE_NAME], [None, None]) + self._mock_attribute([AWS_SQS_QUEUE_URL, AWS_SQS_QUEUE_NAME], [None, None]) - # Validate behaviour of AWS_STREAM_NAME attribute, then remove it. - self._mock_attribute([AWS_STREAM_NAME], ["aws_stream_name"], keys, values) + # Validate behaviour of AWS_KINESIS_STREAM_NAME attribute, then remove it. + self._mock_attribute([AWS_KINESIS_STREAM_NAME], ["aws_stream_name"], keys, values) self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name") - self._mock_attribute([AWS_STREAM_NAME], [None]) + self._mock_attribute([AWS_KINESIS_STREAM_NAME], [None]) # Validate behaviour of SpanAttributes.AWS_DYNAMODB_TABLE_NAMES attribute with one table name, then remove it. self._mock_attribute([SpanAttributes.AWS_DYNAMODB_TABLE_NAMES], [["aws_table_name"]], keys, values) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py index 124abfad9..e9d8349cb 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py @@ -90,15 +90,15 @@ def _test_unpatched_botocore_instrumentation(self): self.assertTrue("sqs" in _KNOWN_EXTENSIONS, "Upstream has removed the SQS extension") attributes: Dict[str, str] = _do_extract_sqs_attributes() self.assertTrue("aws.queue_url" in attributes) - self.assertFalse("aws.sqs.queue_url" in attributes) - self.assertFalse("aws.sqs.queue_name" in attributes) + self.assertFalse("aws.sqs.queue.url" in attributes) + self.assertFalse("aws.sqs.queue.name" in attributes) def _test_patched_botocore_instrumentation(self): # Kinesis self.assertTrue("kinesis" in _KNOWN_EXTENSIONS) kinesis_attributes: Dict[str, str] = _do_extract_kinesis_attributes() - self.assertTrue("aws.kinesis.stream_name" in kinesis_attributes) - self.assertEqual(kinesis_attributes["aws.kinesis.stream_name"], _STREAM_NAME) + self.assertTrue("aws.kinesis.stream.name" in kinesis_attributes) + self.assertEqual(kinesis_attributes["aws.kinesis.stream.name"], _STREAM_NAME) # S3 self.assertTrue("s3" in _KNOWN_EXTENSIONS) @@ -110,10 +110,10 @@ def _test_patched_botocore_instrumentation(self): self.assertTrue("sqs" in _KNOWN_EXTENSIONS) sqs_attributes: Dict[str, str] = _do_extract_sqs_attributes() self.assertTrue("aws.queue_url" in sqs_attributes) - self.assertTrue("aws.sqs.queue_url" in sqs_attributes) - self.assertEqual(sqs_attributes["aws.sqs.queue_url"], _QUEUE_URL) - self.assertTrue("aws.sqs.queue_name" in sqs_attributes) - self.assertEqual(sqs_attributes["aws.sqs.queue_name"], _QUEUE_NAME) + self.assertTrue("aws.sqs.queue.url" in sqs_attributes) + self.assertEqual(sqs_attributes["aws.sqs.queue.url"], _QUEUE_URL) + self.assertTrue("aws.sqs.queue.name" in sqs_attributes) + self.assertEqual(sqs_attributes["aws.sqs.queue.name"], _QUEUE_NAME) def _test_botocore_installed_flag(self): with patch( diff --git a/contract-tests/tests/test/amazon/botocore/botocore_test.py b/contract-tests/tests/test/amazon/botocore/botocore_test.py index 6fe278d3b..af2a5f9ae 100644 --- a/contract-tests/tests/test/amazon/botocore/botocore_test.py +++ b/contract-tests/tests/test/amazon/botocore/botocore_test.py @@ -26,9 +26,9 @@ _logger: Logger = getLogger(__name__) _logger.setLevel(INFO) -_AWS_QUEUE_URL: str = "aws.sqs.queue_url" -_AWS_QUEUE_NAME: str = "aws.sqs.queue_name" -_AWS_STREAM_NAME: str = "aws.kinesis.stream_name" +_AWS_SQS_QUEUE_URL: str = "aws.sqs.queue.url" +_AWS_SQS_QUEUE_NAME: str = "aws.sqs.queue.name" +_AWS_KINESIS_STREAM_NAME: str = "aws.kinesis.stream.name" # pylint: disable=too-many-public-methods @@ -248,7 +248,7 @@ def test_sqs_create_queue(self): remote_resource_type="AWS::SQS::Queue", remote_resource_identifier="test_queue", request_specific_attributes={ - _AWS_QUEUE_NAME: "test_queue", + _AWS_SQS_QUEUE_NAME: "test_queue", }, span_name="SQS.CreateQueue", ) @@ -265,7 +265,7 @@ def test_sqs_send_message(self): remote_resource_type="AWS::SQS::Queue", remote_resource_identifier="test_put_get_queue", request_specific_attributes={ - _AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", + _AWS_SQS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", }, span_name="SQS.SendMessage", ) @@ -282,7 +282,7 @@ def test_sqs_receive_message(self): remote_resource_type="AWS::SQS::Queue", remote_resource_identifier="test_put_get_queue", request_specific_attributes={ - _AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", + _AWS_SQS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", }, span_name="SQS.ReceiveMessage", ) @@ -299,7 +299,7 @@ def test_sqs_error(self): remote_resource_type="AWS::SQS::Queue", remote_resource_identifier="sqserror", request_specific_attributes={ - _AWS_QUEUE_URL: "http://error.test:8080/000000000000/sqserror", + _AWS_SQS_QUEUE_URL: "http://error.test:8080/000000000000/sqserror", }, span_name="SQS.SendMessage", ) @@ -316,7 +316,7 @@ def test_sqs_fault(self): remote_resource_type="AWS::SQS::Queue", remote_resource_identifier="invalid_test", request_specific_attributes={ - _AWS_QUEUE_NAME: "invalid_test", + _AWS_SQS_QUEUE_NAME: "invalid_test", }, span_name="SQS.CreateQueue", ) @@ -333,7 +333,7 @@ def test_kinesis_put_record(self): remote_resource_type="AWS::Kinesis::Stream", remote_resource_identifier="test_stream", request_specific_attributes={ - _AWS_STREAM_NAME: "test_stream", + _AWS_KINESIS_STREAM_NAME: "test_stream", }, span_name="Kinesis.PutRecord", ) @@ -350,7 +350,7 @@ def test_kinesis_error(self): remote_resource_type="AWS::Kinesis::Stream", remote_resource_identifier="invalid_stream", request_specific_attributes={ - _AWS_STREAM_NAME: "invalid_stream", + _AWS_KINESIS_STREAM_NAME: "invalid_stream", }, span_name="Kinesis.PutRecord", ) @@ -367,7 +367,7 @@ def test_kinesis_fault(self): remote_resource_type="AWS::Kinesis::Stream", remote_resource_identifier="test_stream", request_specific_attributes={ - _AWS_STREAM_NAME: "test_stream", + _AWS_KINESIS_STREAM_NAME: "test_stream", }, span_name="Kinesis.PutRecord", )