diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index f07f1485f..537f703ae 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -60,6 +60,9 @@ public class ConductorProperties { /** Used to enable/disable the workflow execution lock. */ private boolean workflowExecutionLockEnabled = false; + /** Used to enable/disable the duplicate message delivery persistence. */ + private boolean eventExecutionPersistenceEnabled = true; + /** The time (in milliseconds) for which the lock is leased for. */ private Duration lockLeaseTime = Duration.ofMillis(60000); @@ -299,6 +302,14 @@ public void setWorkflowExecutionLockEnabled(boolean workflowExecutionLockEnabled this.workflowExecutionLockEnabled = workflowExecutionLockEnabled; } + public boolean isEventExecutionPersistenceEnabled() { + return eventExecutionPersistenceEnabled; + } + + public void setEventExecutionPersistenceEnabled(boolean eventExecutionPersistenceEnabled) { + this.eventExecutionPersistenceEnabled = eventExecutionPersistenceEnabled; + } + public Duration getLockLeaseTime() { return lockLeaseTime; } diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index 4fb3f78ea..ec29e584d 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -56,6 +56,7 @@ public class ExecutionService { private final TaskStatusListener taskStatusListener; private final long queueTaskMessagePostponeSecs; + private final boolean isEventExecutionPersistenceEnabled; private static final int MAX_POLL_TIMEOUT_MS = 5000; private static final int POLL_COUNT_ONE = 1; @@ -76,6 +77,9 @@ public ExecutionService( this.queueTaskMessagePostponeSecs = properties.getTaskExecutionPostponeDuration().getSeconds(); + + this.isEventExecutionPersistenceEnabled = properties.isEventExecutionPersistenceEnabled(); + this.systemTaskRegistry = systemTaskRegistry; this.taskStatusListener = taskStatusListener; } @@ -576,15 +580,22 @@ public List getPendingTasksForTaskType(String taskType) { } public boolean addEventExecution(EventExecution eventExecution) { - return executionDAOFacade.addEventExecution(eventExecution); + if (isEventExecutionPersistenceEnabled) { + return executionDAOFacade.addEventExecution(eventExecution); + } + return true; } public void removeEventExecution(EventExecution eventExecution) { - executionDAOFacade.removeEventExecution(eventExecution); + if (isEventExecutionPersistenceEnabled) { + executionDAOFacade.removeEventExecution(eventExecution); + } } public void updateEventExecution(EventExecution eventExecution) { - executionDAOFacade.updateEventExecution(eventExecution); + if (isEventExecutionPersistenceEnabled) { + executionDAOFacade.updateEventExecution(eventExecution); + } } /** diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties index 4fcb33594..589524d93 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -73,6 +73,9 @@ conductor.default-event-queue.type=sqs conductor.app.workflow-execution-lock-enabled=false conductor.workflow-execution-lock.type=noop_lock +#Used to enable/disable the duplicate message delivery persistence +conductor.app.eventExecutionPersistenceEnabled=true + #Redis cluster settings for locking module # conductor.redis-lock.serverType=single #Comma separated list of server nodes