Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/939208 fix persistence #309

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public class ConductorProperties {
/** Used to enable/disable the workflow execution lock. */
private boolean workflowExecutionLockEnabled = false;

/** Used to enable/disable the duplicate message delivery persistence. */
private boolean eventExecutionPersistenceEnabled = true;

/** The time (in milliseconds) for which the lock is leased for. */
private Duration lockLeaseTime = Duration.ofMillis(60000);

Expand Down Expand Up @@ -299,6 +302,14 @@ public void setWorkflowExecutionLockEnabled(boolean workflowExecutionLockEnabled
this.workflowExecutionLockEnabled = workflowExecutionLockEnabled;
}

public boolean isEventExecutionPersistenceEnabled() {
return eventExecutionPersistenceEnabled;
}

public void setEventExecutionPersistenceEnabled(boolean eventExecutionPersistenceEnabled) {
this.eventExecutionPersistenceEnabled = eventExecutionPersistenceEnabled;
}

public Duration getLockLeaseTime() {
return lockLeaseTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class ExecutionService {
private final TaskStatusListener taskStatusListener;

private final long queueTaskMessagePostponeSecs;
private final boolean isEventExecutionPersistenceEnabled;

private static final int MAX_POLL_TIMEOUT_MS = 5000;
private static final int POLL_COUNT_ONE = 1;
Expand All @@ -76,6 +77,9 @@ public ExecutionService(

this.queueTaskMessagePostponeSecs =
properties.getTaskExecutionPostponeDuration().getSeconds();

this.isEventExecutionPersistenceEnabled = properties.isEventExecutionPersistenceEnabled();

this.systemTaskRegistry = systemTaskRegistry;
this.taskStatusListener = taskStatusListener;
}
Expand Down Expand Up @@ -576,15 +580,22 @@ public List<Task> getPendingTasksForTaskType(String taskType) {
}

public boolean addEventExecution(EventExecution eventExecution) {
return executionDAOFacade.addEventExecution(eventExecution);
if (isEventExecutionPersistenceEnabled) {
return executionDAOFacade.addEventExecution(eventExecution);
}
return true;
}

public void removeEventExecution(EventExecution eventExecution) {
executionDAOFacade.removeEventExecution(eventExecution);
if (isEventExecutionPersistenceEnabled) {
executionDAOFacade.removeEventExecution(eventExecution);
}
}

public void updateEventExecution(EventExecution eventExecution) {
executionDAOFacade.updateEventExecution(eventExecution);
if (isEventExecutionPersistenceEnabled) {
executionDAOFacade.updateEventExecution(eventExecution);
}
}

/**
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ conductor.default-event-queue.type=sqs
conductor.app.workflow-execution-lock-enabled=false
conductor.workflow-execution-lock.type=noop_lock

#Used to enable/disable the duplicate message delivery persistence
conductor.app.eventExecutionPersistenceEnabled=true

#Redis cluster settings for locking module
# conductor.redis-lock.serverType=single
#Comma separated list of server nodes
Expand Down
Loading