From 052d8629555fc45804679255f99ed9cf97c6e93a Mon Sep 17 00:00:00 2001 From: Michael Leditschke Date: Thu, 18 Nov 2021 09:58:09 +1100 Subject: [PATCH] fix: json handling (#11) Correct issues with the processing of json vs text. --- cloudwatch-firehose-es/src/run.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/cloudwatch-firehose-es/src/run.py b/cloudwatch-firehose-es/src/run.py index b730f98..3567b30 100644 --- a/cloudwatch-firehose-es/src/run.py +++ b/cloudwatch-firehose-es/src/run.py @@ -38,10 +38,10 @@ 4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom transformations on the log events. -5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that - this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent - method. -6) Any additional records which exceed 6MB will be re-ingested back into Firehose. +5) Create extra records if categorizing and record contains categories +6) Concatenate the result from (4/5) together and set the result as the data of the record returned to Firehose. Add a + delimiter so records are easily delineated if multiple end up in one S3 object. +7) Any additional records which exceed 6MB will be re-ingested back into Firehose. """ @@ -64,7 +64,7 @@ def transformLogEvent(log_event, cloudwatch_info): log_event (dict): The original log event. Structure is {"id": str, "timestamp": long, "message": str} Returns: - str: The transformed log event. + dict: The transformed log event as json """ json_event = {} @@ -84,7 +84,7 @@ def transformLogEvent(log_event, cloudwatch_info): json_event['cloudwatch'] = cloudwatch_info - return json.dumps(json_event) + return json_event def processRecords(records): for r in records: @@ -110,7 +110,7 @@ def processRecords(records): for i, event in enumerate(data['logEvents']): if i == 0: - cw_log_data = transformLogEvent(event, cloudwatch_info) + cw_log_json = transformLogEvent(event, cloudwatch_info) # Check if we want to categorize on the basis of an attribute categorization_attribute = os.environ.get("CATEGORIZATION_ATTRIBUTE", None) @@ -118,7 +118,7 @@ def processRecords(records): if categorization_attribute is not None: # Determine if extra ways to categorize the event were provided - categories = getattr(cw_log_data, categorization_attribute, [] ) + categories = getattr(cw_log_json, categorization_attribute, [] ) if not isinstance(categories, list): categories = [categories] @@ -127,10 +127,13 @@ def processRecords(records): for j,category in enumerate(categories): if j == 0: - cw_log_data['categories'] = category + cw_log_json['categories'] = category + + # Format the record including delimiter + cw_log_string = json.dumps(cw_log_json) yield { - 'data' : str(base64.b64encode(cw_log_data.encode()), 'utf-8'), + 'data' : str(base64.b64encode(cw_log_string.encode()), 'utf-8'), 'result' : 'Ok', 'recordId' : recId, 'metadata' : { @@ -142,7 +145,7 @@ def processRecords(records): else: category_event = event - category_event['categories'] = [category] + category_event['message'][categorization_attribute] = [category] reingest_event = data reingest_event['logEvents'] = [ category_event ] @@ -157,8 +160,11 @@ def processRecords(records): 'recordId' : recId } else: + # Format the record including delimiter + cw_log_string = json.dumps(cw_log_json) + yield { - 'data' : str(base64.b64encode(cw_log_data.encode()), 'utf-8'), + 'data' : str(base64.b64encode(cw_log_string.encode()), 'utf-8'), 'result' : 'Ok', 'recordId' : recId }