Skip to content

Commit

Permalink
Merge pull request #14 from ml019/fix-object-copying
Browse files Browse the repository at this point in the history
fix: cloudwatch data copying
  • Loading branch information
ml019 authored Nov 23, 2021
2 parents 20d990a + 60a1d81 commit e858a0d
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 8 deletions.
31 changes: 31 additions & 0 deletions cloudwatch-firehose-es/dev/test-event-multi-categories-data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"messageType": "DATA_MESSAGE",
"owner": "428739250234",
"logGroup": "/ecommerce/integration/application/app/www/web/v1",
"logStream": "application-app-www-web-v1/www/ad5d3991-e6b1-476b-90b7-13dec73f18fa",
"subscriptionFilters": [
"ecommerce-integration-soln-ecommerce-es-lsubscriptionXfirehosestreamXappXecommerceXpylogsXwebX0-1ECOURWQ2MW8Z"
],
"logEvents": [
{
"id": "01",
"timestamp": 1556622091428,
"message": "{\"categories\":[\"INFO\", \"XXX\"],\"message\": \"Trying to process payment for order #61075 from 30-04-2019 with $retail=74.5, $discounted=73.625, $real=73.625, %discount=5 $discount=3.875\",\"time\": \"2019-04-30T11:01:31.428306\"}"
},
{
"id": "02",
"timestamp": 1556622091528,
"message": "{\"categories\":[\"INFO\"],\"message\": \"Trying to process payment for order #61077 from 30-04-2019 with $retail=75.5, $discounted=73.625, $real=73.625, %discount=5 $discount=3.875\",\"time\": \"2019-04-30T11:01:31.428306\"}"
},
{
"id": "03",
"timestamp": 1556622091528,
"message": "{\"categories\":[\"SIEM\"],\"message\": \"Trying to process payment for order #61077 from 30-04-2019 with $retail=76.5, $discounted=73.625, $real=73.625, %discount=5 $discount=3.875\",\"time\": \"2019-04-30T11:01:31.428306\"}"
},
{
"id": "04",
"timestamp": 1556622091528,
"message": "{\"categories\":[\"ERROR\"],\"message\": \"Trying to process payment for order #61077 from 30-04-2019 with $retail=77.5, $discounted=73.625, $real=73.625, %discount=5 $discount=3.875\",\"time\": \"2019-04-30T11:01:31.428306\"}"
}
]
}
10 changes: 10 additions & 0 deletions cloudwatch-firehose-es/dev/test-event-multi-categories.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

{
"deliveryStreamArn" : "arn:aws:firehose:us-east-1:123456789012:deliverystream/test-event-multi-categories",
"records" : [
{
"recordId" : "abc123",
"data" : "H4sICPZknGEAA3Rlc3QtZXZlbnQtbXVsdGktY2F0ZWdvcmllcy1kYXRhLmpzb24AzZRfb5swFMXf+yksr3uLg435EyLlIdpo1YcsapKpaKWaCBhqCTCySVAU5bvPkIVke+lUKVp5Mj7H9577s+T9DdAfLJhSUcZWu4rBMYBfp6vpz5m/XE7vfTg4WkRTMtmKljlyqWfa2KTWScxFdi/Fpmp1g8WiKJiMmcHLmmUyqrkojaiqch73a6NpGqNha2NLLoosa8mioq1yYUd6jbQdaTvaku5klNgJ9TyCmLMmyHKdNfLw2kWEJix2aUpGaXSqqzZrFUtetbXueF4zqXSH507sDH1gdBEYKZGX6CwxhfLLSkHKJXsViqkuc6BDBr07qHZ6GhXoxAFGxP8y/754ejRnT6MfsOv7cp7Z37Ky/jPRvl91Jp60RPAJVL9fc31vdVS02IltO45pYo/o+/nL9/t22yL7EGqoLBOSMxXC8XMIH77dzUM4ACEMgiCEL4PwdEDrencld7zMQC1AJUWsFVBFu0JnBqmQQMiESfDJIdi1QSpFAShG2EImJh5oeP0KbiWrI55PXGtoD8BtwlUsNppzMnHp0DHbPQ0w7/8+nxwT++ye0OHItXXMsJv6mKzt0faieEXIGJMxJUM9PcVOCA+wZ3AYvAHW/Dew9rvAvp+n+wZP+6PypNfhuXzwZ9fk6XxUntZ1ePqLxXxxTaDu/wJ6fGJvDr8A1NdHZN4GAAA="
}
]
}
File renamed without changes.
16 changes: 8 additions & 8 deletions cloudwatch-firehose-es/src/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,16 @@ def processRecords(records):
}

else:
reingest_event = data
reingest_event['logEvents'] = [ event ]
reingest_event['metadata'] = { 'partitionKeys' : { 'category' : category } }
reingest_category_event = data.copy()
reingest_category_event['logEvents'] = [ event ]
reingest_category_event['metadata'] = { 'partitionKeys' : { 'category' : category } }

reingest_json = json.dumps(reingest_event)
reingest_bytes = reingest_json.encode('utf-8')
reingest_compress = gzip.compress(reingest_bytes)
reingest_category_json = json.dumps(reingest_category_event)
reingest_category_bytes = reingest_category_json.encode('utf-8')
reingest_category_compress = gzip.compress(reingest_category_bytes)

yield {
'data' : str( base64.b64encode( reingest_compress ), 'utf-8'),
'data' : str( base64.b64encode( reingest_category_compress ), 'utf-8'),
'result' : 'Reingest',
'recordId' : recId
}
Expand All @@ -185,7 +185,7 @@ def processRecords(records):

else:

reingest_event = data
reingest_event = data.copy()
reingest_event['logEvents'] = [ event ]

reingest_json = json.dumps(reingest_event)
Expand Down

0 comments on commit e858a0d

Please sign in to comment.