Skip to content

Commit

Permalink
add properties for stream event source mappings
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredstehler authored and shyamsfo committed Jun 9, 2021
1 parent 7769696 commit dc6fc65
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ private LambdaUpdateEventConfigurationTaskOutput updateEventConfiguration(Lambda

final List<String> existingEvents = getExistingEvents(lf, targetArn);
taskInput.getTriggerArns().stream()
.filter(curr -> { return !existingEvents.contains(curr); })
.forEach( curr -> {
LambdaEventConfigurationDescription singleEvent = formEventObject(curr, taskInput);
String rawString = utils.asString(singleEvent);
Expand All @@ -192,18 +191,56 @@ private LambdaUpdateEventConfigurationTaskOutput updateEventConfiguration(Lambda

private LambdaEventConfigurationDescription formEventObject(String curr, LambdaUpdateEventConfigurationTaskInput taskInput) {
LambdaEventConfigurationDescription singleEvent = LambdaEventConfigurationDescription.builder().eventSourceArn(curr).batchsize(taskInput.getBatchsize()).enabled(true)
.maxBatchingWindowSecs(taskInput.getMaxBatchingWindowSecs())
.account(taskInput.getAccount()).credentials(taskInput.getCredentials()).appName(taskInput.getAppName())
.region(taskInput.getRegion()).functionName(taskInput.getFunctionName()).qualifier(taskInput.getQualifier()).
build();
if (curr.startsWith(DYNAMO_EVENT_PREFIX) || curr.startsWith(KINESIS_EVENT_PREFIX)) {
if (StringUtils.isNullOrEmpty(taskInput.getStartingPosition())) {
taskInput.setStartingPosition(DEFAULT_STARTING_POSITION);
}

singleEvent.setBisectBatchOnError(taskInput.getBisectBatchOnError());
singleEvent.setDestinationConfig(formDestinationConfig(taskInput));
singleEvent.setMaxRecordAgeSecs(taskInput.getMaxRecordAgeSecs());
singleEvent.setMaxRetryAttempts(taskInput.getMaxRetryAttempts());
singleEvent.setParallelizationFactor(taskInput.getParallelizationFactor());
singleEvent.setStartingPosition(taskInput.getStartingPosition());

if (taskInput.getTumblingWindowSecs() != null && taskInput.getTumblingWindowSecs() != -1) {
singleEvent.setTumblingWindowSecs(taskInput.getTumblingWindowSecs());
}
}
return singleEvent;
}

private Map<String, Object> formDestinationConfig(LambdaUpdateEventConfigurationTaskInput taskInput) {
Map<String, Object> destinationConfig = null;

if (taskInput.getDestinationConfig() != null) {
String onFailureArn = taskInput.getDestinationConfig().get("onFailureArn");
String onSuccessArn = taskInput.getDestinationConfig().get("onSuccessArn");

if(StringUtils.isNotNullOrEmpty(onFailureArn) || StringUtils.isNotNullOrEmpty(onSuccessArn)) {
destinationConfig = new HashMap<String, Object>();

if(StringUtils.isNotNullOrEmpty(onFailureArn)) {
Map<String, String> onFailure = new HashMap<String, String>();
onFailure.put("destination", onFailureArn);
destinationConfig.put("onFailure", onFailure);
}

if(StringUtils.isNotNullOrEmpty(onSuccessArn)) {
Map<String, String> onSuccess = new HashMap<String, String>();
onSuccess.put("destination", onSuccessArn);
destinationConfig.put("onSuccess", onSuccess);
}
}
}

return destinationConfig;
}

private LambdaCloudOperationOutput deleteLambdaEventConfig(LambdaDeleteEventTaskInput inp) {
inp.setCredentials(inp.getAccount());
String endPoint = cloudDriverUrl + CLOUDDRIVER_DELETE_EVENT_CONFIGURATION_LAMBDA_PATH;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import lombok.Builder;
import lombok.Data;

import java.util.Map;

@Data
@Builder
@JsonIgnoreProperties(ignoreUnknown = true)
Expand All @@ -28,7 +30,13 @@ public class LambdaEventConfigurationDescription {
String eventSourceArn;
boolean enabled;
int batchsize;
Boolean bisectBatchOnError;
Integer maxBatchingWindowSecs;
Integer maxRecordAgeSecs;
Integer maxRetryAttempts;
Integer parallelizationFactor;
Integer tumblingWindowSecs;
String qualifier;
String startingPosition;

Map<String, Object> destinationConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.Builder;
import lombok.Data;

import java.util.Map;
import java.util.List;

@Data
Expand All @@ -29,7 +30,14 @@ public class LambdaUpdateEventConfigurationTaskInput {
private boolean clearExisting;
private String appName, credentials, account, functionName, region;
private List<String> triggerArns;
private Map<String, String> destinationConfig;
private int batchsize;
private Boolean bisectBatchOnError;
private Integer maxBatchingWindowSecs;
private Integer maxRecordAgeSecs;
private Integer maxRetryAttempts;
private Integer parallelizationFactor;
private Integer tumblingWindowSecs;
private String aliasName, qualifier;
private String startingPosition;
private List<LambdaEventConfigurationDescription> eventConfigurationInputList;
Expand Down

0 comments on commit dc6fc65

Please sign in to comment.