From bf90790c7120786574b31e1191a6218a00637855 Mon Sep 17 00:00:00 2001 From: Miguel Prieto Date: Mon, 6 Jan 2025 19:43:29 -0300 Subject: [PATCH] Changes to reduce latency in Async JOIN task. --- .../core/config/ConductorProperties.java | 37 +++++++++++++++ .../conductor/core/execution/tasks/Join.java | 22 +++++++-- .../execution/tasks/SystemTaskWorker.java | 6 ++- .../execution/tasks/WorkflowSystemTask.java | 14 +++++- .../core/execution/TestDeciderOutcomes.java | 5 +- .../core/execution/tasks/TestJoin.java | 47 ++++++++++++++++--- 6 files changed, 115 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index 0bd5e4ed2..a921c2715 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -24,6 +24,8 @@ import org.springframework.util.unit.DataSize; import org.springframework.util.unit.DataUnit; +import com.netflix.conductor.model.TaskModel; + @ConfigurationProperties("conductor.app") public class ConductorProperties { @@ -226,6 +228,25 @@ public class ConductorProperties { /** Used to limit the size of task execution logs. */ private int taskExecLogSizeLimit = 10; + /** + * This threshold defines the default number of executions after which SystemTasks implementing + * getEvaluationOffset should begin postponing execution. 
+ * + * @see + * com.netflix.conductor.core.execution.tasks.WorkflowSystemTask#getEvaluationOffset(TaskModel, + * long) + * @see com.netflix.conductor.core.execution.tasks.Join#getEvaluationOffset(TaskModel, long) + */ + private int systemTaskPostponeThreshold = + 200; // 10 seconds based on default systemTaskWorkerPollInterval of 50ms + + /** + * Timeout in milliseconds used by {@link + * com.netflix.conductor.core.execution.tasks.SystemTaskWorker} when polling, i.e.: call to + * {@link com.netflix.conductor.dao.QueueDAO#pop(String, int, int)}. + */ + private int systemTaskQueuePopTimeout = 200; + public String getStack() { return stack; } @@ -567,4 +588,20 @@ public Map getAll() { props.forEach((key, value) -> map.put(key.toString(), value)); return map; } + + public void setSystemTaskPostponeThreshold(int systemTaskPostponeThreshold) { + this.systemTaskPostponeThreshold = systemTaskPostponeThreshold; + } + + public int getSystemTaskPostponeThreshold() { + return systemTaskPostponeThreshold; + } + + public int getSystemTaskQueuePopTimeout() { + return systemTaskQueuePopTimeout; + } + + public void setSystemTaskQueuePopTimeout(int systemTaskQueuePopTimeout) { + this.systemTaskQueuePopTimeout = systemTaskQueuePopTimeout; + } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java index 5b0db258b..f7d340c8d 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java @@ -19,7 +19,9 @@ import org.springframework.stereotype.Component; +import com.netflix.conductor.annotations.VisibleForTesting; import com.netflix.conductor.common.utils.TaskUtils; +import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; 
@@ -29,8 +31,13 @@ @Component(TASK_TYPE_JOIN) public class Join extends WorkflowSystemTask { - public Join() { + @VisibleForTesting static final double EVALUATION_OFFSET_BASE = 1.2; + + private final ConductorProperties properties; + + public Join(ConductorProperties properties) { super(TASK_TYPE_JOIN); + this.properties = properties; } @Override @@ -117,12 +124,17 @@ public boolean execute( } @Override - public Optional getEvaluationOffset(TaskModel taskModel, long defaultOffset) { - int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0; - if (index == 0) { + public Optional getEvaluationOffset(TaskModel taskModel, long maxOffset) { + int pollCount = taskModel.getPollCount(); + // Assuming pollInterval = 50ms and systemTaskPostponeThreshold = 200, this will cause + // a JOIN task to be evaluated continuously during the first 10 seconds and the FORK/JOIN + // will end with minimal delay. + if (pollCount <= properties.getSystemTaskPostponeThreshold()) { return Optional.of(0L); } - return Optional.of(Math.min((long) Math.pow(2, index), defaultOffset)); + + double exp = pollCount - properties.getSystemTaskPostponeThreshold(); + return Optional.of(Math.min((long) Math.pow(EVALUATION_OFFSET_BASE, exp), maxOffset)); } public boolean isAsync() { diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorker.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorker.java index c753fa21d..96b90ce35 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorker.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorker.java @@ -114,7 +114,11 @@ void pollAndExecute(WorkflowSystemTask systemTask, String queueName) { LOGGER.debug("Polling queue: {} with {} slots acquired", queueName, messagesToAcquire); - List polledTaskIds = queueDAO.pop(queueName, messagesToAcquire, 200); + List polledTaskIds = + queueDAO.pop( + queueName, + messagesToAcquire, 
+ properties.getSystemTaskQueuePopTimeout()); Monitors.recordTaskPoll(queueName); LOGGER.debug("Polling queue:{}, got {} tasks", queueName, polledTaskIds.size()); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java index 9cc17b9e4..565b38211 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java @@ -65,7 +65,19 @@ public boolean execute( */ public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {} - public Optional getEvaluationOffset(TaskModel taskModel, long defaultOffset) { + /** + * Determines the time in seconds by which the next execution of a task will be postponed after + * an execution. By default, this method returns {@code Optional.empty()}. + * + *

<p>WorkflowSystemTasks may override this method to define a custom evaluation offset based on + * the task's behavior or requirements. + * + * @param taskModel task model + * @param maxOffset the max recommended offset value to use + * @return an {@code Optional} specifying the evaluation offset in seconds, or {@code + * Optional.empty()} if no postponement is required + */ + public Optional getEvaluationOffset(TaskModel taskModel, long maxOffset) { return Optional.empty(); } diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java index a9be81d89..c929bcd86 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java @@ -128,7 +128,7 @@ public Switch switchTask() { @Bean(TASK_TYPE_JOIN) public Join join() { - return new Join(); + return new Join(new ConductorProperties()); } @Bean @@ -595,7 +595,8 @@ public void testOptionalWithDynamicFork() { assertEquals(TaskModel.Status.SCHEDULED, outcome.tasksToBeScheduled.get(0).getStatus()); System.out.println(outcome.tasksToBeScheduled.get(0)); - new Join().execute(workflow, outcome.tasksToBeScheduled.get(0), null); + new Join(new ConductorProperties()) + .execute(workflow, outcome.tasksToBeScheduled.get(0), null); assertEquals(TaskModel.Status.COMPLETED, outcome.tasksToBeScheduled.get(0).getStatus()); } diff --git a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestJoin.java b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestJoin.java index 66082edd0..b6315d7f3 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestJoin.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestJoin.java @@ -19,6 +19,7 @@ import org.junit.Test; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; +import 
com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; @@ -27,6 +28,9 @@ import static org.mockito.Mockito.mock; public class TestJoin { + + private final ConductorProperties properties = new ConductorProperties(); + private final WorkflowExecutor executor = mock(WorkflowExecutor.class); private TaskModel createTask( @@ -65,7 +69,7 @@ public void testShouldNotMarkJoinAsCompletedWithErrorsWhenNotDone() { // task2 is not scheduled yet, so the join is not completed var wfJoinPair = createJoinWorkflow(List.of(task1), "task2"); - var join = new Join(); + var join = new Join(properties); var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); assertFalse(result); } @@ -77,7 +81,7 @@ public void testJoinCompletesSuccessfullyWhenAllTasksSucceed() { var wfJoinPair = createJoinWorkflow(List.of(task1, task2)); - var join = new Join(); + var join = new Join(properties); var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); assertTrue("Join task should execute successfully when all tasks succeed", result); assertEquals( @@ -93,7 +97,7 @@ public void testJoinWaitsWhenAnyTaskIsNotTerminal() { var wfJoinPair = createJoinWorkflow(List.of(task1, task2)); - var join = new Join(); + var join = new Join(properties); var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); assertFalse("Join task should wait when any task is not in terminal state", result); } @@ -107,7 +111,7 @@ public void testJoinFailsWhenMandatoryTaskFails() { var wfJoinPair = createJoinWorkflow(List.of(task1, task2)); - var join = new Join(); + var join = new Join(properties); var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); assertTrue("Join task should be executed when a mandatory task fails", result); assertEquals( @@ -125,7 +129,7 @@ public void 
testJoinCompletesWithErrorsWhenOnlyOptionalTasksFail() { var wfJoinPair = createJoinWorkflow(List.of(task1, task2)); - var join = new Join(); + var join = new Join(properties); var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); assertTrue("Join task should be executed when only optional tasks fail", result); assertEquals( @@ -143,7 +147,7 @@ public void testJoinAggregatesFailureReasonsCorrectly() { var wfJoinPair = createJoinWorkflow(List.of(task1, task2)); - var join = new Join(); + var join = new Join(properties); var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); assertTrue("Join task should be executed when tasks fail", result); assertEquals( @@ -174,7 +178,7 @@ public void testJoinWaitsForAllTasksBeforeFailingDueToPermissiveTaskFailure() { var wfJoinPair = createJoinWorkflow(List.of(task1, task2)); // First execution: Task 2 is not yet terminal. - var join = new Join(); + var join = new Join(properties); boolean result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor); assertFalse("Join task should wait as not all tasks are terminal", result); @@ -189,4 +193,33 @@ public void testJoinWaitsForAllTasksBeforeFailingDueToPermissiveTaskFailure() { TaskModel.Status.FAILED, wfJoinPair.getRight().getStatus()); } + + @Test + public void testEvaluationOffsetWhenPollCountIsBelowThreshold() { + var join = new Join(properties); + var taskModel = createTask("join1", TaskModel.Status.COMPLETED, false, false); + taskModel.setPollCount(properties.getSystemTaskPostponeThreshold() - 1); + var opt = join.getEvaluationOffset(taskModel, 30L); + assertEquals(0L, (long) opt.orElseThrow()); + } + + @Test + public void testEvaluationOffsetWhenPollCountIsAboveThreshold() { + final var maxOffset = 30L; + var join = new Join(properties); + var taskModel = createTask("join1", TaskModel.Status.COMPLETED, false, false); + + taskModel.setPollCount(properties.getSystemTaskPostponeThreshold() + 1); + var opt = 
join.getEvaluationOffset(taskModel, maxOffset); + assertEquals(1L, (long) opt.orElseThrow()); + + taskModel.setPollCount(properties.getSystemTaskPostponeThreshold() + 10); + opt = join.getEvaluationOffset(taskModel, maxOffset); + long expected = (long) Math.pow(Join.EVALUATION_OFFSET_BASE, 10); + assertEquals(expected, (long) opt.orElseThrow()); + + taskModel.setPollCount(properties.getSystemTaskPostponeThreshold() + 40); + opt = join.getEvaluationOffset(taskModel, maxOffset); + assertEquals(maxOffset, (long) opt.orElseThrow()); + } }