diff --git a/cloudwatch-firehose-es/dev/test-event-multi-categories-data.json b/cloudwatch-firehose-es/dev/test-event-multi-categories-data.json new file mode 100644 index 0000000..76ff7c5 --- /dev/null +++ b/cloudwatch-firehose-es/dev/test-event-multi-categories-data.json @@ -0,0 +1,31 @@ +{ + "messageType": "DATA_MESSAGE", + "owner": "428739250234", + "logGroup": "/ecommerce/integration/application/app/www/web/v1", + "logStream": "application-app-www-web-v1/www/ad5d3991-e6b1-476b-90b7-13dec73f18fa", + "subscriptionFilters": [ + "ecommerce-integration-soln-ecommerce-es-lsubscriptionXfirehosestreamXappXecommerceXpylogsXwebX0-1ECOURWQ2MW8Z" + ], + "logEvents": [ + { + "id": "01", + "timestamp": 1556622091428, + "message": "{\"categories\":[\"INFO\", \"XXX\"],\"message\": \"Trying to process payment for order #61075 from 30-04-2019 with $retail=74.5, $discounted=73.625, $real=73.625, %discount=5 $discount=3.875\",\"time\": \"2019-04-30T11:01:31.428306\"}" + }, + { + "id": "02", + "timestamp": 1556622091528, + "message": "{\"categories\":[\"INFO\"],\"message\": \"Trying to process payment for order #61077 from 30-04-2019 with $retail=75.5, $discounted=73.625, $real=73.625, %discount=5 $discount=3.875\",\"time\": \"2019-04-30T11:01:31.428306\"}" + }, + { + "id": "03", + "timestamp": 1556622091528, + "message": "{\"categories\":[\"SIEM\"],\"message\": \"Trying to process payment for order #61077 from 30-04-2019 with $retail=76.5, $discounted=73.625, $real=73.625, %discount=5 $discount=3.875\",\"time\": \"2019-04-30T11:01:31.428306\"}" + }, + { + "id": "04", + "timestamp": 1556622091528, + "message": "{\"categories\":[\"ERROR\"],\"message\": \"Trying to process payment for order #61077 from 30-04-2019 with $retail=77.5, $discounted=73.625, $real=73.625, %discount=5 $discount=3.875\",\"time\": \"2019-04-30T11:01:31.428306\"}" + } + ] +} \ No newline at end of file diff --git a/cloudwatch-firehose-es/dev/test-event-multi-categories.json b/cloudwatch-firehose-es/dev/test-event-multi-categories.json new file mode 100644 index 0000000..b095f64 --- /dev/null +++ b/cloudwatch-firehose-es/dev/test-event-multi-categories.json @@ -0,0 +1,10 @@ + +{ + "deliveryStreamArn" : "arn:aws:firehose:us-east-1:123456789012:deliverystream/test-event-multi-categories", + "records" : [ + { + "recordId" : "abc123", + "data" : "H4sICPZknGEAA3Rlc3QtZXZlbnQtbXVsdGktY2F0ZWdvcmllcy1kYXRhLmpzb24AzZRfb5swFMXf+yksr3uLg435EyLlIdpo1YcsapKpaKWaCBhqCTCySVAU5bvPkIVke+lUKVp5Mj7H9577s+T9DdAfLJhSUcZWu4rBMYBfp6vpz5m/XE7vfTg4WkRTMtmKljlyqWfa2KTWScxFdi/Fpmp1g8WiKJiMmcHLmmUyqrkojaiqch73a6NpGqNha2NLLoosa8mioq1yYUd6jbQdaTvaku5klNgJ9TyCmLMmyHKdNfLw2kWEJix2aUpGaXSqqzZrFUtetbXueF4zqXSH507sDH1gdBEYKZGX6CwxhfLLSkHKJXsViqkuc6BDBr07qHZ6GhXoxAFGxP8y/754ejRnT6MfsOv7cp7Z37Ky/jPRvl91Jp60RPAJVL9fc31vdVS02IltO45pYo/o+/nL9/t22yL7EGqoLBOSMxXC8XMIH77dzUM4ACEMgiCEL4PwdEDrencld7zMQC1AJUWsFVBFu0JnBqmQQMiESfDJIdi1QSpFAShG2EImJh5oeP0KbiWrI55PXGtoD8BtwlUsNppzMnHp0DHbPQ0w7/8+nxwT++ye0OHItXXMsJv6mKzt0faieEXIGJMxJUM9PcVOCA+wZ3AYvAHW/Dew9rvAvp+n+wZP+6PypNfhuXzwZ9fk6XxUntZ1ePqLxXxxTaDu/wJ6fGJvDr8A1NdHZN4GAAA=" + } + ] +} diff --git a/cloudwatch-firehose-es/dev/test-event-data.json b/cloudwatch-firehose-es/dev/test-event-no-categories-data.json similarity index 100% rename from cloudwatch-firehose-es/dev/test-event-data.json rename to cloudwatch-firehose-es/dev/test-event-no-categories-data.json diff --git a/cloudwatch-firehose-es/dev/test-event.json b/cloudwatch-firehose-es/dev/test-event-no-categories.json similarity index 100% rename from cloudwatch-firehose-es/dev/test-event.json rename to cloudwatch-firehose-es/dev/test-event-no-categories.json diff --git a/cloudwatch-firehose-es/src/run.py b/cloudwatch-firehose-es/src/run.py index 6fbf84d..7dc77b8 100644 --- a/cloudwatch-firehose-es/src/run.py +++ b/cloudwatch-firehose-es/src/run.py @@ -160,16 +160,16 @@ def processRecords(records): } else: - reingest_event = data - reingest_event['logEvents'] = [ event ] - reingest_event['metadata'] = { 'partitionKeys' : { 'category' : category } } + reingest_category_event = data.copy() + reingest_category_event['logEvents'] = [ event ] + reingest_category_event['metadata'] = { 'partitionKeys' : { 'category' : category } } - reingest_json = json.dumps(reingest_event) - reingest_bytes = reingest_json.encode('utf-8') - reingest_compress = gzip.compress(reingest_bytes) + reingest_category_json = json.dumps(reingest_category_event) + reingest_category_bytes = reingest_category_json.encode('utf-8') + reingest_category_compress = gzip.compress(reingest_category_bytes) yield { - 'data' : str( base64.b64encode( reingest_compress ), 'utf-8'), + 'data' : str( base64.b64encode( reingest_category_compress ), 'utf-8'), 'result' : 'Reingest', 'recordId' : recId } @@ -185,7 +185,7 @@ def processRecords(records): else: - reingest_event = data + reingest_event = data.copy() reingest_event['logEvents'] = [ event ] reingest_json = json.dumps(reingest_event)