Skip to content

Commit

Permalink
Merge branch 'conductor-oss:main' into feature/add-opensearch
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbelval authored Dec 20, 2024
2 parents 29ffc0b + e0780c3 commit d24f2bf
Show file tree
Hide file tree
Showing 19 changed files with 1,802 additions and 46 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
![Conductor OSS Logo](https://assets.conductor-oss.org/logo.png "Conductor OSS")

<picture>
<!-- Dark mode logo -->
<source srcset="https://github.com/user-attachments/assets/104b3a67-6013-4622-8075-a45da3a9e726" media="(prefers-color-scheme: dark)">
<!-- Light mode logo -->
<img src="https://assets.conductor-oss.org/logo.png" alt="Logo">
</picture>


<h1 align="center" style="border-bottom: none">
Scalable Workflow Orchestration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ public class ConductorProperties {
@DurationUnit(ChronoUnit.SECONDS)
private Duration workflowOffsetTimeout = Duration.ofSeconds(30);

/**
* The maximum timeout duration to set when a workflow with running task is pushed to the
* decider queue.
*/
@DurationUnit(ChronoUnit.SECONDS)
private Duration maxPostponeDurationSeconds = Duration.ofSeconds(3600);

/** The number of threads to use to do background sweep on active workflows. */
private int sweeperThreadCount = Runtime.getRuntime().availableProcessors() * 2;

Expand Down Expand Up @@ -251,6 +258,14 @@ public void setWorkflowOffsetTimeout(Duration workflowOffsetTimeout) {
this.workflowOffsetTimeout = workflowOffsetTimeout;
}

public Duration getMaxPostponeDurationSeconds() {
return maxPostponeDurationSeconds;
}

public void setMaxPostponeDurationSeconds(Duration maxPostponeDurationSeconds) {
this.maxPostponeDurationSeconds = maxPostponeDurationSeconds;
}

public int getSweeperThreadCount() {
return sweeperThreadCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) {
? taskModel.getResponseTimeoutSeconds() + 1
: workflowOffsetTimeout;
}

if (postponeDurationSeconds
> properties.getMaxPostponeDurationSeconds().getSeconds()) {
postponeDurationSeconds =
properties.getMaxPostponeDurationSeconds().getSeconds();
}

break;
} else if (taskModel.getStatus() == Status.SCHEDULED) {
Optional<TaskDef> taskDefinition = taskModel.getTaskDefinition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class TestWorkflowSweeper {
private ExecutionLockService executionLockService;

private int defaultPostPoneOffSetSeconds = 1800;
private int defaulMmaxPostponeDurationSeconds = 2000000;

@Before
public void setUp() {
Expand Down Expand Up @@ -79,6 +80,8 @@ public void testPostponeDurationForHumanTaskType() {
workflowModel.setTasks(List.of(taskModel));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
when(properties.getMaxPostponeDurationSeconds())
.thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds));
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
Expand All @@ -98,6 +101,8 @@ public void testPostponeDurationForWaitTaskType() {
workflowModel.setTasks(List.of(taskModel));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
when(properties.getMaxPostponeDurationSeconds())
.thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds));
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
Expand All @@ -119,6 +124,8 @@ public void testPostponeDurationForWaitTaskTypeWithLongWaitTime() {
workflowModel.setTasks(List.of(taskModel));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
when(properties.getMaxPostponeDurationSeconds())
.thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds));
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
Expand Down Expand Up @@ -174,6 +181,8 @@ public void testPostponeDurationForTaskInProgress() {
workflowModel.setTasks(List.of(taskModel));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
when(properties.getMaxPostponeDurationSeconds())
.thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds));
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
Expand All @@ -195,12 +204,38 @@ public void testPostponeDurationForTaskInProgressWithResponseTimeoutSet() {
workflowModel.setTasks(List.of(taskModel));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
when(properties.getMaxPostponeDurationSeconds())
.thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds));
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
DECIDER_QUEUE, workflowModel.getWorkflowId(), (responseTimeout + 1) * 1000);
}

@Test
public void
testPostponeDurationForTaskInProgressWithResponseTimeoutSetLongerThanMaxPostponeDuration() {
long responseTimeout = defaulMmaxPostponeDurationSeconds + 1;
WorkflowModel workflowModel = new WorkflowModel();
workflowModel.setWorkflowId("1");
TaskModel taskModel = new TaskModel();
taskModel.setTaskId("task1");
taskModel.setTaskType(TaskType.TASK_TYPE_SIMPLE);
taskModel.setStatus(Status.IN_PROGRESS);
taskModel.setResponseTimeoutSeconds(responseTimeout);
workflowModel.setTasks(List.of(taskModel));
when(properties.getWorkflowOffsetTimeout())
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds));
when(properties.getMaxPostponeDurationSeconds())
.thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds));
workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds);
verify(queueDAO)
.setUnackTimeout(
DECIDER_QUEUE,
workflowModel.getWorkflowId(),
defaulMmaxPostponeDurationSeconds * 1000L);
}

@Test
public void testPostponeDurationForTaskInScheduled() {
WorkflowModel workflowModel = new WorkflowModel();
Expand Down
Loading

0 comments on commit d24f2bf

Please sign in to comment.