diff --git a/README.md b/README.md
index c70967869..4037a6ee1 100644
--- a/README.md
+++ b/README.md
@@ -557,7 +557,16 @@ Optionally you can add [SNS message filters](http://docs.aws.amazon.com/sns/late
             "arn": "arn:aws:dynamodb:us-east-1:1234554:table/YourTable/stream/2016-05-11T00:00:00.000",
             "starting_position": "TRIM_HORIZON", // Supported values: TRIM_HORIZON, LATEST
             "batch_size": 50, // Max: 1000
-            "enabled": true // Default is false
+            "enabled": true, // Default is false
+            "retry_attempts": 10, // Default is -1, which retries until the record expires
+            "split_batch_on_error": true, // Default is false
+            "concurrent_batches_per_shard": 2, // Default is 1
+            "maximum_age_of_record": 3600, // Default is -1, which sets no age limit
+            "destination_config": { // Route failed batches to an SNS topic or SQS queue ARN
+                "OnFailure": {
+                    "Destination": "arn:aws:sns:us-east-2:123:my-sns"
+                }
+            }
         }
     }
 ]
diff --git a/zappa/utilities.py b/zappa/utilities.py
index 49c3b3351..d8f507fa9 100644
--- a/zappa/utilities.py
+++ b/zappa/utilities.py
@@ -351,9 +351,78 @@ def add(self, function):
             if self.filters:
                 self.add_filters(function)
+    class ExtendedKinesisEventSource(kappa.event_source.kinesis.KinesisEventSource):
+        @property
+        def batch_window(self):
+            return self._config.get('batch_window', 0)
+
+        @property
+        def retry_attempts(self):
+            # -1 (the default) retries until the record expires; explicit values may go up to 10,000, per
+            # https://aws.amazon.com/blogs/compute/new-aws-lambda-controls-for-stream-processing-and-asynchronous-invocations/
+            return self._config.get('retry_attempts', -1)
+
+        @property
+        def split_batch_on_error(self):
+            return self._config.get('split_batch_on_error', False)
+
+        @property
+        def concurrent_batches_per_shard(self):
+            return self._config.get('concurrent_batches_per_shard', 1)
+
+        @property
+        def maximum_age_of_record(self):
+            # -1 (the default) sets no age limit; Kinesis itself retains records for at most a week
+            return self._config.get('maximum_age_of_record', -1)
+
+        @property
+        def destination_config(self):
+            return self._config.get('destination_config', {})
+
+        def add(self, function):
+            try:
+                response = self._lambda.call(
+                    'create_event_source_mapping',
+                    FunctionName=function.name,
+                    EventSourceArn=self.arn,
+                    BatchSize=self.batch_size,
+                    MaximumBatchingWindowInSeconds=self.batch_window,
+                    StartingPosition=self.starting_position,
+                    Enabled=self.enabled,
+                    MaximumRetryAttempts=self.retry_attempts,
+                    BisectBatchOnFunctionError=self.split_batch_on_error,
+                    MaximumRecordAgeInSeconds=self.maximum_age_of_record,
+                    ParallelizationFactor=self.concurrent_batches_per_shard,
+                    DestinationConfig=self.destination_config
+                )
+                LOG.debug(response)
+            except Exception:
+                LOG.exception('Unable to add event source')
+
+        def update(self, function):
+            response = None
+            uuid = self._get_uuid(function)
+            if uuid:
+                try:
+                    response = self._lambda.call(
+                        'update_event_source_mapping',
+                        UUID=uuid,  # update_event_source_mapping requires the mapping's UUID
+                        BatchSize=self.batch_size,
+                        MaximumBatchingWindowInSeconds=self.batch_window,
+                        Enabled=self.enabled,
+                        FunctionName=function.arn,
+                        MaximumRetryAttempts=self.retry_attempts,
+                        BisectBatchOnFunctionError=self.split_batch_on_error,
+                        MaximumRecordAgeInSeconds=self.maximum_age_of_record,
+                        ParallelizationFactor=self.concurrent_batches_per_shard,
+                        DestinationConfig=self.destination_config
+                    )
+                    LOG.debug(response)
+                except Exception:
+                    LOG.exception('Unable to update event source')
 
     event_source_map = {
         'dynamodb': kappa.event_source.dynamodb_stream.DynamoDBStreamEventSource,
-        'kinesis': kappa.event_source.kinesis.KinesisEventSource,
+        'kinesis': ExtendedKinesisEventSource,
         's3': kappa.event_source.s3.S3EventSource,
         'sns': ExtendedSnsEventSource,
         'sqs': SqsEventSource,
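For context, each new setting maps directly onto a parameter of Lambda's `CreateEventSourceMapping` API, which is what `ExtendedKinesisEventSource.add` invokes through kappa's client wrapper. Below is a minimal standalone boto3 sketch of roughly the call that results; the function name and ARNs are placeholders, not values from this change, and `batch_window` (supported by the class but not shown in the README example) is included at its default:

```python
import boto3

lambda_client = boto3.client("lambda")

# Placeholder function name and ARNs, for illustration only.
response = lambda_client.create_event_source_mapping(
    FunctionName="my-zappa-function",
    EventSourceArn="arn:aws:dynamodb:us-east-1:123456789012:table/YourTable"
                   "/stream/2016-05-11T00:00:00.000",
    StartingPosition="TRIM_HORIZON",      # starting_position
    BatchSize=50,                         # batch_size
    MaximumBatchingWindowInSeconds=0,     # batch_window (default 0)
    Enabled=True,                         # enabled
    MaximumRetryAttempts=10,              # retry_attempts
    BisectBatchOnFunctionError=True,      # split_batch_on_error
    ParallelizationFactor=2,              # concurrent_batches_per_shard
    MaximumRecordAgeInSeconds=3600,       # maximum_age_of_record
    DestinationConfig={                   # destination_config
        "OnFailure": {
            "Destination": "arn:aws:sns:us-east-2:123456789012:my-sns"
        }
    },
)
# The mapping's UUID is what update_event_source_mapping keys on later.
print(response["UUID"])
```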