From 48a231a70ff667bb6396c63d9dca9b440335ab74 Mon Sep 17 00:00:00 2001 From: Liron Leizerovich Date: Wed, 13 Nov 2024 17:35:14 +0200 Subject: [PATCH 1/2] isEventExecutionPersistenceEnabled property added to enable/disable duplicate message delivery persistence mechanism. --- .../core/config/ConductorProperties.java | 10 +++++++++ .../conductor/service/ExecutionService.java | 21 ++++++++++++++++--- .../src/main/resources/application.properties | 3 +++ 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index f07f1485f..351d2e549 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -60,6 +60,9 @@ public class ConductorProperties { /** Used to enable/disable the workflow execution lock. */ private boolean workflowExecutionLockEnabled = false; + /** Used to enable/disable the duplicate message delivery persistence. */ + private boolean eventExecutionPersistenceEnabled = true; + /** The time (in milliseconds) for which the lock is leased for. */ private Duration lockLeaseTime = Duration.ofMillis(60000); @@ -299,6 +302,13 @@ public void setWorkflowExecutionLockEnabled(boolean workflowExecutionLockEnabled this.workflowExecutionLockEnabled = workflowExecutionLockEnabled; } + public boolean isEventExecutionPersistenceEnabled() { + return eventExecutionPersistenceEnabled; + } + public void setEventExecutionPersistenceEnabled(boolean eventExecutionPersistenceEnabled) { + this.eventExecutionPersistenceEnabled = eventExecutionPersistenceEnabled; + } + public Duration getLockLeaseTime() { return lockLeaseTime; } diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index 4fb3f78ea..a01363a9c 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -56,6 +56,7 @@ public class ExecutionService { private final TaskStatusListener taskStatusListener; private final long queueTaskMessagePostponeSecs; + private final boolean isEventExecutionPersistenceEnabled; private static final int MAX_POLL_TIMEOUT_MS = 5000; private static final int POLL_COUNT_ONE = 1; @@ -76,6 +77,10 @@ public ExecutionService( this.queueTaskMessagePostponeSecs = properties.getTaskExecutionPostponeDuration().getSeconds(); + + this.isEventExecutionPersistenceEnabled = + properties.isEventExecutionPersistenceEnabled(); + this.systemTaskRegistry = systemTaskRegistry; this.taskStatusListener = taskStatusListener; } @@ -576,15 +581,25 @@ public List getPendingTasksForTaskType(String taskType) { } public boolean addEventExecution(EventExecution eventExecution) { - return executionDAOFacade.addEventExecution(eventExecution); + if(isEventExecutionPersistenceEnabled) + { + return executionDAOFacade.addEventExecution(eventExecution); + } + return true; } public void removeEventExecution(EventExecution eventExecution) { - executionDAOFacade.removeEventExecution(eventExecution); + if(isEventExecutionPersistenceEnabled) + { + executionDAOFacade.removeEventExecution(eventExecution); + } } public void updateEventExecution(EventExecution eventExecution) { - executionDAOFacade.updateEventExecution(eventExecution); + if(isEventExecutionPersistenceEnabled) + { + executionDAOFacade.updateEventExecution(eventExecution); + } } /** diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties index 4fcb33594..589524d93 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -73,6 +73,9 @@ conductor.default-event-queue.type=sqs conductor.app.workflow-execution-lock-enabled=false conductor.workflow-execution-lock.type=noop_lock +#Used to enable/disable the duplicate message delivery persistence +conductor.app.eventExecutionPersistenceEnabled=true + #Redis cluster settings for locking module # conductor.redis-lock.serverType=single #Comma separated list of server nodes From 7e1e03c8a98c2bae43771de963354055796466df Mon Sep 17 00:00:00 2001 From: lironleizer Date: Thu, 14 Nov 2024 14:32:19 +0200 Subject: [PATCH 2/2] isEventExecutionPersistenceEnabled property added to enable/disable duplicate message delivery persistence mechanism. --- .../conductor/core/config/ConductorProperties.java | 1 + .../netflix/conductor/service/ExecutionService.java | 12 ++++-------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index 351d2e549..537f703ae 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -305,6 +305,7 @@ public void setWorkflowExecutionLockEnabled(boolean workflowExecutionLockEnabled public boolean isEventExecutionPersistenceEnabled() { return eventExecutionPersistenceEnabled; } + public void setEventExecutionPersistenceEnabled(boolean eventExecutionPersistenceEnabled) { this.eventExecutionPersistenceEnabled = eventExecutionPersistenceEnabled; } diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index a01363a9c..ec29e584d 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -78,8 +78,7 @@ public ExecutionService( this.queueTaskMessagePostponeSecs = properties.getTaskExecutionPostponeDuration().getSeconds(); - this.isEventExecutionPersistenceEnabled = - properties.isEventExecutionPersistenceEnabled(); + this.isEventExecutionPersistenceEnabled = properties.isEventExecutionPersistenceEnabled(); this.systemTaskRegistry = systemTaskRegistry; this.taskStatusListener = taskStatusListener; @@ -581,23 +580,20 @@ public List getPendingTasksForTaskType(String taskType) { } public boolean addEventExecution(EventExecution eventExecution) { - if(isEventExecutionPersistenceEnabled) - { + if (isEventExecutionPersistenceEnabled) { return executionDAOFacade.addEventExecution(eventExecution); } return true; } public void removeEventExecution(EventExecution eventExecution) { - if(isEventExecutionPersistenceEnabled) - { + if (isEventExecutionPersistenceEnabled) { executionDAOFacade.removeEventExecution(eventExecution); } } public void updateEventExecution(EventExecution eventExecution) { - if(isEventExecutionPersistenceEnabled) - { + if (isEventExecutionPersistenceEnabled) { executionDAOFacade.updateEventExecution(eventExecution); } }