diff --git a/newrelic/hooks/external_botocore.py b/newrelic/hooks/external_botocore.py index a3e0dbf6d..109aa589d 100644 --- a/newrelic/hooks/external_botocore.py +++ b/newrelic/hooks/external_botocore.py @@ -58,7 +58,17 @@ def extract_sqs(*args, **kwargs): return queue_value.rsplit("/", 1)[-1] -def extract_sqs_agent_attrs(*args, **kwargs): +def extract_kinesis(*args, **kwargs): + # The stream name can be passed as the StreamName or as part of the StreamARN. + stream_value = kwargs.get("StreamName", "Unknown") + if stream_value == "Unknown": + arn = kwargs.get("StreamARN", None) + if arn: + stream_value = arn.split("/", 1)[-1] + return stream_value + + +def extract_sqs_agent_attrs(instance, *args, **kwargs): # Try to capture AWS SQS info as agent attributes. Log any exception to debug. agent_attrs = {} try: @@ -75,6 +85,30 @@ def extract_sqs_agent_attrs(*args, **kwargs): return agent_attrs +def extract_kinesis_agent_attrs(instance, *args, **kwargs): + # Try to capture AWS Kinesis info as agent attributes. Log any exception to debug. + agent_attrs = {} + try: + stream_arn = kwargs.get("StreamARN", None) + if stream_arn: + agent_attrs["cloud.platform"] = "aws_kinesis_data_streams" + agent_attrs["cloud.resource_id"] = stream_arn + else: + stream_name = kwargs.get("StreamName", None) + transaction = current_transaction() + settings = transaction.settings if transaction.settings else global_settings() + account_id = settings.cloud.aws.account_id if settings and settings.cloud.aws.account_id else None + region = None + if hasattr(instance, "_client_config") and hasattr(instance._client_config, "region_name"): + region = instance._client_config.region_name + if stream_name and account_id and region: + agent_attrs["cloud.platform"] = "aws_kinesis_data_streams" + agent_attrs["cloud.resource_id"] = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}" + except Exception as e: + _logger.debug("Failed to capture AWS Kinesis info.", exc_info=True) + return agent_attrs + + def extract(argument_names, default=None): def extractor_list(*args, **kwargs): for argument_name in argument_names: @@ -954,7 +988,50 @@ def _nr_dynamodb_datastore_trace_wrapper_(wrapped, instance, args, kwargs): return _nr_dynamodb_datastore_trace_wrapper_ -def sqs_message_trace( +def aws_function_trace( + operation, + destination_name, + params={}, + terminal=False, + async_wrapper=None, + extract_agent_attrs=None, + library=None, +): + @function_wrapper + def _nr_aws_function_trace_wrapper_(wrapped, instance, args, kwargs): + wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped) + if not wrapper: + parent = current_trace() + if not parent: + return wrapped(*args, **kwargs) + else: + parent = None + + _destination_name = destination_name(*args, **kwargs) + + trace = FunctionTrace( + name=_destination_name, + group=f"{library}/{operation}", + params=params, + terminal=terminal, + parent=parent, + source=wrapped, + ) + + # Attach extracted agent attributes. + _agent_attrs = extract_agent_attrs(instance, *args, **kwargs) + trace.agent_attributes.update(_agent_attrs) + + if wrapper: # pylint: disable=W0125,W0126 + return wrapper(wrapped, trace)(*args, **kwargs) + + with trace: + return wrapped(*args, **kwargs) + + return _nr_aws_function_trace_wrapper_ + + +def aws_message_trace( operation, destination_type, destination_name, @@ -962,9 +1039,10 @@ def sqs_message_trace( terminal=True, async_wrapper=None, extract_agent_attrs=None, + library=None, ): @function_wrapper - def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs): + def _nr_aws_message_trace_wrapper_(wrapped, instance, args, kwargs): wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped) if not wrapper: parent = current_trace() @@ -973,7 +1051,7 @@ def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs): else: parent = None - _library = "SQS" + _library = library _operation = operation _destination_type = destination_type _destination_name = destination_name(*args, **kwargs) @@ -990,7 +1068,7 @@ def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs): ) # Attach extracted agent attributes. - _agent_attrs = extract_agent_attrs(*args, **kwargs) + _agent_attrs = extract_agent_attrs(instance, *args, **kwargs) trace.agent_attributes.update(_agent_attrs) if wrapper: # pylint: disable=W0125,W0126 @@ -999,7 +1077,7 @@ def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs): with trace: return wrapped(*args, **kwargs) - return _nr_sqs_message_trace_wrapper_ + return _nr_aws_message_trace_wrapper_ def wrap_emit_api_params(wrapped, instance, args, kwargs): @@ -1059,14 +1137,140 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs): ("dynamodb", "delete_table"): dynamodb_datastore_trace("DynamoDB", extract("TableName"), "delete_table"), ("dynamodb", "query"): dynamodb_datastore_trace("DynamoDB", extract("TableName"), "query"), ("dynamodb", "scan"): dynamodb_datastore_trace("DynamoDB", extract("TableName"), "scan"), - ("sqs", "send_message"): sqs_message_trace( - "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs + ("kinesis", "add_tags_to_stream"): aws_function_trace( + "add_tags_to_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "can_paginate"): aws_function_trace( + "can_paginate", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "close"): aws_function_trace( + "close", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "create_stream"): aws_function_trace( + "create_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "decrease_stream_retention_period"): aws_function_trace( + "decrease_stream_retention_period", + extract_kinesis, + extract_agent_attrs=extract_kinesis_agent_attrs, + library="Kinesis", + ), + ("kinesis", "delete_resource_policy"): aws_function_trace( + "delete_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "delete_stream"): aws_function_trace( + "delete_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "deregister_stream_consumer"): aws_function_trace( + "deregister_stream_consumer", + extract_kinesis, + extract_agent_attrs=extract_kinesis_agent_attrs, + library="Kinesis", + ), + ("kinesis", "describe_limits"): aws_function_trace( + "describe_limits", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "describe_stream"): aws_function_trace( + "describe_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "describe_stream_consumer"): aws_function_trace( + "describe_stream_consumer", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "describe_stream_summary"): aws_function_trace( + "describe_stream_summary", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "disable_enhanced_monitoring"): aws_function_trace( + "disable_enhanced_monitoring", + extract_kinesis, + extract_agent_attrs=extract_kinesis_agent_attrs, + library="Kinesis", + ), + ("kinesis", "enable_enhanced_monitoring"): aws_function_trace( + "enable_enhanced_monitoring", + extract_kinesis, + extract_agent_attrs=extract_kinesis_agent_attrs, + library="Kinesis", + ), + ("kinesis", "generate_presigned_url"): aws_function_trace( + "generate_presigned_url", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "get_paginator"): aws_function_trace( + "get_paginator", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "get_resource_policy"): aws_function_trace( + "get_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "get_shard_iterator"): aws_function_trace( + "get_shard_iterator", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "get_waiter"): aws_function_trace( + "get_waiter", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "increase_stream_retention_period"): aws_function_trace( + "increase_stream_retention_period", + extract_kinesis, + extract_agent_attrs=extract_kinesis_agent_attrs, + library="Kinesis", + ), + ("kinesis", "list_shards"): aws_function_trace( + "list_shards", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "list_stream_consumers"): aws_function_trace( + "list_stream_consumers", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "list_streams"): aws_function_trace( + "list_streams", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "list_tags_for_stream"): aws_function_trace( + "list_tags_for_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "merge_shards"): aws_function_trace( + "merge_shards", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "put_resource_policy"): aws_function_trace( + "put_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "register_stream_consumer"): aws_function_trace( + "register_stream_consumer", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "remove_tags_from_stream"): aws_function_trace( + "remove_tags_from_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "split_shard"): aws_function_trace( + "split_shard", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "start_stream_encryption"): aws_function_trace( + "start_stream_encryption", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "stop_stream_encryption"): aws_function_trace( + "stop_stream_encryption", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "subscribe_to_shard"): aws_function_trace( + "subscribe_to_shard", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "update_shard_count"): aws_function_trace( + "update_shard_count", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "update_stream_mode"): aws_function_trace( + "update_stream_mode", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="" + ), + ("kinesis", "put_record"): aws_message_trace( + "Produce", "Stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "put_records"): aws_message_trace( + "Produce", "Stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("kinesis", "get_records"): aws_message_trace( + "Consume", "Stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis" + ), + ("sqs", "send_message"): aws_message_trace( + "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs, library="SQS" ), - ("sqs", "send_message_batch"): sqs_message_trace( - "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs + ("sqs", "send_message_batch"): aws_message_trace( + "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs, library="SQS" ), - ("sqs", "receive_message"): sqs_message_trace( - "Consume", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs + ("sqs", "receive_message"): aws_message_trace( + "Consume", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs, library="SQS" ), ("bedrock-runtime", "invoke_model"): wrap_bedrock_runtime_invoke_model(response_streaming=False), ("bedrock-runtime", "invoke_model_with_response_stream"): wrap_bedrock_runtime_invoke_model( diff --git a/tests/external_botocore/test_boto3_kinesis.py b/tests/external_botocore/test_boto3_kinesis.py new file mode 100644 index 000000000..92b916dd6 --- /dev/null +++ b/tests/external_botocore/test_boto3_kinesis.py @@ -0,0 +1,237 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import inspect +import uuid + +import boto3 +import botocore +import pytest +from moto import mock_aws +from testing_support.fixtures import dt_enabled, override_application_settings +from testing_support.validators.validate_span_events import validate_span_events +from testing_support.validators.validate_transaction_metrics import ( + validate_transaction_metrics, +) + +from newrelic.api.background_task import background_task +from newrelic.common.package_version_utils import get_package_version_tuple +from newrelic.hooks.external_botocore import CUSTOM_TRACE_POINTS + +MOTO_VERSION = get_package_version_tuple("moto") +BOTOCORE_VERSION = get_package_version_tuple("boto3") + +URL = "kinesis.us-east-1.amazonaws.com" +TEST_STREAM = f"python-agent-test-{uuid.uuid4()}" +EXPECTED_AGENT_ATTRS = { + "exact_agents": { + "cloud.platform": "aws_kinesis_data_streams", + "cloud.resource_id": f"arn:aws:kinesis:us-east-1:123456789012:stream/{TEST_STREAM}", + }, +} + +AWS_ACCESS_KEY_ID = "AAAAAAAAAAAACCESSKEY" +AWS_SECRET_ACCESS_KEY = "AAAAAASECRETKEY" # nosec +AWS_REGION = "us-east-1" + +_kinesis_scoped_metrics = [ + (f"MessageBroker/Kinesis/Stream/Produce/Named/{TEST_STREAM}", 2), + (f"MessageBroker/Kinesis/Stream/Consume/Named/{TEST_STREAM}", 1), + (f"Kinesis/create_stream/{TEST_STREAM}", 1), + (f"Kinesis/describe_stream/{TEST_STREAM}", 1), + (f"Kinesis/get_shard_iterator/{TEST_STREAM}", 1), + (f"Kinesis/delete_stream/{TEST_STREAM}", 1), + (f"External/{URL}/botocore/POST", 2), +] +if BOTOCORE_VERSION < (1, 29, 0): + _kinesis_scoped_metrics = [ + (f"MessageBroker/Kinesis/Stream/Produce/Named/{TEST_STREAM}", 2), + (f"Kinesis/create_stream/{TEST_STREAM}", 1), + (f"Kinesis/describe_stream/{TEST_STREAM}", 1), + (f"Kinesis/get_shard_iterator/{TEST_STREAM}", 1), + (f"Kinesis/delete_stream/{TEST_STREAM}", 1), + (f"External/{URL}/botocore/POST", 4), + ] + +_kinesis_rollup_metrics = [ + (f"MessageBroker/Kinesis/Stream/Produce/Named/{TEST_STREAM}", 2), + (f"MessageBroker/Kinesis/Stream/Consume/Named/{TEST_STREAM}", 1), + (f"Kinesis/create_stream/{TEST_STREAM}", 1), + (f"Kinesis/describe_stream/{TEST_STREAM}", 1), + (f"Kinesis/get_shard_iterator/{TEST_STREAM}", 1), + (f"Kinesis/delete_stream/{TEST_STREAM}", 1), + ("External/all", 4), + ("External/allOther", 4), + (f"External/{URL}/all", 2), + (f"External/{URL}/botocore/POST", 2), +] +if BOTOCORE_VERSION < (1, 29, 0): + _kinesis_rollup_metrics = [ + (f"MessageBroker/Kinesis/Stream/Produce/Named/{TEST_STREAM}", 2), + (f"Kinesis/create_stream/{TEST_STREAM}", 1), + (f"Kinesis/describe_stream/{TEST_STREAM}", 1), + (f"Kinesis/get_shard_iterator/{TEST_STREAM}", 1), + (f"Kinesis/delete_stream/{TEST_STREAM}", 1), + ("External/all", 4), + ("External/allOther", 4), + (f"External/{URL}/all", 4), + (f"External/{URL}/botocore/POST", 4), + ] + +_kinesis_scoped_metrics_error = [ + ("MessageBroker/Kinesis/Stream/Produce/Named/Unknown", 1), +] + +_kinesis_rollup_metrics_error = [ + ("MessageBroker/Kinesis/Stream/Produce/Named/Unknown", 1), +] + + +@background_task() +@mock_aws +def test_instrumented_kinesis_methods(): + client = boto3.client( + "kinesis", + aws_access_key_id=AWS_ACCESS_KEY_ID, + aws_secret_access_key=AWS_SECRET_ACCESS_KEY, + region_name=AWS_REGION, + ) + + client_methods = inspect.getmembers(client, predicate=inspect.ismethod) + methods = {("kinesis", name) for (name, method) in client_methods if not name.startswith("_")} + + uninstrumented_methods = methods - set(CUSTOM_TRACE_POINTS.keys()) + assert not uninstrumented_methods + + +@override_application_settings({"cloud.aws.account_id": 123456789012}) +@dt_enabled +@validate_span_events(exact_agents={"aws.operation": "CreateStream"}, count=1) +@validate_span_events( + **EXPECTED_AGENT_ATTRS, + count=6 if BOTOCORE_VERSION < (1, 29, 0) else 7, +) +@validate_span_events(exact_agents={"aws.operation": "DeleteStream"}, count=1) +@validate_transaction_metrics( + "test_boto3_kinesis:test_kinesis", + scoped_metrics=_kinesis_scoped_metrics, + rollup_metrics=_kinesis_rollup_metrics, + background_task=True, +) +@background_task() +@mock_aws +def test_kinesis(): + client = boto3.client( + "kinesis", + aws_access_key_id=AWS_ACCESS_KEY_ID, + aws_secret_access_key=AWS_SECRET_ACCESS_KEY, + region_name=AWS_REGION, + ) + # Create stream + resp = client.create_stream(StreamName=TEST_STREAM, ShardCount=123, StreamModeDetails={"StreamMode": "on-demand"}) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 + + # Stream ARN is needed for rest of methods. + resp = client.describe_stream( + StreamName=TEST_STREAM, + Limit=123, + ) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 + ARN = resp["StreamDescription"]["StreamARN"] + + # StreamARN is not supported in older versions of botocore. + stream_kwargs = {"StreamName": TEST_STREAM} if BOTOCORE_VERSION < (1, 29, 0) else {"StreamARN": ARN} + + # Send message + resp = client.put_record(Data=b"foo1", PartitionKey="bar", **stream_kwargs) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 + + # Send messages + resp = client.put_records( + Records=[{"Data": b"foo2", "PartitionKey": "bar"}, {"Data": b"foo3", "PartitionKey": "bar"}], **stream_kwargs + ) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 + + shard_iter = client.get_shard_iterator( + ShardId="shardId-000000000000", + ShardIteratorType="AT_SEQUENCE_NUMBER", + StartingSequenceNumber="0", + **stream_kwargs, + )["ShardIterator"] + + # Receive message + if BOTOCORE_VERSION < (1, 29, 0): + resp = client.get_records(ShardIterator=shard_iter) + else: + resp = client.get_records(ShardIterator=shard_iter, **stream_kwargs) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 + + # Delete stream + resp = client.delete_stream( + EnforceConsumerDeletion=True, + **stream_kwargs, + ) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 + + +@dt_enabled +@validate_transaction_metrics( + "test_boto3_kinesis:test_kinesis_error", + scoped_metrics=_kinesis_scoped_metrics_error, + rollup_metrics=_kinesis_rollup_metrics_error, + background_task=True, +) +@background_task() +@mock_aws +def test_kinesis_error(): + client = boto3.client( + "kinesis", + aws_access_key_id=AWS_ACCESS_KEY_ID, + aws_secret_access_key=AWS_SECRET_ACCESS_KEY, + region_name=AWS_REGION, + ) + # Create stream + resp = client.create_stream(StreamName=TEST_STREAM, ShardCount=123, StreamModeDetails={"StreamMode": "on-demand"}) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 + + # Stream ARN is needed for rest of methods. + resp = client.describe_stream( + StreamName=TEST_STREAM, + Limit=123, + ) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 + ARN = resp["StreamDescription"]["StreamARN"] + + # StreamARN is not supported in older versions of botocore. + stream_kwargs = {"StreamName": TEST_STREAM} if BOTOCORE_VERSION < (1, 29, 0) else {"StreamARN": ARN} + + expected_error = ( + botocore.exceptions.ParamValidationError + if BOTOCORE_VERSION < (1, 29, 0) + else client.exceptions.ResourceNotFoundException + ) + + # Malformed send message, uses arg instead of kwarg + with pytest.raises(expected_error): + resp = client.put_record( + Data=b"{foo}", + PartitionKey="bar", + ) + + # Delete stream + resp = client.delete_stream( + EnforceConsumerDeletion=True, + **stream_kwargs, + ) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200