diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index f07f1485f..7ef41368a 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -42,6 +42,10 @@ public class ConductorProperties { @DurationUnit(ChronoUnit.SECONDS) private Duration workflowOffsetTimeout = Duration.ofSeconds(30); + /** The maximum timeout duration to set when a workflow is pushed to the decider queue. */ + @DurationUnit(ChronoUnit.SECONDS) + private Duration maxPostponeDurationSeconds = Duration.ofSeconds(3600); + /** The number of threads to use to do background sweep on active workflows. */ private int sweeperThreadCount = Runtime.getRuntime().availableProcessors() * 2; @@ -251,6 +255,14 @@ public void setWorkflowOffsetTimeout(Duration workflowOffsetTimeout) { this.workflowOffsetTimeout = workflowOffsetTimeout; } + public Duration getMaxPostponeDurationSeconds() { + return maxPostponeDurationSeconds; + } + + public void setMaxPostponeDurationSeconds(Duration maxPostponeDurationSeconds) { + this.maxPostponeDurationSeconds = maxPostponeDurationSeconds; + } + public int getSweeperThreadCount() { return sweeperThreadCount; } diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index 3f5a9c5b6..9702eadd6 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -144,6 +144,13 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) { ? taskModel.getResponseTimeoutSeconds() + 1 : workflowOffsetTimeout; } + + if (postponeDurationSeconds + > properties.getMaxPostponeDurationSeconds().getSeconds()) { + postponeDurationSeconds = + properties.getMaxPostponeDurationSeconds().getSeconds(); + } + break; } else if (taskModel.getStatus() == Status.SCHEDULED) { Optional taskDefinition = taskModel.getTaskDefinition(); diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index 53bd3bb48..c882ec45f 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -49,6 +49,7 @@ public class TestWorkflowSweeper { private ExecutionLockService executionLockService; private int defaultPostPoneOffSetSeconds = 1800; + private int defaulMmaxPostponeDurationSeconds = 2000000; @Before public void setUp() { @@ -79,6 +80,8 @@ public void testPostponeDurationForHumanTaskType() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -98,6 +101,8 @@ public void testPostponeDurationForWaitTaskType() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -119,6 +124,8 @@ public void testPostponeDurationForWaitTaskTypeWithLongWaitTime() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -174,6 +181,8 @@ public void testPostponeDurationForTaskInProgress() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -201,6 +210,30 @@ public void testPostponeDurationForTaskInProgressWithResponseTimeoutSet() { DECIDER_QUEUE, workflowModel.getWorkflowId(), (responseTimeout + 1) * 1000); } + @Test + public void + testPostponeDurationForTaskInProgressWithResponseTimeoutSetLongerThanMaxPostponeDuration() { + long responseTimeout = defaulMmaxPostponeDurationSeconds + 1; + WorkflowModel workflowModel = new WorkflowModel(); + workflowModel.setWorkflowId("1"); + TaskModel taskModel = new TaskModel(); + taskModel.setTaskId("task1"); + taskModel.setTaskType(TaskType.TASK_TYPE_SIMPLE); + taskModel.setStatus(Status.IN_PROGRESS); + taskModel.setResponseTimeoutSeconds(responseTimeout); + workflowModel.setTasks(List.of(taskModel)); + when(properties.getWorkflowOffsetTimeout()) + .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); + verify(queueDAO) + .setUnackTimeout( + DECIDER_QUEUE, + workflowModel.getWorkflowId(), + defaulMmaxPostponeDurationSeconds * 1000L); + } + @Test public void testPostponeDurationForTaskInScheduled() { WorkflowModel workflowModel = new WorkflowModel();