Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to set new Kinesis config #2183

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,16 @@ Optionally you can add [SNS message filters](http://docs.aws.amazon.com/sns/late
"arn": "arn:aws:dynamodb:us-east-1:1234554:table/YourTable/stream/2016-05-11T00:00:00.000",
"starting_position": "TRIM_HORIZON", // Supported values: TRIM_HORIZON, LATEST
"batch_size": 50, // Max: 1000
"enabled": true // Default is false
"enabled": true, // Default is false
"retry_attempts": 10 // default is -1, which is infinite number
"split_batch_on_error": true // Default is false
"concurrent_batches_per_shard": 2, // Default is 1
"maximum_age_of_record": 3600 // Default is -1, which is an infinitely old record
"destination_config": { // Allows failures to be sent to SNS or SQS ARN
"OnFailure": {
"Destination": "arn:aws:sns:us-east-2:123:my-sns"
}
}
}
}
]
Expand Down
71 changes: 70 additions & 1 deletion zappa/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,78 @@ def add(self, function):
if self.filters:
self.add_filters(function)

class ExtendedKinesisEventSource(kappa.event_source.kinesis.KinesisEventSource):
@property
def batch_window(self):
return self._config.get('batch_window', 0)

@property
def retry_attempts(self):
# default is 10,000 according to
# https://aws.amazon.com/blogs/compute/new-aws-lambda-controls-for-stream-processing-and-asynchronous-invocations/
return self._config.get('retry_attempts', -1)

@property
def split_batch_on_error(self):
return self._config.get('split_batch_on_error', False)

@property
def concurrent_batches_per_shard(self):
return self._config.get('concurrent_batches_per_shard', 1)

@property
def maximum_age_of_record(self):
# default maximum age is a week, the limit of kinesis
return self._config.get('maximum_age_of_record', -1)

@property
def destination_config(self):
return self._config.get('destination_config', {})

def add(self, function):
try:
response = self._lambda.call(
'create_event_source_mapping',
FunctionName=function.name,
EventSourceArn=self.arn,
BatchSize=self.batch_size,
MaximumBatchingWindowInSeconds=self.batch_window,
StartingPosition=self.starting_position,
Enabled=self.enabled,
MaximumRetryAttempts=self.retry_attempts,
BisectBatchOnFunctionError=self.split_batch_on_error,
MaximumRecordAgeInSeconds=self.maximum_age_of_record,
ParallelizationFactor=self.concurrent_batches_per_shard,
DestinationConfig=self.destination_config
)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add event source')

def update(self, function):
response = None
uuid = self._get_uuid(function)
if uuid:
try:
response = self._lambda.call(
'update_event_source_mapping',
BatchSize=self.batch_size,
MaximumBatchingWindowInSeconds=self.batch_window,
Enabled=self.enabled,
FunctionName=function.arn,
MaximumRetryAttempts=self.retry_attempts,
BisectBatchOnFunctionError=self.split_batch_on_error,
MaximumRecordAgeInSeconds=self.maximum_age_of_record,
ParallelizationFactor=self.concurrent_batches_per_shard,
DestinationConfig=self.destination_config
)
LOG.debug(response)
except Exception:
LOG.exception('Unable to update event source')

event_source_map = {
'dynamodb': kappa.event_source.dynamodb_stream.DynamoDBStreamEventSource,
'kinesis': kappa.event_source.kinesis.KinesisEventSource,
'kinesis': ExtendedKinesisEventSource,
's3': kappa.event_source.s3.S3EventSource,
'sns': ExtendedSnsEventSource,
'sqs': SqsEventSource,
Expand Down