From 68c4dbcb05aa7fc312ff180130d4c1e7837a7dc3 Mon Sep 17 00:00:00 2001 From: Zhonghao Zhao Date: Tue, 28 May 2024 16:22:03 -0700 Subject: [PATCH] Add SNS support. --- .../distro/_aws_attribute_keys.py | 1 + .../distro/_aws_metric_attribute_generator.py | 5 + .../distro/patches/_botocore_patches.py | 24 +- .../distro/patches/test_botocore_sns.py | 36 +++ .../test_aws_metric_attribute_generator.py | 7 + .../distro/test_instrumentation_patch.py | 28 ++- .../applications/botocore/botocore_server.py | 49 +++- .../test/amazon/botocore/botocore_test.py | 233 ++++-------------- 8 files changed, 188 insertions(+), 195 deletions(-) create mode 100644 aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/patches/test_botocore_sns.py diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py index f6498ac76..5e8c8f422 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py @@ -16,3 +16,4 @@ AWS_QUEUE_URL: str = "aws.sqs.queue_url" AWS_QUEUE_NAME: str = "aws.sqs.queue_name" AWS_STREAM_NAME: str = "aws.kinesis.stream_name" +AWS_TOPIC_ARN: str = "aws.sns.topic_arn" diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py index 577d28f63..5493b0263 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py @@ -16,6 +16,7 @@ AWS_REMOTE_SERVICE, AWS_SPAN_KIND, AWS_STREAM_NAME, + AWS_TOPIC_ARN, ) from amazon.opentelemetry.distro._aws_span_processing_util import ( LOCAL_ROOT, @@ -78,6 +79,7 @@ _NORMALIZED_KINESIS_SERVICE_NAME: str = "AWS::Kinesis" _NORMALIZED_S3_SERVICE_NAME: str = "AWS::S3" _NORMALIZED_SQS_SERVICE_NAME: str = "AWS::SQS" +_NORMALIZED_SNS_SERVICE_NAME: str = "AWS::SNS" _DB_CONNECTION_STRING_TYPE: str = "DB::Connection" # Special DEPENDENCY attribute value if GRAPHQL_OPERATION_TYPE attribute key is present. @@ -372,6 +374,9 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri remote_resource_identifier = _escape_delimiters( SqsUrlParser.get_queue_name(span.attributes.get(AWS_QUEUE_URL)) ) + elif is_key_present(span, AWS_TOPIC_ARN): + remote_resource_type = _NORMALIZED_SNS_SERVICE_NAME + "::TopicArn" + remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_TOPIC_ARN)) elif is_db_span(span): remote_resource_type = _DB_CONNECTION_STRING_TYPE remote_resource_identifier = _get_db_connection(span) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py index cf73fb345..1d11abab5 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py @@ -4,6 +4,7 @@ import importlib from opentelemetry.instrumentation.botocore.extensions import _KNOWN_EXTENSIONS +from opentelemetry.instrumentation.botocore.extensions.sns import _SnsExtension from opentelemetry.instrumentation.botocore.extensions.sqs import _SqsExtension from opentelemetry.instrumentation.botocore.extensions.types import _AttributeMapT, _AwsSdkExtension from opentelemetry.semconv.trace import SpanAttributes @@ -12,11 +13,12 @@ def _apply_botocore_instrumentation_patches() -> None: """Botocore instrumentation patches - Adds patches to provide additional support and Java parity for Kinesis, S3, and SQS. + Adds patches to provide additional support for Kinesis, S3, SQS and SNS. """ _apply_botocore_kinesis_patch() _apply_botocore_s3_patch() _apply_botocore_sqs_patch() + _apply_botocore_sns_patch() def _apply_botocore_kinesis_patch() -> None: @@ -65,6 +67,26 @@ def patch_extract_attributes(self, attributes: _AttributeMapT): _SqsExtension.extract_attributes = patch_extract_attributes +def _apply_botocore_sns_patch() -> None: + """Botocore instrumentation patch for SNS + + This patch extends the existing upstream extension for SNS. Extensions allow for custom logic for adding + service-specific information to spans, such as attributes. Specifically, we are adding logic to add + "aws.sns.topic_arn" attributes to be used to generate AWS_REMOTE_RESOURCE_TYPE and AWS_REMOTE_RESOURCE_IDENTIFIER. + Callout that today, the upstream logic adds SpanAttributes.MESSAGING_DESTINATION_NAME, + but we are not using it as it can only be assigned with TargetArn as well. + """ + old_extract_attributes = _SnsExtension.extract_attributes + + def patch_extract_attributes(self, attributes: _AttributeMapT): + old_extract_attributes(self, attributes) + topic_arn = self._call_context.params.get("TopicArn") + if topic_arn: + attributes["aws.sns.topic_arn"] = topic_arn + + _SnsExtension.extract_attributes = patch_extract_attributes + + # The OpenTelemetry Authors code def _lazy_load(module, cls): """Clone of upstream opentelemetry.instrumentation.botocore.extensions.lazy_load diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/patches/test_botocore_sns.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/patches/test_botocore_sns.py new file mode 100644 index 000000000..bdaf12392 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/patches/test_botocore_sns.py @@ -0,0 +1,36 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +import botocore.session +from moto import mock_aws + +from amazon.opentelemetry.distro.patches._instrumentation_patch import apply_instrumentation_patches +from opentelemetry.instrumentation.botocore import BotocoreInstrumentor +from opentelemetry.test.test_base import TestBase + + +class TestSnsExtension(TestBase): + def setUp(self): + super().setUp() + BotocoreInstrumentor().instrument() + # Apply patches + apply_instrumentation_patches() + + session = botocore.session.get_session() + session.set_credentials(access_key="access-key", secret_key="secret-key") + self.client = session.create_client("sns", region_name="us-west-2") + self.topic_name = "my-topic" + + def tearDown(self): + super().tearDown() + BotocoreInstrumentor().uninstrument() + + @mock_aws + def test_create_and_delete_topic(self): + self.memory_exporter.clear() + response = self.client.create_topic(Name=self.topic_name) + topic_arn = response["TopicArn"] + self.client.delete_topic(TopicArn=topic_arn) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(2, len(spans)) + span = spans[1] + self.assertEqual(topic_arn, span.attributes["aws.sns.topic_arn"]) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py index 072e6eeb0..49be9c9cd 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py @@ -19,6 +19,7 @@ AWS_REMOTE_SERVICE, AWS_SPAN_KIND, AWS_STREAM_NAME, + AWS_TOPIC_ARN, ) from amazon.opentelemetry.distro._aws_metric_attribute_generator import _AwsMetricAttributeGenerator from amazon.opentelemetry.distro.metric_attribute_generator import DEPENDENCY_METRIC, SERVICE_METRIC @@ -821,6 +822,7 @@ def test_normalize_remote_service_name_aws_sdk(self): self.validate_aws_sdk_service_normalization("Kinesis", "AWS::Kinesis") self.validate_aws_sdk_service_normalization("S3", "AWS::S3") self.validate_aws_sdk_service_normalization("SQS", "AWS::SQS") + self.validate_aws_sdk_service_normalization("SNS", "AWS::SNS") def validate_aws_sdk_service_normalization(self, service_name: str, expected_remote_service: str): self._mock_attribute([SpanAttributes.RPC_SYSTEM, SpanAttributes.RPC_SERVICE], ["aws-api", service_name]) @@ -977,6 +979,11 @@ def test_sdk_client_span_with_remote_resource_attributes(self): self._validate_remote_resource_attributes("AWS::DynamoDB::Table", "aws_table^^name") self._mock_attribute([SpanAttributes.AWS_DYNAMODB_TABLE_NAMES], [None]) + # Validate behaviour of AWS_TOPIC_ARN attribute, then remove it + self._mock_attribute([AWS_TOPIC_ARN], ["arn:aws:sns:us-west-2:012345678901:test_topic"], keys, values) + self._validate_remote_resource_attributes("AWS::SNS::TopicArn", "arn:aws:sns:us-west-2:012345678901:test_topic") + self._mock_attribute([AWS_TOPIC_ARN], [None]) + self._mock_attribute([SpanAttributes.RPC_SYSTEM], [None]) def test_client_db_span_with_remote_resource_attributes(self): diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py index bc6e851a9..d7172874c 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py @@ -14,6 +14,7 @@ _BUCKET_NAME: str = "bucketName" _QUEUE_NAME: str = "queueName" _QUEUE_URL: str = "queueUrl" +_TOPIC_ARN: str = "topicArn" class TestInstrumentationPatch(TestCase): @@ -69,10 +70,17 @@ def _validate_unpatched_botocore_instrumentation(self): # SQS self.assertTrue("sqs" in _KNOWN_EXTENSIONS, "Upstream has removed the SQS extension") - attributes: Dict[str, str] = _do_extract_sqs_attributes() - self.assertTrue("aws.queue_url" in attributes) - self.assertFalse("aws.sqs.queue_url" in attributes) - self.assertFalse("aws.sqs.queue_name" in attributes) + sqs_attributes: Dict[str, str] = _do_extract_sqs_attributes() + self.assertTrue("aws.queue_url" in sqs_attributes) + self.assertFalse("aws.sqs.queue_url" in sqs_attributes) + self.assertFalse("aws.sqs.queue_name" in sqs_attributes) + + # SNS + self.assertTrue("sns" in _KNOWN_EXTENSIONS, "Upstream has removed the SNS extension") + sns_attributes: Dict[str, str] = _do_extract_sns_attributes() + self.assertTrue(SpanAttributes.MESSAGING_SYSTEM in sns_attributes) + self.assertEqual(sns_attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sns") + self.assertFalse("aws.sns.topic_arn" in sns_attributes) def _validate_patched_botocore_instrumentation(self): # Kinesis @@ -96,6 +104,12 @@ def _validate_patched_botocore_instrumentation(self): self.assertTrue("aws.sqs.queue_name" in sqs_attributes) self.assertEqual(sqs_attributes["aws.sqs.queue_name"], _QUEUE_NAME) + # SNS + self.assertTrue("sns" in _KNOWN_EXTENSIONS) + sns_attributes: Dict[str, str] = _do_extract_sns_attributes() + self.assertTrue("aws.sns.topic_arn" in sns_attributes) + self.assertEqual(sns_attributes["aws.sns.topic_arn"], _TOPIC_ARN) + def _do_extract_kinesis_attributes() -> Dict[str, str]: service_name: str = "kinesis" @@ -115,6 +129,12 @@ def _do_extract_sqs_attributes() -> Dict[str, str]: return _do_extract_attributes(service_name, params) +def _do_extract_sns_attributes() -> Dict[str, str]: + service_name: str = "sns" + params: Dict[str, str] = {"TopicArn": _TOPIC_ARN} + return _do_extract_attributes(service_name, params) + + def _do_extract_attributes(service_name: str, params: Dict[str, str]) -> Dict[str, str]: mock_call_context: MagicMock = MagicMock() mock_call_context.params = params diff --git a/contract-tests/images/applications/botocore/botocore_server.py b/contract-tests/images/applications/botocore/botocore_server.py index 192093d1f..bff95ca6b 100644 --- a/contract-tests/images/applications/botocore/botocore_server.py +++ b/contract-tests/images/applications/botocore/botocore_server.py @@ -41,12 +41,14 @@ def do_GET(self): self._handle_sqs_request() if self.in_path("kinesis"): self._handle_kinesis_request() + if self.in_path("sns"): + self._handle_sns_request() self._end_request(self.main_status) # pylint: disable=invalid-name def do_POST(self): - if self.in_path("sqserror"): + if self.in_path("sqserror") or self.in_path("snserror"): self.send_response(self.main_status) self.send_header("Content-type", "text/xml") self.end_headers() @@ -203,6 +205,47 @@ def _handle_kinesis_request(self) -> None: else: set_main_status(404) + def _handle_sns_request(self) -> None: + sns_client: BaseClient = boto3.client("sns", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) + if self.in_path(_ERROR): + set_main_status(400) + try: + error_client: BaseClient = boto3.client( + "sns", endpoint_url=_ERROR_ENDPOINT + "/snserror", region_name=_AWS_REGION + ) + topic_arn = "arn:aws:sns:us-west-2:000000000000:test_topic/snserror" + message = "Hello from Amazon SNS!" + subject = "Test Message" + message_attributes = {"Attribute1": {"DataType": "String", "StringValue": "Value1"}} + error_client.publish( + TopicArn=topic_arn, Message=message, Subject=subject, MessageAttributes=message_attributes + ) + except Exception as exception: + print("Expected exception occurred", exception) + elif self.in_path(_FAULT): + set_main_status(500) + try: + fault_client: BaseClient = boto3.client( + "sns", endpoint_url=_FAULT_ENDPOINT, region_name=_AWS_REGION, config=_NO_RETRY_CONFIG + ) + fault_client.get_topic_attributes(TopicArn="invalid_topic_arn") + except Exception as exception: + print("Expected exception occurred", exception) + elif self.in_path("gettopattributes/get-topic-attributes"): + set_main_status(200) + sns_client.get_topic_attributes(TopicArn="arn:aws:sns:us-west-2:000000000000:test_topic") + elif self.in_path("publishmessage/publish-message/some-message"): + set_main_status(200) + topic_arn = "arn:aws:sns:us-west-2:000000000000:test_topic" + message = "Hello from Amazon SNS!" + subject = "Test Message" + message_attributes = {"Attribute1": {"DataType": "String", "StringValue": "Value1"}} + sns_client.publish( + TopicArn=topic_arn, Message=message, Subject=subject, MessageAttributes=message_attributes + ) + else: + set_main_status(404) + def _end_request(self, status_code: int): self.send_response_only(status_code) self.end_headers() @@ -247,6 +290,10 @@ def prepare_aws_server() -> None: # Set up Kinesis so tests can access a stream. kinesis_client: BaseClient = boto3.client("kinesis", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) kinesis_client.create_stream(StreamName="test_stream", ShardCount=1) + + # Set up SNS so tests can access a topic. + sns_client: BaseClient = boto3.client("sns", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) + sns_client.create_topic(Name="test_topic") except Exception as exception: print("Unexpected exception occurred", exception) diff --git a/contract-tests/tests/test/amazon/botocore/botocore_test.py b/contract-tests/tests/test/amazon/botocore/botocore_test.py index f309f343b..96a08b700 100644 --- a/contract-tests/tests/test/amazon/botocore/botocore_test.py +++ b/contract-tests/tests/test/amazon/botocore/botocore_test.py @@ -29,6 +29,7 @@ _AWS_QUEUE_URL: str = "aws.sqs.queue_url" _AWS_QUEUE_NAME: str = "aws.sqs.queue_name" _AWS_STREAM_NAME: str = "aws.kinesis.stream_name" +_AWS_TOPIC_ARN: str = "aws.sns.topic_arn" # pylint: disable=too-many-public-methods @@ -97,6 +98,7 @@ def test_s3_create_bucket(self): SpanAttributes.AWS_S3_BUCKET: "test-bucket-name", }, span_name="S3.CreateBucket", + span_kind="CLIENT", ) def test_s3_create_object(self): @@ -114,6 +116,7 @@ def test_s3_create_object(self): SpanAttributes.AWS_S3_BUCKET: "test-put-object-bucket-name", }, span_name="S3.PutObject", + span_kind="CLIENT", ) def test_s3_get_object(self): @@ -131,6 +134,7 @@ def test_s3_get_object(self): SpanAttributes.AWS_S3_BUCKET: "test-get-object-bucket-name", }, span_name="S3.GetObject", + span_kind="CLIENT", ) def test_s3_error(self): @@ -148,227 +152,79 @@ def test_s3_error(self): SpanAttributes.AWS_S3_BUCKET: "-", }, span_name="S3.CreateBucket", + span_kind="CLIENT", ) - def test_s3_fault(self): + def test_sns_get_topic_attributes(self): self.do_test_requests( - "s3/fault", - "GET", - 500, - 0, - 1, - remote_service="AWS::S3", - remote_operation="CreateBucket", - remote_resource_type="AWS::S3::Bucket", - remote_resource_identifier="valid-bucket-name", - request_specific_attributes={ - SpanAttributes.AWS_S3_BUCKET: "valid-bucket-name", - }, - span_name="S3.CreateBucket", - ) - - def test_dynamodb_create_table(self): - self.do_test_requests( - "ddb/createtable/some-table", - "GET", - 200, - 0, - 0, - remote_service="AWS::DynamoDB", - remote_operation="CreateTable", - remote_resource_type="AWS::DynamoDB::Table", - remote_resource_identifier="test_table", - request_specific_attributes={ - SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["test_table"], - }, - span_name="DynamoDB.CreateTable", - ) - - def test_dynamodb_put_item(self): - self.do_test_requests( - "ddb/putitem/putitem-table/key", - "GET", - 200, - 0, - 0, - remote_service="AWS::DynamoDB", - remote_operation="PutItem", - remote_resource_type="AWS::DynamoDB::Table", - remote_resource_identifier="put_test_table", - request_specific_attributes={ - SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["put_test_table"], - }, - span_name="DynamoDB.PutItem", - ) - - def test_dynamodb_error(self): - self.do_test_requests( - "ddb/error", - "GET", - 400, - 1, - 0, - remote_service="AWS::DynamoDB", - remote_operation="PutItem", - remote_resource_type="AWS::DynamoDB::Table", - remote_resource_identifier="invalid_table", - request_specific_attributes={ - SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["invalid_table"], - }, - span_name="DynamoDB.PutItem", - ) - - def test_dynamodb_fault(self): - self.do_test_requests( - "ddb/fault", - "GET", - 500, - 0, - 1, - remote_service="AWS::DynamoDB", - remote_operation="PutItem", - remote_resource_type="AWS::DynamoDB::Table", - remote_resource_identifier="invalid_table", - request_specific_attributes={ - SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["invalid_table"], - }, - span_name="DynamoDB.PutItem", - ) - - def test_sqs_create_queue(self): - self.do_test_requests( - "sqs/createqueue/some-queue", - "GET", - 200, - 0, - 0, - remote_service="AWS::SQS", - remote_operation="CreateQueue", - remote_resource_type="AWS::SQS::Queue", - remote_resource_identifier="test_queue", - request_specific_attributes={ - _AWS_QUEUE_NAME: "test_queue", - }, - span_name="SQS.CreateQueue", - ) - - def test_sqs_send_message(self): - self.do_test_requests( - "sqs/publishqueue/some-queue", + "sns/gettopattributes/get-topic-attributes", "GET", 200, 0, 0, - remote_service="AWS::SQS", - remote_operation="SendMessage", - remote_resource_type="AWS::SQS::Queue", - remote_resource_identifier="test_put_get_queue", + remote_service="AWS::SNS", + remote_operation="GetTopicAttributes", + remote_resource_type="AWS::SNS::TopicArn", + remote_resource_identifier="arn:aws:sns:us-west-2:000000000000:test_topic", request_specific_attributes={ - _AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", + _AWS_TOPIC_ARN: "arn:aws:sns:us-west-2:000000000000:test_topic", }, - span_name="SQS.SendMessage", + span_name="SNS.GetTopicAttributes", + span_kind="CLIENT", ) - def test_sqs_receive_message(self): + def test_sns_publish_message(self): self.do_test_requests( - "sqs/consumequeue/some-queue", + "sns/publishmessage/publish-message/some-message", "GET", 200, 0, 0, - remote_service="AWS::SQS", - remote_operation="ReceiveMessage", - remote_resource_type="AWS::SQS::Queue", - remote_resource_identifier="test_put_get_queue", + remote_service="AWS::SNS", + remote_operation="Publish", + remote_resource_type="AWS::SNS::TopicArn", + remote_resource_identifier="arn:aws:sns:us-west-2:000000000000:test_topic", request_specific_attributes={ - _AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", + _AWS_TOPIC_ARN: "arn:aws:sns:us-west-2:000000000000:test_topic", }, - span_name="SQS.ReceiveMessage", + span_name="test_topic send", + span_kind="PRODUCER", ) - def test_sqs_error(self): + def test_sns_error(self): self.do_test_requests( - "sqs/error", + "sns/error", "GET", 400, 1, 0, - remote_service="AWS::SQS", - remote_operation="SendMessage", - remote_resource_type="AWS::SQS::Queue", - remote_resource_identifier="sqserror", + remote_service="AWS::SNS", + remote_operation="Publish", + remote_resource_type="AWS::SNS::TopicArn", + remote_resource_identifier="arn:aws:sns:us-west-2:000000000000:test_topic/snserror", request_specific_attributes={ - _AWS_QUEUE_URL: "http://error.test:8080/000000000000/sqserror", + _AWS_TOPIC_ARN: "arn:aws:sns:us-west-2:000000000000:test_topic/snserror", }, - span_name="SQS.SendMessage", + span_name="test_topic/snserror send", + span_kind="PRODUCER", ) - def test_sqs_fault(self): + def test_sns_fault(self): self.do_test_requests( - "sqs/fault", + "sns/fault", "GET", 500, 0, 1, - remote_service="AWS::SQS", - remote_operation="CreateQueue", - remote_resource_type="AWS::SQS::Queue", - remote_resource_identifier="invalid_test", + remote_service="AWS::SNS", + remote_operation="GetTopicAttributes", + remote_resource_type="AWS::SNS::TopicArn", + remote_resource_identifier="invalid_topic_arn", request_specific_attributes={ - _AWS_QUEUE_NAME: "invalid_test", + _AWS_TOPIC_ARN: "invalid_topic_arn", }, - span_name="SQS.CreateQueue", - ) - - def test_kinesis_put_record(self): - self.do_test_requests( - "kinesis/putrecord/my-stream", - "GET", - 200, - 0, - 0, - remote_service="AWS::Kinesis", - remote_operation="PutRecord", - remote_resource_type="AWS::Kinesis::Stream", - remote_resource_identifier="test_stream", - request_specific_attributes={ - _AWS_STREAM_NAME: "test_stream", - }, - span_name="Kinesis.PutRecord", - ) - - def test_kinesis_error(self): - self.do_test_requests( - "kinesis/error", - "GET", - 400, - 1, - 0, - remote_service="AWS::Kinesis", - remote_operation="PutRecord", - remote_resource_type="AWS::Kinesis::Stream", - remote_resource_identifier="invalid_stream", - request_specific_attributes={ - _AWS_STREAM_NAME: "invalid_stream", - }, - span_name="Kinesis.PutRecord", - ) - - def test_kinesis_fault(self): - self.do_test_requests( - "kinesis/fault", - "GET", - 500, - 0, - 1, - remote_service="AWS::Kinesis", - remote_operation="PutRecord", - remote_resource_type="AWS::Kinesis::Stream", - remote_resource_identifier="test_stream", - request_specific_attributes={ - _AWS_STREAM_NAME: "test_stream", - }, - span_name="Kinesis.PutRecord", + span_name="SNS.GetTopicAttributes", + span_kind="CLIENT", ) @override @@ -376,9 +232,8 @@ def _assert_aws_span_attributes(self, resource_scope_spans: List[ResourceScopeSp target_spans: List[Span] = [] for resource_scope_span in resource_scope_spans: # pylint: disable=no-member - if resource_scope_span.span.kind == Span.SPAN_KIND_CLIENT: + if resource_scope_span.span.kind in (Span.SPAN_KIND_CLIENT, Span.SPAN_KIND_PRODUCER): target_spans.append(resource_scope_span.span) - self.assertEqual(len(target_spans), 1) self._assert_aws_attributes( target_spans[0].attributes, @@ -419,7 +274,7 @@ def _assert_semantic_conventions_span_attributes( target_spans: List[Span] = [] for resource_scope_span in resource_scope_spans: # pylint: disable=no-member - if resource_scope_span.span.kind == Span.SPAN_KIND_CLIENT: + if resource_scope_span.span.kind in (Span.SPAN_KIND_CLIENT, Span.SPAN_KIND_PRODUCER): target_spans.append(resource_scope_span.span) self.assertEqual(len(target_spans), 1) @@ -485,7 +340,7 @@ def _assert_metric_attributes( self._assert_str_attribute(attribute_dict, AWS_LOCAL_OPERATION, "InternalOperation") self._assert_str_attribute(attribute_dict, AWS_REMOTE_SERVICE, kwargs.get("remote_service")) self._assert_str_attribute(attribute_dict, AWS_REMOTE_OPERATION, kwargs.get("remote_operation")) - self._assert_str_attribute(attribute_dict, AWS_SPAN_KIND, "CLIENT") + self._assert_str_attribute(attribute_dict, AWS_SPAN_KIND, kwargs.get("span_kind")) remote_resource_type = kwargs.get("remote_resource_type", "None") remote_resource_identifier = kwargs.get("remote_resource_identifier", "None") if remote_resource_type != "None":