From 9552f6fa0f2b21fdcd87cd73c46b6bf3cd53bd80 Mon Sep 17 00:00:00 2001 From: DanMiller192 <55767381+danmiller192@users.noreply.github.com> Date: Tue, 19 Nov 2024 16:58:49 +0000 Subject: [PATCH] Added maximum workflow sweep postpone config --- .../conductor/core/config/ConductorProperties.java | 12 ++++++++++++ .../core/reconciliation/WorkflowSweeper.java | 12 +++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index f07f1485f..07bb14133 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -42,6 +42,10 @@ public class ConductorProperties { @DurationUnit(ChronoUnit.SECONDS) private Duration workflowOffsetTimeout = Duration.ofSeconds(30); + /** The maximum timeout duration to set when a workflow is pushed to the decider queue. */ + @DurationUnit(ChronoUnit.SECONDS) + private Duration maxPostponeDurationSeconds = Duration.ofSeconds(120); + /** The number of threads to use to do background sweep on active workflows. */ private int sweeperThreadCount = Runtime.getRuntime().availableProcessors() * 2; @@ -251,6 +255,14 @@ public void setWorkflowOffsetTimeout(Duration workflowOffsetTimeout) { this.workflowOffsetTimeout = workflowOffsetTimeout; } + public Duration getMaxPostponeDurationSeconds() { + return maxPostponeDurationSeconds; + } + + public void setMaxPostponeDurationSeconds(Duration maxPostponeDurationSeconds) { + this.maxPostponeDurationSeconds = maxPostponeDurationSeconds; + } + public int getSweeperThreadCount() { return sweeperThreadCount; } diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index b39d122f0..9702eadd6 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -139,8 +139,18 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) { } else if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_HUMAN)) { postponeDurationSeconds = workflowOffsetTimeout; } else { - postponeDurationSeconds = workflowOffsetTimeout; + postponeDurationSeconds = + (taskModel.getResponseTimeoutSeconds() != 0) + ? taskModel.getResponseTimeoutSeconds() + 1 + : workflowOffsetTimeout; } + + if (postponeDurationSeconds + > properties.getMaxPostponeDurationSeconds().getSeconds()) { + postponeDurationSeconds = + properties.getMaxPostponeDurationSeconds().getSeconds(); + } + break; } else if (taskModel.getStatus() == Status.SCHEDULED) { Optional taskDefinition = taskModel.getTaskDefinition();