fix: json handling (#11)
Correct issues with the processing of json vs text.
ml019 authored Nov 17, 2021
1 parent 46cb403 commit 052d862
Showing 1 changed file with 18 additions and 12 deletions.
30 changes: 18 additions & 12 deletions cloudwatch-firehose-es/src/run.py
@@ -38,10 +38,10 @@
4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
transformations on the log events.
-5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that
-this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent
-method.
-6) Any additional records which exceed 6MB will be re-ingested back into Firehose.
+5) Create extra records if categorizing and record contains categories
+6) Concatenate the result from (4/5) together and set the result as the data of the record returned to Firehose. Add a
+delimiter so records are easily delineated if multiple end up in one S3 object.
+7) Any additional records which exceed 6MB will be re-ingested back into Firehose.
"""

@@ -64,7 +64,7 @@ def transformLogEvent(log_event, cloudwatch_info):
log_event (dict): The original log event. Structure is {"id": str, "timestamp": long, "message": str}
Returns:
-str: The transformed log event.
+dict: The transformed log event as json
"""

json_event = {}
@@ -84,7 +84,7 @@ def transformLogEvent(log_event, cloudwatch_info):

json_event['cloudwatch'] = cloudwatch_info

-return json.dumps(json_event)
+return json_event

def processRecords(records):
for r in records:
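
The heart of the fix is visible in this hunk: transformLogEvent now returns a plain dict, so serialization happens once, where the record is emitted, rather than inside the transform. A minimal sketch of that pattern (emit_record is a hypothetical helper, not code from this file):

import base64
import json

def emit_record(log_event, cloudwatch_info, record_id):
    # transformLogEvent now hands back a dict...
    json_event = transformLogEvent(log_event, cloudwatch_info)
    # ...so the caller serializes exactly once before base64-encoding for Firehose.
    cw_log_string = json.dumps(json_event)
    return {
        "data": str(base64.b64encode(cw_log_string.encode()), "utf-8"),
        "result": "Ok",
        "recordId": record_id,
    }
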
@@ -110,15 +110,15 @@ def processRecords(records):

for i, event in enumerate(data['logEvents']):
if i == 0:
-cw_log_data = transformLogEvent(event, cloudwatch_info)
+cw_log_json = transformLogEvent(event, cloudwatch_info)

# Check if we want to categorize on the basis of an attribute
categorization_attribute = os.environ.get("CATEGORIZATION_ATTRIBUTE", None)

if categorization_attribute is not None:

# Determine if extra ways to categorize the event were provided
-categories = getattr(cw_log_data, categorization_attribute, [] )
+categories = getattr(cw_log_json, categorization_attribute, [] )
if not isinstance(categories, list):
categories = [categories]

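Because the transformed event is now a dict, the categorization lookup can be read as the sketch below. This shows the intent only: it uses dict.get where the diff uses getattr, and the function name is hypothetical.

import os

def get_categories(cw_log_json):
    # The attribute to categorize on comes from the environment.
    categorization_attribute = os.environ.get("CATEGORIZATION_ATTRIBUTE", None)
    if categorization_attribute is None:
        return []
    # Read the attribute off the transformed event and normalise to a list.
    categories = cw_log_json.get(categorization_attribute, [])
    if not isinstance(categories, list):
        categories = [categories]
    return categories
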
@@ -127,10 +127,13 @@ def processRecords(records):

for j,category in enumerate(categories):
if j == 0:
-cw_log_data['categories'] = category
+cw_log_json['categories'] = category

+# Format the record including delimiter
+cw_log_string = json.dumps(cw_log_json)

yield {
-'data' : str(base64.b64encode(cw_log_data.encode()), 'utf-8'),
+'data' : str(base64.b64encode(cw_log_string.encode()), 'utf-8'),
'result' : 'Ok',
'recordId' : recId,
'metadata' : {
@@ -142,7 +145,7 @@

else:
category_event = event
-category_event['categories'] = [category]
+category_event['message'][categorization_attribute] = [category]

reingest_event = data
reingest_event['logEvents'] = [ category_event ]
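
For each extra category, the branch above builds a single-event copy of the CloudWatch payload to push back into Firehose. A rough sketch of what gets assembled, assuming the event's message has already been parsed into a dict (the helper name is hypothetical; the actual re-ingestion happens further down the file, outside this diff):

import copy

def build_reingest_event(data, event, categorization_attribute, category):
    # Copy the event and tag its (already parsed) message with this one category.
    category_event = copy.deepcopy(event)
    category_event["message"][categorization_attribute] = [category]
    # Wrap it in a CloudWatch-Logs-shaped envelope carrying just that event.
    reingest_event = dict(data)
    reingest_event["logEvents"] = [category_event]
    return reingest_event
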
@@ -157,8 +160,11 @@ def processRecords(records):
'recordId' : recId
}
else:
+# Format the record including delimiter
+cw_log_string = json.dumps(cw_log_json)

yield {
-'data' : str(base64.b64encode(cw_log_data.encode()), 'utf-8'),
+'data' : str(base64.b64encode(cw_log_string.encode()), 'utf-8'),
'result' : 'Ok',
'recordId' : recId
}
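
Step 7 of the docstring (sending anything that would push the response past the 6MB limit back into Firehose) sits outside this diff. A rough sketch of the idea, assuming boto3 and a delivery stream name passed in; none of these names come from the file:

import boto3

MAX_RESPONSE_BYTES = 6_000_000  # approximate Firehose transform response limit

def reingest_oversized(records, stream_name):
    # Keep records until the projected response size is exceeded; anything past
    # that is marked Dropped here and re-ingested into the delivery stream.
    client = boto3.client("firehose")
    kept, reingest, size = [], [], 0
    for rec in records:
        rec_size = len(rec["data"]) + len(rec["recordId"])
        if size + rec_size > MAX_RESPONSE_BYTES:
            reingest.append({"Data": rec["data"].encode()})
            rec = {"recordId": rec["recordId"], "result": "Dropped"}
        else:
            size += rec_size
        kept.append(rec)
    if reingest:
        client.put_record_batch(DeliveryStreamName=stream_name, Records=reingest)
    return kept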