Skip to content

Commit

Permalink
Add support for kinesis
Browse files Browse the repository at this point in the history
  • Loading branch information
hmstepanek committed Jan 31, 2025
1 parent 1e87217 commit 1be1817
Show file tree
Hide file tree
Showing 2 changed files with 453 additions and 12 deletions.
228 changes: 216 additions & 12 deletions newrelic/hooks/external_botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,17 @@ def extract_sqs(*args, **kwargs):
return queue_value.rsplit("/", 1)[-1]


def extract_sqs_agent_attrs(*args, **kwargs):
def extract_kinesis(*args, **kwargs):
# The stream name can be passed as the StreamName or as part of the StreamARN.
stream_value = kwargs.get("StreamName", "Unknown")
if stream_value == "Unknown":
arn = kwargs.get("StreamARN", None)
if arn:
stream_value = arn.split("/", 1)[-1]
return stream_value


def extract_sqs_agent_attrs(instance, *args, **kwargs):
# Try to capture AWS SQS info as agent attributes. Log any exception to debug.
agent_attrs = {}
try:
Expand All @@ -75,6 +85,30 @@ def extract_sqs_agent_attrs(*args, **kwargs):
return agent_attrs


def extract_kinesis_agent_attrs(instance, *args, **kwargs):
# Try to capture AWS Kinesis info as agent attributes. Log any exception to debug.
agent_attrs = {}
try:
stream_arn = kwargs.get("StreamARN", None)
if stream_arn:
agent_attrs["cloud.platform"] = "aws_kinesis_data_streams"
agent_attrs["cloud.resource_id"] = stream_arn
else:
stream_name = kwargs.get("StreamName", None)
transaction = current_transaction()
settings = transaction.settings if transaction.settings else global_settings()
account_id = settings.cloud.aws.account_id if settings and settings.cloud.aws.account_id else None
region = None
if hasattr(instance, "_client_config") and hasattr(instance._client_config, "region_name"):
region = instance._client_config.region_name
if stream_name and account_id and region:
agent_attrs["cloud.platform"] = "aws_kinesis_data_streams"
agent_attrs["cloud.resource_id"] = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
except Exception as e:
_logger.debug("Failed to capture AWS Kinesis info.", exc_info=True)
return agent_attrs


def extract(argument_names, default=None):
def extractor_list(*args, **kwargs):
for argument_name in argument_names:
Expand Down Expand Up @@ -954,17 +988,61 @@ def _nr_dynamodb_datastore_trace_wrapper_(wrapped, instance, args, kwargs):
return _nr_dynamodb_datastore_trace_wrapper_


def sqs_message_trace(
def aws_function_trace(
operation,
destination_name,
params={},
terminal=False,
async_wrapper=None,
extract_agent_attrs=None,
library=None,
):
@function_wrapper
def _nr_aws_function_trace_wrapper_(wrapped, instance, args, kwargs):
wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped)
if not wrapper:
parent = current_trace()
if not parent:
return wrapped(*args, **kwargs)
else:
parent = None

_destination_name = destination_name(*args, **kwargs)

trace = FunctionTrace(
name=_destination_name,
group=f"{library}/{operation}",
params=params,
terminal=terminal,
parent=parent,
source=wrapped,
)

# Attach extracted agent attributes.
_agent_attrs = extract_agent_attrs(instance, *args, **kwargs)
trace.agent_attributes.update(_agent_attrs)

if wrapper: # pylint: disable=W0125,W0126
return wrapper(wrapped, trace)(*args, **kwargs)

with trace:
return wrapped(*args, **kwargs)

return _nr_aws_function_trace_wrapper_


def aws_message_trace(
operation,
destination_type,
destination_name,
params={},
terminal=True,
async_wrapper=None,
extract_agent_attrs=None,
library=None,
):
@function_wrapper
def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs):
def _nr_aws_message_trace_wrapper_(wrapped, instance, args, kwargs):
wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped)
if not wrapper:
parent = current_trace()
Expand All @@ -973,7 +1051,7 @@ def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs):
else:
parent = None

_library = "SQS"
_library = library
_operation = operation
_destination_type = destination_type
_destination_name = destination_name(*args, **kwargs)
Expand All @@ -990,7 +1068,7 @@ def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs):
)

