Skip to content

Commit

Permalink
Added maximum workflow sweep postpone config
Browse files Browse the repository at this point in the history
  • Loading branch information
danmiller192 committed Dec 11, 2024
1 parent 12ebe53 commit 9552f6f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public class ConductorProperties {
@DurationUnit(ChronoUnit.SECONDS)
private Duration workflowOffsetTimeout = Duration.ofSeconds(30);

/** The maximum timeout duration to set when a workflow is pushed to the decider queue. */
@DurationUnit(ChronoUnit.SECONDS)
private Duration maxPostponeDurationSeconds = Duration.ofSeconds(120);

/** The number of threads to use to do background sweep on active workflows. */
private int sweeperThreadCount = Runtime.getRuntime().availableProcessors() * 2;

Expand Down Expand Up @@ -251,6 +255,14 @@ public void setWorkflowOffsetTimeout(Duration workflowOffsetTimeout) {
this.workflowOffsetTimeout = workflowOffsetTimeout;
}

public Duration getMaxPostponeDurationSeconds() {
return maxPostponeDurationSeconds;
}

public void setMaxPostponeDurationSeconds(Duration maxPostponeDurationSeconds) {
this.maxPostponeDurationSeconds = maxPostponeDurationSeconds;
}

public int getSweeperThreadCount() {
return sweeperThreadCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,18 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) {
} else if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_HUMAN)) {
postponeDurationSeconds = workflowOffsetTimeout;
} else {
postponeDurationSeconds = workflowOffsetTimeout;
postponeDurationSeconds =
(taskModel.getResponseTimeoutSeconds() != 0)
? taskModel.getResponseTimeoutSeconds() + 1
: workflowOffsetTimeout;
}

if (postponeDurationSeconds
> properties.getMaxPostponeDurationSeconds().getSeconds()) {
postponeDurationSeconds =
properties.getMaxPostponeDurationSeconds().getSeconds();
}

break;
} else if (taskModel.getStatus() == Status.SCHEDULED) {
Optional<TaskDef> taskDefinition = taskModel.getTaskDefinition();
Expand Down

0 comments on commit 9552f6f

Please sign in to comment.