[AWSX][Logs forwarder] Fix a bug handling lambda logs from Kinesis event types #891

Merged · 6 commits · Jan 30, 2025
Changes from all commits
4 changes: 2 additions & 2 deletions aws/logs_monitoring/settings.py
@@ -166,11 +166,11 @@ def __init__(self, name, pattern, placeholder):
# Option to redact all pattern that looks like an ip address / email address / custom pattern
SCRUBBING_RULE_CONFIGS = [
    ScrubbingRuleConfig(
-        "REDACT_IP", "\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", "xxx.xxx.xxx.xxx"
+        "REDACT_IP", r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", "xxx.xxx.xxx.xxx"
    ),
    ScrubbingRuleConfig(
        "REDACT_EMAIL",
-        "[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+",
+        r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+",
        "xxxxx@xxxxx.com",
    ),
    ScrubbingRuleConfig(
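The raw-string changes above are worth a note: Python treats "\d" in an ordinary string literal as an invalid escape sequence, which has emitted a DeprecationWarning since Python 3.6 and a SyntaxWarning since 3.12, so the r-prefix keeps the patterns future-proof without changing what the regex engine sees. A minimal sketch of the scrubbing behavior the rule implements:

    import re

    # Raw string: the backslashes reach the regex engine untouched; a plain
    # string literal would emit an invalid-escape warning at compile time.
    IP_PATTERN = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")

    print(IP_PATTERN.sub("xxx.xxx.xxx.xxx", "accepted connection from 10.42.0.7"))
    # -> accepted connection from xxx.xxx.xxx.xxx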
52 changes: 50 additions & 2 deletions aws/logs_monitoring/steps/common.py
@@ -6,10 +6,23 @@
AwsCwEventSourcePrefix,
AwsS3EventSourceKeyword,
)
-from settings import DD_CUSTOM_TAGS, DD_SERVICE, DD_SOURCE
+from settings import (
+    AWS_STRING,
+    DD_CUSTOM_TAGS,
+    DD_FORWARDER_VERSION,
+    DD_SERVICE,
+    DD_SOURCE,
+    DD_TAGS,
+    FUNCTIONVERSION_STRING,
+    FORWARDERNAME_STRING,
+    FORWARDERMEMSIZE_STRING,
+    FORWARDERVERSION_STRING,
+    INVOKEDFUNCTIONARN_STRING,
+    SOURCECATEGORY_STRING,
+)

CLOUDTRAIL_REGEX = re.compile(
-    "\d+_CloudTrail(|-Digest|-Insight)_\w{2}(|-gov|-cn)-\w{4,9}-\d_(|.+)\d{8}T\d{4,6}Z(|.+).json.gz$",
+    r"\d+_CloudTrail(|-Digest|-Insight)_\w{2}(|-gov|-cn)-\w{4,9}-\d_(|.+)\d{8}T\d{4,6}Z(|.+).json.gz$",
    re.I,
)

@@ -101,3 +114,38 @@ def merge_dicts(a, b, path=None):
else:
a[key] = b[key]
return a


+def generate_metadata(context):
+    metadata = {
+        SOURCECATEGORY_STRING: AWS_STRING,
+        AWS_STRING: {
+            FUNCTIONVERSION_STRING: context.function_version,
+            INVOKEDFUNCTIONARN_STRING: context.invoked_function_arn,
+        },
+    }
+    # Add custom tags here by adding new value with the following format "key1:value1, key2:value2" - might be subject to modifications
+    dd_custom_tags_data = generate_custom_tags(context)
+    metadata[DD_CUSTOM_TAGS] = ",".join(
+        filter(
+            None,
+            [
+                DD_TAGS,
+                ",".join(
+                    ["{}:{}".format(k, v) for k, v in dd_custom_tags_data.items()]
+                ),
+            ],
+        )
+    )
+
+    return metadata
+
+
+def generate_custom_tags(context):
+    dd_custom_tags_data = {
+        FORWARDERNAME_STRING: context.function_name.lower(),
+        FORWARDERMEMSIZE_STRING: context.memory_limit_in_mb,
+        FORWARDERVERSION_STRING: DD_FORWARDER_VERSION,
+    }
+
+    return dd_custom_tags_data
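For orientation, here is roughly what the relocated generate_metadata returns. The context object below is a hypothetical stand-in for the real Lambda context, and the *_STRING constants resolve to the plain keys visible in the snapshot fixtures further down (function_version, invoked_function_arn, forwardername, forwarder_memorysize, forwarder_version):

    # Hypothetical stand-in for the Lambda context object, illustration only.
    class FakeContext:
        function_name = "My-Forwarder"
        function_version = "$LATEST"
        memory_limit_in_mb = 1024
        invoked_function_arn = (
            "arn:aws:lambda:us-east-1:123456789012:function:My-Forwarder"
        )

    # generate_metadata(FakeContext()) returns approximately:
    # {
    #     "ddsourcecategory": "aws",
    #     "aws": {
    #         "function_version": "$LATEST",
    #         "invoked_function_arn": "arn:aws:lambda:...:function:My-Forwarder",
    #     },
    #     "ddtags": "forwardername:my-forwarder,forwarder_memorysize:1024,"
    #               "forwarder_version:<DD_FORWARDER_VERSION>",
    # }
    # Any DD_TAGS configured on the forwarder are prepended to ddtags.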
26 changes: 18 additions & 8 deletions aws/logs_monitoring/steps/enrichment.py
@@ -54,11 +54,16 @@ def add_metadata_to_lambda_log(event, cache_layer):
    if not lambda_log_arn:
        return

+    # Function name is the seventh piece of the ARN
+    try:
+        function_name = lambda_log_arn.split(":")[6]
+    except IndexError:
+        logger.error(f"Failed to extract function name from ARN: {lambda_log_arn}")
+        return

    # Set Lambda ARN to "host"
    event[DD_HOST] = lambda_log_arn

-    # Function name is the seventh piece of the ARN
-    function_name = lambda_log_arn.split(":")[6]
    tags = [f"functionname:{function_name}"]

# Get custom tags of the Lambda function
@@ -67,7 +72,7 @@
    # If not set during parsing or has a default value
    # then set the service tag from lambda tags cache or using the function name
    # otherwise, remove the service tag from the custom lambda tags if exists to avoid duplication
-    if not event[DD_SERVICE] or event[DD_SERVICE] == event[DD_SOURCE]:
+    if not event.get(DD_SERVICE) or event.get(DD_SERVICE) == event.get(DD_SOURCE):
        service_tag = next(
            (tag for tag in custom_lambda_tags if tag.startswith("service:")),
            f"service:{function_name}",
@@ -85,16 +90,21 @@
    custom_env_tag = next(
        (tag for tag in custom_lambda_tags if tag.startswith("env:")), None
    )
-    if custom_env_tag is not None:
-        event[DD_CUSTOM_TAGS] = event[DD_CUSTOM_TAGS].replace("env:none", "")
+    if custom_env_tag:
+        event[DD_CUSTOM_TAGS] = ",".join(
+            [t for t in event.get(DD_CUSTOM_TAGS, "").split(",") if t != "env:none"]
+        )

-    tags += custom_lambda_tags
+    tags.extend(custom_lambda_tags)

    # Dedup tags, so we don't end up with functionname twice
    tags = list(set(tags))
+    tags.sort()  # Keep order deterministic

-    event[DD_CUSTOM_TAGS] = ",".join([event[DD_CUSTOM_TAGS]] + tags)
+    if custom_tags := event.get(DD_CUSTOM_TAGS):
+        event[DD_CUSTOM_TAGS] = ",".join([custom_tags] + tags)
+    else:
+        event[DD_CUSTOM_TAGS] = ",".join(tags)


def get_enriched_lambda_log_tags(log_event, cache_layer):
@@ -164,7 +174,7 @@ def extract_ddtags_from_message(event):
        [
            tag
            for tag in event[DD_CUSTOM_TAGS].split(",")
-            if not tag.startswith("service")
+            if not tag.startswith("service:")
        ]
    )

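Two of the enrichment fixes are easiest to see with sample values (made up here): the function-name lookup is positional, so a malformed ARN used to raise an uncaught IndexError, and removing env:none by substring replacement could leave an empty slot behind, which the split-based filter avoids:

    arn = "arn:aws:lambda:us-east-1:123456789012:function:my-func"
    # A malformed ARN now logs an error instead of raising out of the handler.
    print(arn.split(":")[6])  # my-func

    tags = "env:none,service:my-func,team:data"
    # Old behavior: substring replace leaves a dangling comma.
    print(tags.replace("env:none", ""))  # ,service:my-func,team:data
    # New behavior: drop the exact tag after splitting on commas.
    print(",".join(t for t in tags.split(",") if t != "env:none"))
    # service:my-func,team:data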
8 changes: 6 additions & 2 deletions aws/logs_monitoring/steps/handlers/awslogs_handler.py
@@ -8,6 +8,7 @@

from steps.common import (
    add_service_tag,
+    generate_metadata,
    merge_dicts,
    parse_event_source,
)
@@ -33,7 +34,9 @@ def __init__(self, context, cache_layer):
        self.context = context
        self.cache_layer = cache_layer

-    def handle(self, event, metadata):
+    def handle(self, event):
+        # Generate metadata
+        metadata = generate_metadata(self.context)
        # Get logs
        logs = self.extract_logs(event)
        # Build aws attributes
@@ -65,7 +68,8 @@ def handle(self, event, metadata):
            self.process_eks_logs(metadata, aws_attributes)
        # Create and send structured logs to Datadog
        for log in logs["logEvents"]:
-            yield merge_dicts(log, aws_attributes.to_dict())
+            merged = merge_dicts(log, aws_attributes.to_dict())
+            yield merge_dicts(merged, metadata)

    @staticmethod
    def extract_logs(event):
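With the new signature, handle builds its own metadata, so every yielded event is self-contained: the log record is merged with the AWS attributes first, then with the forwarder metadata. A simplified sketch of that merge order, using made-up dicts and a pared-down merge in the spirit of steps.common.merge_dicts:

    def merge_dicts(a, b):
        # Pared-down recursive merge: values from b are folded into a.
        for key in b:
            if key in a and isinstance(a[key], dict) and isinstance(b[key], dict):
                merge_dicts(a[key], b[key])
            else:
                a[key] = b[key]
        return a

    log = {"message": "START RequestId: abc123", "timestamp": 1668095539607}
    aws_attributes = {"aws": {"awslogs": {"logGroup": "/aws/lambda/my-func"}}}
    metadata = {"ddsourcecategory": "aws", "ddtags": "forwardername:my-forwarder"}

    event = merge_dicts(merge_dicts(log, aws_attributes), metadata)
    # event now carries the log fields, the aws attributes, and the metadata,
    # so no downstream step needs to attach metadata to awslogs events again.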
80 changes: 21 additions & 59 deletions aws/logs_monitoring/steps/parsing.py
@@ -11,23 +11,14 @@
from steps.handlers.awslogs_handler import AwsLogsHandler
from steps.handlers.s3_handler import S3EventHandler
from steps.common import (
-    merge_dicts,
+    generate_metadata,
    get_service_from_tags_and_remove_duplicates,
+    merge_dicts,
)
from steps.enums import AwsEventType, AwsEventTypeKeyword, AwsEventSource
from settings import (
-    AWS_STRING,
-    FUNCTIONVERSION_STRING,
-    INVOKEDFUNCTIONARN_STRING,
-    SOURCECATEGORY_STRING,
-    FORWARDERNAME_STRING,
-    FORWARDERMEMSIZE_STRING,
-    FORWARDERVERSION_STRING,
-    DD_TAGS,
    DD_SOURCE,
    DD_CUSTOM_TAGS,
    DD_SERVICE,
-    DD_FORWARDER_VERSION,
)

logger = logging.getLogger()
@@ -37,51 +28,36 @@
def parse(event, context, cache_layer):
    """Parse Lambda input to normalized events"""
    metadata = generate_metadata(context)
+    event_type = AwsEventType.UNKNOWN
    try:
        # Route to the corresponding parser
        event_type = parse_event_type(event)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Parsed event type: {event_type}")
+        set_forwarder_telemetry_tags(context, event_type)
        match event_type:
+            case AwsEventType.AWSLOGS:
+                aws_handler = AwsLogsHandler(context, cache_layer)
+                events = aws_handler.handle(event)
+                return collect_and_count(events)
            case AwsEventType.S3:
                s3_handler = S3EventHandler(context, metadata, cache_layer)
                events = s3_handler.handle(event)
-            case AwsEventType.AWSLOGS:
-                aws_handler = AwsLogsHandler(context, cache_layer)
-                # regenerate a metadata object for each event
-                metadata = generate_metadata(context)
-                events = aws_handler.handle(event, metadata)
            case AwsEventType.EVENTS:
                events = cwevent_handler(event, metadata)
            case AwsEventType.SNS:
                events = sns_handler(event, metadata)
            case AwsEventType.KINESIS:
                events = kinesis_awslogs_handler(event, context, cache_layer)
            case _:
                events = ["Parsing: Unsupported event type"]
+        return collect_and_count(events)
    except Exception as e:
        # Logs through the socket the error
        err_message = "Error parsing the object. Exception: {} for event {}".format(
            str(e), event
        )
        events = [err_message]

-    set_forwarder_telemetry_tags(context, event_type)

    return normalize_events(events, metadata)


-def generate_custom_tags(context):
-    dd_custom_tags_data = {
-        FORWARDERNAME_STRING: context.function_name.lower(),
-        FORWARDERMEMSIZE_STRING: context.memory_limit_in_mb,
-        FORWARDERVERSION_STRING: DD_FORWARDER_VERSION,
-    }
-
-    return dd_custom_tags_data


def parse_event_type(event):
if records := event.get(str(AwsEventTypeKeyword.RECORDS), None):
record = records[0]
@@ -138,36 +114,10 @@ def reformat_record(record):

    awslogs_handler = AwsLogsHandler(context, cache_layer)
    return itertools.chain.from_iterable(
-        awslogs_handler.handle(reformat_record(r), generate_metadata(context))
-        for r in event["Records"]
+        awslogs_handler.handle(reformat_record(r)) for r in event["Records"]
    )


-def generate_metadata(context):
-    metadata = {
-        SOURCECATEGORY_STRING: AWS_STRING,
-        AWS_STRING: {
-            FUNCTIONVERSION_STRING: context.function_version,
-            INVOKEDFUNCTIONARN_STRING: context.invoked_function_arn,
-        },
-    }
-    # Add custom tags here by adding new value with the following format "key1:value1, key2:value2" - might be subject to modifications
-    dd_custom_tags_data = generate_custom_tags(context)
-    metadata[DD_CUSTOM_TAGS] = ",".join(
-        filter(
-            None,
-            [
-                DD_TAGS,
-                ",".join(
-                    ["{}:{}".format(k, v) for k, v in dd_custom_tags_data.items()]
-                ),
-            ],
-        )
-    )
-
-    return metadata


def normalize_events(events, metadata):
normalized = []
events_counter = 0
@@ -186,3 +136,15 @@ def normalize_events(events, metadata):
send_event_metric("incoming_events", events_counter)

return normalized


+def collect_and_count(events):
+    collected = []
+    counter = 0
+    for event in events:
+        counter += 1
+        collected.append(event)
+
+    send_event_metric("incoming_events", counter)
+
+    return collected
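This is where the Kinesis bug was visible end to end: each Kinesis record wraps a gzipped, base64-encoded CloudWatch Logs payload, reformat_record (defined just above this hunk) reshapes it into the awslogs envelope, and AwsLogsHandler.handle now generates fresh metadata for each record instead of sharing one mutable dict across the batch. The sketch below shows the record shape involved; the reformat_record body here is an assumption for orientation, not the repository's exact code:

    import base64
    import gzip
    import json

    def reformat_record(record):
        # Assumed shape: lift the CloudWatch Logs blob out of the Kinesis
        # envelope so the record looks like a native awslogs event.
        return {"awslogs": {"data": record["kinesis"]["data"]}}

    # A Kinesis record carries the gzipped, base64-encoded CloudWatch Logs data:
    payload = {"logGroup": "/aws/lambda/my-func", "logEvents": [{"message": "hi"}]}
    data = base64.b64encode(gzip.compress(json.dumps(payload).encode())).decode()
    record = {"kinesis": {"data": data}}

    print(reformat_record(record))  # {'awslogs': {'data': '...base64 blob...'}}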
@@ -0,0 +1,24 @@
[
{
"aws": {
"awslogs": {
"logGroup": "/aws/lambda/test-lambda-default-log-group",
"logStream": "2023/11/06/[$LATEST]b25b1f977b3e416faa45a00f427e7acb",
"owner": "123456789012"
},
"function_version": 0,
"invoked_function_arn": "invoked_function_arn"
},
"ddsource": "lambda",
"ddsourcecategory": "aws",
"ddtags": "forwardername:function_name,forwarder_memorysize:10,forwarder_version:4.0.1,env:none",
"host": "/aws/lambda/test-lambda-default-log-group",
"id": "37199773595581154154810589279545129148442535997644275712",
"lambda": {
"arn": "invoked_function_arnfunction:test-lambda-default-log-group"
},
"message": "2021-01-02 03:04:05 UTC::@:[5306]:LOG: database system is ready to accept connections",
"service": "lambda",
"timestamp": 1668095539607
}
]
@@ -5,10 +5,17 @@
"logGroup": "/aws/rds/instance/datadog/postgresql",
"logStream": "datadog.0",
"owner": "123456789012"
-            }
+            },
+            "function_version": 0,
+            "invoked_function_arn": "invoked_function_arn"
},
"ddsource": "cloudwatch",
"ddsourcecategory": "aws",
"ddtags": "forwardername:function_name,forwarder_memorysize:10,forwarder_version:4.0.1,test_tag_key:test_tag_value",
"host": "/aws/rds/instance/datadog/postgresql",
"id": "31953106606966983378809025079804211143289615424298221568",
"message": "2021-01-02 03:04:05 UTC::@:[5306]:LOG: database system is ready to accept connections",
"service": "cloudwatch",
"timestamp": 1609556645000
}
]
@@ -5,10 +5,17 @@
"logGroup": "test/logs",
"logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9",
"owner": "425362996713"
-            }
+            },
+            "function_version": 0,
+            "invoked_function_arn": "invoked_function_arn"
},
"ddsource": "stepfunction",
"ddsourcecategory": "aws",
"ddtags": "forwardername:function_name,forwarder_memorysize:10,forwarder_version:4.0.1,test_tag_key:test_tag_value,dd_step_functions_trace_enabled:true",
"host": "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction2",
"id": "37199773595581154154810589279545129148442535997644275712",
"message": "{\"id\": \"1\",\"type\": \"ExecutionStarted\",\"details\": {\"input\": \"{}\",\"inputDetails\": {\"truncated\": \"false\"},\"roleArn\": \"arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt\"},\"previous_event_id\": \"0\",\"event_timestamp\": \"1716992192441\",\"execution_arn\": \"arn:aws:states:us-east-1:12345678910:execution:StepFunction2:ccccccc-d1da-4c38-b32c-2b6b07d713fa\",\"redrive_count\": \"0\"}",
"service": "stepfunction",
"timestamp": 1668095539607
}
]
@@ -5,10 +5,17 @@
"logGroup": "/aws/vendedlogs/states/logs-to-traces-sequential-Logs",
"logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9",
"owner": "425362996713"
-            }
+            },
+            "function_version": 0,
+            "invoked_function_arn": "invoked_function_arn"
},
"ddsource": "stepfunction",
"ddsourcecategory": "aws",
"ddtags": "forwardername:function_name,forwarder_memorysize:10,forwarder_version:4.0.1,test_tag_key:test_tag_value,dd_step_functions_trace_enabled:true",
"host": "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction1",
"id": "37199773595581154154810589279545129148442535997644275712",
"message": "{\"id\": \"1\",\"type\": \"ExecutionStarted\",\"details\": {\"input\": \"{}\",\"inputDetails\": {\"truncated\": \"false\"},\"roleArn\": \"arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt\"},\"previous_event_id\": \"0\",\"event_timestamp\": \"1716992192441\",\"execution_arn\": \"arn:aws:states:us-east-1:12345678910:execution:StepFunction1:ccccccc-d1da-4c38-b32c-2b6b07d713fa\",\"redrive_count\": \"0\"}",
"service": "stepfunction",
"timestamp": 1668095539607
}
]