# Attach extracted agent attributes.
_agent_attrs = extract_agent_attrs(*args, **kwargs)
_agent_attrs = extract_agent_attrs(instance, *args, **kwargs)
trace.agent_attributes.update(_agent_attrs)

if wrapper: # pylint: disable=W0125,W0126
Expand All @@ -999,7 +1077,7 @@ def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs):
with trace:
return wrapped(*args, **kwargs)

return _nr_sqs_message_trace_wrapper_
return _nr_aws_message_trace_wrapper_


def wrap_emit_api_params(wrapped, instance, args, kwargs):
Expand Down Expand Up @@ -1059,14 +1137,140 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
("dynamodb", "delete_table"): dynamodb_datastore_trace("DynamoDB", extract("TableName"), "delete_table"),
("dynamodb", "query"): dynamodb_datastore_trace("DynamoDB", extract("TableName"), "query"),
("dynamodb", "scan"): dynamodb_datastore_trace("DynamoDB", extract("TableName"), "scan"),
("sqs", "send_message"): sqs_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs
("kinesis", "add_tags_to_stream"): aws_function_trace(
"add_tags_to_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "can_paginate"): aws_function_trace(
"can_paginate", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "close"): aws_function_trace(
"close", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "create_stream"): aws_function_trace(
"create_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "decrease_stream_retention_period"): aws_function_trace(
"decrease_stream_retention_period",
extract_kinesis,
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "delete_resource_policy"): aws_function_trace(
"delete_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "delete_stream"): aws_function_trace(
"delete_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "deregister_stream_consumer"): aws_function_trace(
"deregister_stream_consumer",
extract_kinesis,
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "describe_limits"): aws_function_trace(
"describe_limits", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "describe_stream"): aws_function_trace(
"describe_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "describe_stream_consumer"): aws_function_trace(
"describe_stream_consumer", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "describe_stream_summary"): aws_function_trace(
"describe_stream_summary", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "disable_enhanced_monitoring"): aws_function_trace(
"disable_enhanced_monitoring",
extract_kinesis,
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "enable_enhanced_monitoring"): aws_function_trace(
"enable_enhanced_monitoring",
extract_kinesis,
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "generate_presigned_url"): aws_function_trace(
"generate_presigned_url", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "get_paginator"): aws_function_trace(
"get_paginator", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "get_resource_policy"): aws_function_trace(
"get_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "get_shard_iterator"): aws_function_trace(
"get_shard_iterator", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "get_waiter"): aws_function_trace(
"get_waiter", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "increase_stream_retention_period"): aws_function_trace(
"increase_stream_retention_period",
extract_kinesis,
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "list_shards"): aws_function_trace(
"list_shards", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "list_stream_consumers"): aws_function_trace(
"list_stream_consumers", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "list_streams"): aws_function_trace(
"list_streams", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "list_tags_for_stream"): aws_function_trace(
"list_tags_for_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "merge_shards"): aws_function_trace(
"merge_shards", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "put_resource_policy"): aws_function_trace(
"put_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "register_stream_consumer"): aws_function_trace(
"register_stream_consumer", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "remove_tags_from_stream"): aws_function_trace(
"remove_tags_from_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "split_shard"): aws_function_trace(
"split_shard", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "start_stream_encryption"): aws_function_trace(
"start_stream_encryption", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "stop_stream_encryption"): aws_function_trace(
"stop_stream_encryption", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "subscribe_to_shard"): aws_function_trace(
"subscribe_to_shard", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "update_shard_count"): aws_function_trace(
"update_shard_count", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "update_stream_mode"): aws_function_trace(
"update_stream_mode", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library=""
),
("kinesis", "put_record"): aws_message_trace(
"Produce", "Stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "put_records"): aws_message_trace(
"Produce", "Stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "get_records"): aws_message_trace(
"Consume", "Stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("sqs", "send_message"): aws_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs, library="SQS"
),
("sqs", "send_message_batch"): sqs_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs
("sqs", "send_message_batch"): aws_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs, library="SQS"
),
("sqs", "receive_message"): sqs_message_trace(
"Consume", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs
("sqs", "receive_message"): aws_message_trace(
"Consume", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs, library="SQS"
),
("bedrock-runtime", "invoke_model"): wrap_bedrock_runtime_invoke_model(response_streaming=False),
("bedrock-runtime", "invoke_model_with_response_stream"): wrap_bedrock_runtime_invoke_model(
Expand Down
Loading

0 comments on commit 1be1817

Please sign in to comment.