diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index f07f1485f..7ef41368a 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -42,6 +42,10 @@ public class ConductorProperties { @DurationUnit(ChronoUnit.SECONDS) private Duration workflowOffsetTimeout = Duration.ofSeconds(30); + /** The maximum timeout duration to set when a workflow is pushed to the decider queue. */ + @DurationUnit(ChronoUnit.SECONDS) + private Duration maxPostponeDurationSeconds = Duration.ofSeconds(3600); + /** The number of threads to use to do background sweep on active workflows. */ private int sweeperThreadCount = Runtime.getRuntime().availableProcessors() * 2; @@ -251,6 +255,14 @@ public void setWorkflowOffsetTimeout(Duration workflowOffsetTimeout) { this.workflowOffsetTimeout = workflowOffsetTimeout; } + public Duration getMaxPostponeDurationSeconds() { + return maxPostponeDurationSeconds; + } + + public void setMaxPostponeDurationSeconds(Duration maxPostponeDurationSeconds) { + this.maxPostponeDurationSeconds = maxPostponeDurationSeconds; + } + public int getSweeperThreadCount() { return sweeperThreadCount; } diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index b39d122f0..9702eadd6 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -139,8 +139,18 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) { } else if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_HUMAN)) { postponeDurationSeconds = workflowOffsetTimeout; } else { - postponeDurationSeconds = workflowOffsetTimeout; + postponeDurationSeconds = + (taskModel.getResponseTimeoutSeconds() != 0) + ? taskModel.getResponseTimeoutSeconds() + 1 + : workflowOffsetTimeout; } + + if (postponeDurationSeconds + > properties.getMaxPostponeDurationSeconds().getSeconds()) { + postponeDurationSeconds = + properties.getMaxPostponeDurationSeconds().getSeconds(); + } + break; } else if (taskModel.getStatus() == Status.SCHEDULED) { Optional taskDefinition = taskModel.getTaskDefinition(); diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index 042735b70..c882ec45f 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -49,6 +49,7 @@ public class TestWorkflowSweeper { private ExecutionLockService executionLockService; private int defaultPostPoneOffSetSeconds = 1800; + private int defaulMmaxPostponeDurationSeconds = 2000000; @Before public void setUp() { @@ -79,6 +80,8 @@ public void testPostponeDurationForHumanTaskType() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -98,6 +101,8 @@ public void testPostponeDurationForWaitTaskType() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -119,6 +124,8 @@ public void testPostponeDurationForWaitTaskTypeWithLongWaitTime() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -174,6 +181,8 @@ public void testPostponeDurationForTaskInProgress() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -196,11 +205,33 @@ public void testPostponeDurationForTaskInProgressWithResponseTimeoutSet() { when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); + verify(queueDAO) + .setUnackTimeout( + DECIDER_QUEUE, workflowModel.getWorkflowId(), (responseTimeout + 1) * 1000); + } + + @Test + public void + testPostponeDurationForTaskInProgressWithResponseTimeoutSetLongerThanMaxPostponeDuration() { + long responseTimeout = defaulMmaxPostponeDurationSeconds + 1; + WorkflowModel workflowModel = new WorkflowModel(); + workflowModel.setWorkflowId("1"); + TaskModel taskModel = new TaskModel(); + taskModel.setTaskId("task1"); + taskModel.setTaskType(TaskType.TASK_TYPE_SIMPLE); + taskModel.setStatus(Status.IN_PROGRESS); + taskModel.setResponseTimeoutSeconds(responseTimeout); + workflowModel.setTasks(List.of(taskModel)); + when(properties.getWorkflowOffsetTimeout()) + .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( DECIDER_QUEUE, workflowModel.getWorkflowId(), - defaultPostPoneOffSetSeconds * 1000); + defaulMmaxPostponeDurationSeconds * 1000L); } @Test