From 62a31f55cdbc332b4e3f04fb54cf8ad9b5fa605d Mon Sep 17 00:00:00 2001 From: Eric Ely Date: Wed, 18 Nov 2020 14:18:55 -0500 Subject: [PATCH 1/5] support all new kinesis config --- zappa/utilities.py | 68 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/zappa/utilities.py b/zappa/utilities.py index 49c3b3351..49d15f325 100644 --- a/zappa/utilities.py +++ b/zappa/utilities.py @@ -351,9 +351,75 @@ def add(self, function): if self.filters: self.add_filters(function) + class ExtendedKinesisEventSource(kappa.event_source.kinesis.KinesisEventSource): + @property + def batch_window(self): + return self._config.get('batch_window', 0) + + @property + def retry_attempts(self): + return self._config.get('retry_attempts', -1) + + @property + def split_batch_on_error(self): + return self._config.get('split_batch_on_error', False) + + @property + def concurrent_batches_per_shard(self): + return self._config.get('concurrent_batches_per_shard', 1) + + @property + def maximum_age_of_record(self): + return self._config.get('maximum_age_of_record', -1) + + @property + def destination_config(self): + return self._config.get('destination_config', {}) + + def add(self, function): + try: + response = self._lambda.call( + 'create_event_source_mapping', + FunctionName=function.name, + EventSourceArn=self.arn, + BatchSize=self.batch_size, + MaximumBatchingWindowInSeconds=self.batch_window, + StartingPosition=self.starting_position, + Enabled=self.enabled, + MaximumRetryAttempts=self.retry_attempts, + BisectBatchOnFunctionError=self.split_batch_on_error, + MaximumRecordAgeInSeconds=self.maximum_age_of_record, + ParallelizationFactor=self.concurrent_batches_per_shard, + DestinationConfig=self.destination_config + ) + LOG.debug(response) + except Exception: + LOG.exception('Unable to add event source') + + def update(self, function): + response = None + uuid = self._get_uuid(function) + if uuid: + try: + response = self._lambda.call( + 'update_event_source_mapping', + BatchSize=self.batch_size, + MaximumBatchingWindowInSeconds=self.batch_window, + Enabled=self.enabled, + FunctionName=function.arn, + MaximumRetryAttempts=self.retry_attempts, + BisectBatchOnFunctionError=self.split_batch_on_error, + MaximumRecordAgeInSeconds=self.maximum_age_of_record, + ParallelizationFactor=self.concurrent_batches_per_shard, + DestinationConfig=self.destination_config + ) + LOG.debug(response) + except Exception: + LOG.exception('Unable to update event source') + event_source_map = { 'dynamodb': kappa.event_source.dynamodb_stream.DynamoDBStreamEventSource, - 'kinesis': kappa.event_source.kinesis.KinesisEventSource, + 'kinesis': ExtendedKinesisEventSource, 's3': kappa.event_source.s3.S3EventSource, 'sns': ExtendedSnsEventSource, 'sqs': SqsEventSource, From 432f1e188aa28c056e04db686a4cfc48ee14af11 Mon Sep 17 00:00:00 2001 From: Eric Ely Date: Wed, 18 Nov 2020 15:11:22 -0500 Subject: [PATCH 2/5] add aws defaults --- zappa/utilities.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/zappa/utilities.py b/zappa/utilities.py index 49d15f325..cb5d3ed1c 100644 --- a/zappa/utilities.py +++ b/zappa/utilities.py @@ -358,7 +358,9 @@ def batch_window(self): @property def retry_attempts(self): - return self._config.get('retry_attempts', -1) + # default is 10,000 according to + # https://aws.amazon.com/blogs/compute/new-aws-lambda-controls-for-stream-processing-and-asynchronous-invocations/ + return self._config.get('retry_attempts', 10000) @property def split_batch_on_error(self): @@ -370,7 +372,8 @@ def concurrent_batches_per_shard(self): @property def maximum_age_of_record(self): - return self._config.get('maximum_age_of_record', -1) + # default maximum age is a week, the limit of kinesis + return self._config.get('maximum_age_of_record', 604,800) @property def destination_config(self): From cb01bcf1e16b09b55a55337141da3f6595ab390b Mon Sep 17 00:00:00 2001 From: Eric Ely Date: Wed, 18 Nov 2020 15:50:38 -0500 Subject: [PATCH 3/5] use -1 for infinity for some config --- zappa/utilities.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zappa/utilities.py b/zappa/utilities.py index cb5d3ed1c..d8f507fa9 100644 --- a/zappa/utilities.py +++ b/zappa/utilities.py @@ -360,7 +360,7 @@ def batch_window(self): def retry_attempts(self): # default is 10,000 according to # https://aws.amazon.com/blogs/compute/new-aws-lambda-controls-for-stream-processing-and-asynchronous-invocations/ - return self._config.get('retry_attempts', 10000) + return self._config.get('retry_attempts', -1) @property def split_batch_on_error(self): @@ -373,7 +373,7 @@ def concurrent_batches_per_shard(self): @property def maximum_age_of_record(self): # default maximum age is a week, the limit of kinesis - return self._config.get('maximum_age_of_record', 604,800) + return self._config.get('maximum_age_of_record', -1) @property def destination_config(self): From e0337b3ea04b340c1ee5d6ff1d1b9ade4c44824d Mon Sep 17 00:00:00 2001 From: Eric Ely Date: Wed, 18 Nov 2020 15:54:03 -0500 Subject: [PATCH 4/5] update doc --- README.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c70967869..6ab360177 100644 --- a/README.md +++ b/README.md @@ -557,7 +557,16 @@ Optionally you can add [SNS message filters](http://docs.aws.amazon.com/sns/late "arn": "arn:aws:dynamodb:us-east-1:1234554:table/YourTable/stream/2016-05-11T00:00:00.000", "starting_position": "TRIM_HORIZON", // Supported values: TRIM_HORIZON, LATEST "batch_size": 50, // Max: 1000 - "enabled": true // Default is false + "enabled": true, // Default is false + "retry_attempts": 10 // default is -1, which is infinite number + "split_batch_on_error": True // Default is false + "concurrent_batches_per_shard": 2, // Default is 1 + "maximum_age_of_record": 3600 // Default is -1, which is an infinitely old record + "destination_config": { // Allows failures to be sent to SNS or SQS ARN + "OnFailure": { + "Destination": "arn:aws:sns:us-east-2:123:my-sns" + } + } } } ] From edc4f4a1b77ce05eb1416feee3250d165316186f Mon Sep 17 00:00:00 2001 From: Eric Ely Date: Wed, 18 Nov 2020 16:11:20 -0500 Subject: [PATCH 5/5] fix typo in doc --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 6ab360177..4037a6ee1 100644 --- a/README.md +++ b/README.md @@ -559,7 +559,7 @@ Optionally you can add [SNS message filters](http://docs.aws.amazon.com/sns/late "batch_size": 50, // Max: 1000 "enabled": true, // Default is false "retry_attempts": 10 // default is -1, which is infinite number - "split_batch_on_error": True // Default is false + "split_batch_on_error": true // Default is false "concurrent_batches_per_shard": 2, // Default is 1 "maximum_age_of_record": 3600 // Default is -1, which is an infinitely old record "destination_config": { // Allows failures to be sent to SNS or SQS ARN