From 92f06bf1bf397d5d92ff9a835d59a3e2e5bb51bf Mon Sep 17 00:00:00 2001 From: Iva Koleva Date: Thu, 14 Dec 2023 10:25:00 +0200 Subject: [PATCH 1/2] Permissive task capability Why: We need idempotent forked tasks, meaning all tasks get executed, but any failures are still detected upon join. Feature request https://github.com/Netflix/conductor/issues/3861 What: Introduced the concept of Permissive tasks. A Permissive task is similar to a Simple task. The difference is, it permits the other tasks to continue - in case a Permissive task failed. Result is: 1. Forked Permissive tasks will let each other be evaluated, until all the forked tasks had terminated. Only then, the join task should fail. In case of Permissive optional tasks, the join will not fail. 2. Permissive sequential tasks will let subsequent tasks continue. While at the end, the workflow will fail in case a permissive task had failed. The workflow would not fail in case of Permissive optional task failure. Testing done: PermissiveTaskMapperTest added, TestDeciderOutcomes.testPermissive() added, WorkflowAndTaskConfigurationSpec "Test simple workflow which has a permissive task" and "Test simple workflow which has a permissive optional task added" that cover retry, ForkJoinSpec "Test a simple workflow with fork join permissive failure flow" added and "Test retrying a failed permissive fork join workflow" added. In addition, performed e2e tests locally running a Conductor instance. Did build a docker image with the code changes made, started it locally, and started a SampleWorker to poll 3 tasks in parallel. Verified e2e scenarios of task_def_permissive, task_def_permissive_optional, task_def_simple.json, task_def_simple_optional.json, each joining on 6 forked tasks, then running simple task 7 after join. --- .../common/metadata/tasks/TaskType.java | 2 + .../core/execution/DeciderService.java | 49 +++- .../core/execution/WorkflowExecutor.java | 22 ++ .../mapper/PermissiveTaskMapper.java | 102 +++++++ .../core/execution/tasks/ExclusiveJoin.java | 16 +- .../conductor/core/execution/tasks/Join.java | 17 +- .../core/execution/TestDeciderOutcomes.java | 74 +++++ .../mapper/PermissiveTaskMapperTest.java | 114 ++++++++ .../workflow/def/tasks/PermissiveTask.java | 47 +++ .../workflow/executor/WorkflowExecutor.java | 1 + .../test/integration/ForkJoinSpec.groovy | 273 +++++++++++++++++- .../WorkflowAndTaskConfigurationSpec.groovy | 159 +++++++++- ...fork_join_permissive_integration_test.json | 109 +++++++ ...ermissive_task_retry_integration_test.json | 190 ++++++++++++ ...issive_optional_task_integration_test.json | 58 ++++ ...with_permissive_task_integration_test.json | 92 ++++++ 16 files changed, 1311 insertions(+), 14 deletions(-) create mode 100644 core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java create mode 100644 core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java create mode 100644 java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java create mode 100644 test-harness/src/test/resources/fork_join_permissive_integration_test.json create mode 100644 test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json create mode 100644 test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json create mode 100644 test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java index 235a0ac91..efc985acd 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java @@ -23,6 +23,7 @@ public enum TaskType { DYNAMIC, FORK_JOIN, FORK_JOIN_DYNAMIC, + PERMISSIVE, DECISION, SWITCH, JOIN, @@ -70,6 +71,7 @@ public enum TaskType { public static final String TASK_TYPE_JSON_JQ_TRANSFORM = "JSON_JQ_TRANSFORM"; public static final String TASK_TYPE_SET_VARIABLE = "SET_VARIABLE"; public static final String TASK_TYPE_FORK = "FORK"; + public static final String TASK_TYPE_PERMISSIVE = "PERMISSIVE"; public static final String TASK_TYPE_NOOP = "NOOP"; private static final Set BUILT_IN_TASKS = new HashSet<>(); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java index c6bf486ba..52afc52c2 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java @@ -44,6 +44,7 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; +import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE; import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE; import static com.netflix.conductor.common.metadata.tasks.TaskType.USER_DEFINED; import static com.netflix.conductor.model.TaskModel.Status.*; @@ -207,7 +208,11 @@ private DeciderOutcome decide(final WorkflowModel workflow, List preS tasksToBeScheduled.put(retryTask.get().getReferenceTaskName(), retryTask.get()); executedTaskRefNames.remove(retryTask.get().getReferenceTaskName()); outcome.tasksToBeUpdated.add(pendingTask); - } else { + } else if (!(pendingTask.getWorkflowTask() != null + && TaskType.PERMISSIVE + .name() + .equals(pendingTask.getWorkflowTask().getType()) + && !pendingTask.getWorkflowTask().isOptional())) { pendingTask.setStatus(COMPLETED_WITH_ERRORS); } } @@ -254,6 +259,39 @@ private DeciderOutcome decide(final WorkflowModel workflow, List preS if (hasSuccessfulTerminateTask || (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow))) { LOGGER.debug("Marking workflow: {} as complete.", workflow); + List permissiveTasksTerminalNonSuccessful = + workflow.getTasks().stream() + .filter(t -> t.getWorkflowTask() != null) + .filter(t -> PERMISSIVE.name().equals(t.getWorkflowTask().getType())) + .filter(t -> !t.getWorkflowTask().isOptional()) + .collect( + Collectors.toMap( + TaskModel::getReferenceTaskName, + t -> t, + (t1, t2) -> + t1.getRetryCount() > t2.getRetryCount() + ? t1 + : t2)) + .values() + .stream() + .filter( + t -> + t.getStatus().isTerminal() + && !t.getStatus().isSuccessful()) + .toList(); + if (!permissiveTasksTerminalNonSuccessful.isEmpty()) { + final String errMsg = + permissiveTasksTerminalNonSuccessful.stream() + .map( + t -> + String.format( + "Task %s failed with status: %s and reason: '%s'", + t.getTaskId(), + t.getStatus(), + t.getReasonForIncompletion())) + .collect(Collectors.joining(". ")); + throw new TerminateWorkflowException(errMsg); + } outcome.isComplete = true; } @@ -437,11 +475,6 @@ public boolean checkForWorkflowCompletion(final WorkflowModel workflow) if (status == null || !status.isTerminal()) { return false; } - // if we reach here, the task has been completed. - // Was the task successful in completion? - if (!status.isSuccessful()) { - return false; - } } boolean noPendingSchedule = @@ -529,7 +562,9 @@ Optional retry( if (!task.getStatus().isRetriable() || TaskType.isBuiltIn(task.getTaskType()) || expectedRetryCount <= retryCount) { - if (workflowTask != null && workflowTask.isOptional()) { + if (workflowTask != null + && (workflowTask.isOptional() + || TaskType.PERMISSIVE.name().equals(workflowTask.getType()))) { return Optional.empty(); } WorkflowModel.Status status; diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 6070741a2..37bc358bb 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -336,6 +336,18 @@ private void retry(WorkflowModel workflow) { for (TaskModel task : workflow.getTasks()) { switch (task.getStatus()) { case FAILED: + if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString()) + || task.getTaskType() + .equalsIgnoreCase(TaskType.EXCLUSIVE_JOIN.toString())) { + @SuppressWarnings("unchecked") + List joinOn = (List) task.getInputData().get("joinOn"); + boolean joinOnFailedPermissive = isJoinOnFailedPermissive(joinOn, workflow); + if (joinOnFailedPermissive) { + task.setStatus(IN_PROGRESS); + addTaskToQueue(task); + break; + } + } case FAILED_WITH_TERMINAL_ERROR: case TIMED_OUT: retriableMap.put(task.getReferenceTaskName(), task); @@ -1814,4 +1826,14 @@ private void expediteLazyWorkflowEvaluation(String workflowId) { LOGGER.info("Pushed workflow {} to {} for expedited evaluation", workflowId, DECIDER_QUEUE); } + + private static boolean isJoinOnFailedPermissive(List joinOn, WorkflowModel workflow) { + return joinOn.stream() + .map(workflow::getTaskByRefName) + .anyMatch( + t -> + TaskType.PERMISSIVE.name().equals(t.getWorkflowTask().getType()) + && !t.getWorkflowTask().isOptional() + && t.getStatus().equals(FAILED)); + } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java new file mode 100644 index 000000000..124a9b2e0 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java @@ -0,0 +1,102 @@ +/* + * Copyright 2023 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.mapper; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.tasks.TaskType; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; +import com.netflix.conductor.core.exception.TerminateWorkflowException; +import com.netflix.conductor.core.utils.ParametersUtils; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.model.WorkflowModel; + +/** + * An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link + * TaskType#PERMISSIVE} to a {@link TaskModel} with status {@link TaskModel.Status#SCHEDULED}. + */ +@Component +public class PermissiveTaskMapper implements TaskMapper { + + public static final Logger LOGGER = LoggerFactory.getLogger(PermissiveTaskMapper.class); + private final ParametersUtils parametersUtils; + + public PermissiveTaskMapper(ParametersUtils parametersUtils) { + this.parametersUtils = parametersUtils; + } + + @Override + public String getTaskType() { + return TaskType.PERMISSIVE.name(); + } + + /** + * This method maps a {@link WorkflowTask} of type {@link TaskType#PERMISSIVE} to a {@link + * TaskModel} + * + * @param taskMapperContext: A wrapper class containing the {@link WorkflowTask}, {@link + * WorkflowDef}, {@link WorkflowModel} and a string representation of the TaskId + * @return a List with just one exclusive task + * @throws TerminateWorkflowException In case if the task definition does not exist + */ + @Override + public List getMappedTasks(TaskMapperContext taskMapperContext) + throws TerminateWorkflowException { + + LOGGER.debug("TaskMapperContext {} in PermissiveTaskMapper", taskMapperContext); + + WorkflowTask workflowTask = taskMapperContext.getWorkflowTask(); + WorkflowModel workflowModel = taskMapperContext.getWorkflowModel(); + int retryCount = taskMapperContext.getRetryCount(); + String retriedTaskId = taskMapperContext.getRetryTaskId(); + + TaskDef taskDefinition = + Optional.ofNullable(workflowTask.getTaskDefinition()) + .orElseThrow( + () -> { + String reason = + String.format( + "Invalid task. Task %s does not have a definition", + workflowTask.getName()); + return new TerminateWorkflowException(reason); + }); + + Map input = + parametersUtils.getTaskInput( + workflowTask.getInputParameters(), + workflowModel, + taskDefinition, + taskMapperContext.getTaskId()); + TaskModel permissiveTask = taskMapperContext.createTaskModel(); + permissiveTask.setTaskType(workflowTask.getName()); + permissiveTask.setStartDelayInSeconds(workflowTask.getStartDelay()); + permissiveTask.setInputData(input); + permissiveTask.setStatus(TaskModel.Status.SCHEDULED); + permissiveTask.setRetryCount(retryCount); + permissiveTask.setCallbackAfterSeconds(workflowTask.getStartDelay()); + permissiveTask.setResponseTimeoutSeconds(taskDefinition.getResponseTimeoutSeconds()); + permissiveTask.setRetriedTaskId(retriedTaskId); + permissiveTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency()); + permissiveTask.setRateLimitFrequencyInSeconds( + taskDefinition.getRateLimitFrequencyInSeconds()); + return List.of(permissiveTask); + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java index e2bf0ac0b..62a02c3c9 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java @@ -24,6 +24,7 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; +import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE; import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_EXCLUSIVE_JOIN; @Component(TASK_TYPE_EXCLUSIVE_JOIN) @@ -65,9 +66,20 @@ public boolean execute( } taskStatus = exclusiveTask.getStatus(); foundExlusiveJoinOnTask = taskStatus.isTerminal(); - hasFailures = !taskStatus.isSuccessful(); + hasFailures = + !taskStatus.isSuccessful() + && (!PERMISSIVE.name().equals(exclusiveTask.getWorkflowTask().getType()) + || joinOn.stream() + .map(workflow::getTaskByRefName) + .allMatch(t -> t.getStatus().isTerminal())); if (hasFailures) { - failureReason.append(exclusiveTask.getReasonForIncompletion()).append(" "); + final String failureReasons = + joinOn.stream() + .map(workflow::getTaskByRefName) + .filter(t -> !t.getStatus().isSuccessful()) + .map(TaskModel::getReasonForIncompletion) + .collect(Collectors.joining(" ")); + failureReason.append(failureReasons); } break; diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java index e0f7be9fa..14fdba6f1 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java @@ -23,6 +23,7 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; +import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE; import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN; @Component(TASK_TYPE_JOIN) @@ -57,9 +58,21 @@ public boolean execute( break; } TaskModel.Status taskStatus = forkedTask.getStatus(); - hasFailures = !taskStatus.isSuccessful() && !forkedTask.getWorkflowTask().isOptional(); + hasFailures = + !taskStatus.isSuccessful() + && !forkedTask.getWorkflowTask().isOptional() + && (!PERMISSIVE.name().equals(forkedTask.getWorkflowTask().getType()) + || joinOn.stream() + .map(workflow::getTaskByRefName) + .allMatch(t -> t.getStatus().isTerminal())); if (hasFailures) { - failureReason.append(forkedTask.getReasonForIncompletion()).append(" "); + final String failureReasons = + joinOn.stream() + .map(workflow::getTaskByRefName) + .filter(t -> !t.getStatus().isSuccessful()) + .map(TaskModel::getReasonForIncompletion) + .collect(Collectors.joining(" ")); + failureReason.append(failureReasons); } // Only add to task output if it's not empty if (!forkedTask.getOutputData().isEmpty()) { diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java index dbf0277f6..ca06c8ea8 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java @@ -440,6 +440,80 @@ public void testOptional() { outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); } + /** Similar to {@link #testOptional} */ + @Test + public void testPermissive() { + WorkflowDef def = new WorkflowDef(); + def.setName("test-permissive"); + + WorkflowTask task1 = new WorkflowTask(); + task1.setName("task0"); + task1.setType("PERMISSIVE"); + task1.setTaskReferenceName("t0"); + task1.getInputParameters().put("taskId", "${CPEWF_TASK_ID}"); + task1.setTaskDefinition(new TaskDef("task0")); + + WorkflowTask task2 = new WorkflowTask(); + task2.setName("task1"); + task2.setType("PERMISSIVE"); + task2.setTaskReferenceName("t1"); + task2.setTaskDefinition(new TaskDef("task1")); + + def.getTasks().add(task1); + def.getTasks().add(task2); + def.setSchemaVersion(2); + + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowDefinition(def); + workflow.setCreateTime(System.currentTimeMillis()); + DeciderOutcome outcome = deciderService.decide(workflow); + assertNotNull(outcome); + assertEquals(1, outcome.tasksToBeScheduled.size()); + assertEquals( + task1.getTaskReferenceName(), + outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); + + for (int i = 0; i < 3; i++) { + String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId(); + assertEquals(task1Id, outcome.tasksToBeScheduled.get(0).getInputData().get("taskId")); + + workflow.getTasks().clear(); + workflow.getTasks().addAll(outcome.tasksToBeScheduled); + workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED); + + outcome = deciderService.decide(workflow); + + assertNotNull(outcome); + assertEquals(1, outcome.tasksToBeUpdated.size()); + assertEquals(1, outcome.tasksToBeScheduled.size()); + + assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus()); + assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId()); + assertEquals( + task1.getTaskReferenceName(), + outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); + assertEquals(i + 1, outcome.tasksToBeScheduled.get(0).getRetryCount()); + } + + String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId(); + + workflow.getTasks().clear(); + workflow.getTasks().addAll(outcome.tasksToBeScheduled); + workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED); + + outcome = deciderService.decide(workflow); + + assertNotNull(outcome); + assertEquals(1, outcome.tasksToBeUpdated.size()); + assertEquals(1, outcome.tasksToBeScheduled.size()); + + assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus()); + assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId()); + assertEquals( + task2.getTaskReferenceName(), + outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); + } + @Test public void testOptionalWithDynamicFork() { WorkflowDef def = new WorkflowDef(); diff --git a/core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java b/core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java new file mode 100644 index 000000000..97920a3f0 --- /dev/null +++ b/core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java @@ -0,0 +1,114 @@ +/* + * Copyright 2023 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.mapper; + +import java.util.HashMap; +import java.util.List; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; +import com.netflix.conductor.core.exception.TerminateWorkflowException; +import com.netflix.conductor.core.utils.IDGenerator; +import com.netflix.conductor.core.utils.ParametersUtils; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.model.WorkflowModel; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.*; + +public class PermissiveTaskMapperTest { + + private PermissiveTaskMapper permissiveTaskMapper; + + private IDGenerator idGenerator = new IDGenerator(); + + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setUp() { + ParametersUtils parametersUtils = mock(ParametersUtils.class); + permissiveTaskMapper = new PermissiveTaskMapper(parametersUtils); + } + + @Test + public void getMappedTasks() { + + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("permissive_task"); + workflowTask.setTaskDefinition(new TaskDef("permissive_task")); + + String taskId = idGenerator.generate(); + String retriedTaskId = idGenerator.generate(); + + WorkflowDef workflowDef = new WorkflowDef(); + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowDefinition(workflowDef); + + TaskMapperContext taskMapperContext = + TaskMapperContext.newBuilder() + .withWorkflowModel(workflow) + .withTaskDefinition(new TaskDef()) + .withWorkflowTask(workflowTask) + .withTaskInput(new HashMap<>()) + .withRetryCount(0) + .withRetryTaskId(retriedTaskId) + .withTaskId(taskId) + .build(); + + List mappedTasks = permissiveTaskMapper.getMappedTasks(taskMapperContext); + assertNotNull(mappedTasks); + assertEquals(1, mappedTasks.size()); + } + + @Test + public void getMappedTasksException() { + + // Given + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("permissive_task"); + String taskId = idGenerator.generate(); + String retriedTaskId = idGenerator.generate(); + + WorkflowDef workflowDef = new WorkflowDef(); + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowDefinition(workflowDef); + + TaskMapperContext taskMapperContext = + TaskMapperContext.newBuilder() + .withWorkflowModel(workflow) + .withTaskDefinition(new TaskDef()) + .withWorkflowTask(workflowTask) + .withTaskInput(new HashMap<>()) + .withRetryCount(0) + .withRetryTaskId(retriedTaskId) + .withTaskId(taskId) + .build(); + + // then + expectedException.expect(TerminateWorkflowException.class); + expectedException.expectMessage( + String.format( + "Invalid task. Task %s does not have a definition", + workflowTask.getName())); + + // when + permissiveTaskMapper.getMappedTasks(taskMapperContext); + } +} diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java new file mode 100644 index 000000000..f767f8ea5 --- /dev/null +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.sdk.workflow.def.tasks; + +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.tasks.TaskType; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; + +/* Workflow permissive task executed by a worker */ +public class PermissiveTask extends Task { + + private TaskDef taskDef; + + public PermissiveTask(String taskDefName, String taskReferenceName) { + super(taskReferenceName, TaskType.PERMISSIVE); + super.name(taskDefName); + } + + PermissiveTask(WorkflowTask workflowTask) { + super(workflowTask); + this.taskDef = workflowTask.getTaskDefinition(); + } + + public TaskDef getTaskDef() { + return taskDef; + } + + public PermissiveTask setTaskDef(TaskDef taskDef) { + this.taskDef = taskDef; + return this; + } + + @Override + protected void updateWorkflowTask(WorkflowTask workflowTask) { + workflowTask.setTaskDefinition(taskDef); + } +} diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java index ae75b6c83..6601f1a23 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java @@ -72,6 +72,7 @@ public static void initTaskImplementations() { TaskRegistry.register(TaskType.DYNAMIC.name(), Dynamic.class); TaskRegistry.register(TaskType.FORK_JOIN_DYNAMIC.name(), DynamicFork.class); TaskRegistry.register(TaskType.FORK_JOIN.name(), ForkJoin.class); + TaskRegistry.register(TaskType.PERMISSIVE.name(), PermissiveTask.class); TaskRegistry.register(TaskType.HTTP.name(), Http.class); TaskRegistry.register(TaskType.INLINE.name(), Javascript.class); TaskRegistry.register(TaskType.JOIN.name(), Join.class); diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy index 437f4e1cc..b6540b160 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy @@ -16,11 +16,11 @@ import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.metadata.tasks.TaskDef +import com.netflix.conductor.common.metadata.tasks.TaskType import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest import com.netflix.conductor.common.run.Workflow import com.netflix.conductor.core.execution.tasks.Join import com.netflix.conductor.core.execution.tasks.SubWorkflow -import com.netflix.conductor.dao.QueueDAO import com.netflix.conductor.test.base.AbstractSpecification import spock.lang.Shared @@ -45,18 +45,23 @@ class ForkJoinSpec extends AbstractSpecification { @Shared def FORK_JOIN_SUB_WORKFLOW = 'integration_test_fork_join_sw' + @Shared + def FORK_JOIN_PERMISSIVE_WF = 'FanInOutPermissiveTest' + @Autowired SubWorkflow subWorkflowTask def setup() { workflowTestUtil.registerWorkflows('fork_join_integration_test.json', 'fork_join_with_no_task_retry_integration_test.json', + 'fork_join_with_no_permissive_task_retry_integration_test.json', 'nested_fork_join_integration_test.json', 'simple_workflow_1_integration_test.json', 'nested_fork_join_with_sub_workflow_integration_test.json', 'simple_one_task_sub_workflow_integration_test.json', 'fork_join_with_optional_sub_workflow_forks_integration_test.json', - 'fork_join_sub_workflow.json' + 'fork_join_sub_workflow.json', + 'fork_join_permissive_integration_test.json', ) } @@ -251,6 +256,110 @@ class ForkJoinSpec extends AbstractSpecification { metadataService.updateTaskDef(persistedIntegrationTask2Definition) } + /** + * start + * | + * fork + * / \ + * p_task1 p_task2 + * | / + * \ / + * \ / + * join + * | + * s_task3 + * | + * End + */ + def "Test a simple workflow with fork join permissive failure flow"() { + setup: "Ensure that 'integration_task_1' has a retry count of 0" + def persistedIntegrationTask1Definition = workflowTestUtil.getPersistedTaskDefinition('integration_task_1').get() + def modifiedIntegrationTask1Definition = new TaskDef(persistedIntegrationTask1Definition.name, + persistedIntegrationTask1Definition.description, persistedIntegrationTask1Definition.ownerEmail, 0, + 0, persistedIntegrationTask1Definition.responseTimeoutSeconds) + metadataService.updateTaskDef(modifiedIntegrationTask1Definition) + + when: "A fork join workflow is started" + def workflowInstanceId = startWorkflow(FORK_JOIN_PERMISSIVE_WF, 1, + 'fanoutTest', [:], + null) + + then: "verify that the workflow has started and the starting nodes of the each fork are in scheduled state" + workflowInstanceId + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'FORK' + tasks[1].workflowTask.type == TaskType.PERMISSIVE.name() + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'integration_task_1' + tasks[2].workflowTask.type == TaskType.PERMISSIVE.name() + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + } + + when: "The first task of the fork is polled and completed" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("fanouttask_join").getTaskId() + def polledAndAckTask1Try1 = workflowTestUtil.pollAndFailTask('integration_task_1', 'task1.worker', 'Failed...') + + then: "verify that the 'integration_task_1' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask1Try1) + + and: "The workflow has been updated and has all the required tasks in the right status to move forward" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + } + + when: "The other node of the fork is completed by completing 'integration_task_2'" + def polledAndAckTask2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2','task1.worker') + + and: "workflow is evaluated" + sweep(workflowInstanceId) + + then: "verify that the 'integration_task_2' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1) + + and: "the workflow is in the failed state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + } + + when: "JOIN task executed by the async executor" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + then: "The workflow has been updated with the task status and task list" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.FAILED + tasks.size() == 4 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.FAILED + tasks[3].taskType == 'JOIN' + } + + cleanup: "Restore the task definitions that were modified as part of this feature testing" + metadataService.updateTaskDef(persistedIntegrationTask1Definition) + } + def "Test retrying a failed fork join workflow"() { when: "A fork join workflow is started" @@ -384,6 +493,166 @@ class ForkJoinSpec extends AbstractSpecification { } } + def "Test retrying a failed permissive fork join workflow"() { + + when: "A fork join permissive workflow is started" + def workflowInstanceId = startWorkflow(FORK_JOIN_PERMISSIVE_WF + '_2', 1, + 'fanoutTest', [:], + null) + + then: "verify that the workflow has started and the starting nodes of the each fork are in scheduled state" + workflowInstanceId + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'FORK' + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + } + + when: "The first task of the fork is polled and completed" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("fanouttask_join").getTaskId() + def polledAndAckTask1Try1 = workflowTestUtil.pollAndCompleteTask('integration_p_task_0_RT_1', 'task1.worker') + + then: "verify that the 'integration_task_p_0_RT_1' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask1Try1) + + and: "The workflow has been updated and has all the required tasks in the right status to move forward" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 5 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + tasks[4].status == Task.Status.SCHEDULED + tasks[4].taskType == 'integration_p_task_0_RT_3' + } + + when: "The other node of the fork is completed by completing 'integration_p_task_0_RT_2'" + def polledAndAckTask2Try1 = workflowTestUtil.pollAndFailTask('integration_p_task_0_RT_2', + 'task1.worker', 'Failed....') + + and: "workflow is evaluated" + sweep(workflowInstanceId) + + then: "verify that the 'integration_p_task_0_RT_2' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1) + + and: "the workflow is not in the failed state, until the completion of the permissive forked tasks" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 5 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.FAILED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + tasks[4].status == Task.Status.SCHEDULED + tasks[4].taskType == 'integration_p_task_0_RT_3' + } + + when: "The other node of the fork is completed by completing 'integration_p_task_0_RT_3'" + def polledAndAckTask3Try1 = workflowTestUtil.pollAndFailTask('integration_p_task_0_RT_3', + 'task1.worker', 'Failed....') + + and: "workflow is evaluated" + sweep(workflowInstanceId) + + then: "verify that the 'integration_p_task_0_RT_3' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask3Try1) + + and: "JOIN task is polled and executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + and: "the workflow is in the failed state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.FAILED + tasks.size() == 5 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.FAILED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.FAILED + tasks[3].taskType == 'JOIN' + tasks[4].status == Task.Status.FAILED + tasks[4].taskType == 'integration_p_task_0_RT_3' + } + + when: "The workflow is retried" + workflowExecutor.retry(workflowInstanceId, false) + + then: "verify that all the workflow is retried and new tasks are added in place of the failed tasks" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 7 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.FAILED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + tasks[4].status == Task.Status.FAILED + tasks[4].taskType == 'integration_p_task_0_RT_3' + tasks[5].status == Task.Status.SCHEDULED + tasks[5].taskType == 'integration_p_task_0_RT_2' + tasks[6].status == Task.Status.SCHEDULED + tasks[6].taskType == 'integration_p_task_0_RT_3' + } + + when: "The 'integration_p_task_0_RT_3' is polled and completed" + def polledAndAckTask3Try2 = workflowTestUtil.pollAndCompleteTask('integration_p_task_0_RT_3', 'task1.worker') + + then: "verify that the 'integration_p_task_3' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask3Try2) + + when: "The other node of the fork is completed by completing 'integration_p_task_0_RT_2'" + def polledAndAckTask2Try2 = workflowTestUtil.pollAndCompleteTask('integration_p_task_0_RT_2', 'task1.worker') + + and: "workflow is evaluated" + sweep(workflowInstanceId) + + then: "verify that the 'integration_p_task_2' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try2) + + when: "JOIN task is polled and executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + and: "The last task of the workflow is then polled and completed integration_p_task_0_RT_4'" + def polledAndAckTask4Try1 = workflowTestUtil.pollAndCompleteTask('integration_p_task_0_RT_4', 'task1.worker') + + then: "verify that the 'integration_p_task_0_RT_4' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask4Try1) + + then: "Then verify that the workflow is completed and the task list of execution is as expected" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.COMPLETED + tasks.size() == 8 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.FAILED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.COMPLETED + tasks[3].taskType == 'JOIN' + tasks[4].status == Task.Status.FAILED + tasks[4].taskType == 'integration_p_task_0_RT_3' + tasks[5].status == Task.Status.COMPLETED + tasks[5].taskType == 'integration_p_task_0_RT_2' + tasks[6].status == Task.Status.COMPLETED + tasks[6].taskType == 'integration_p_task_0_RT_3' + tasks[7].status == Task.Status.COMPLETED + tasks[7].taskType == 'integration_p_task_0_RT_4' + } + } + def "Test nested fork join workflow success flow"() { given: "Input for the nested fork join workflow" Map input = new HashMap() diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy index 0fbf0ec0b..53d5312df 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy @@ -45,6 +45,12 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification { @Shared def WORKFLOW_WITH_OPTIONAL_TASK = 'optional_task_wf' + @Shared + def WORKFLOW_WITH_PERMISSIVE_TASK = 'permissive_task_wf' + + @Shared + def WORKFLOW_WITH_PERMISSIVE_OPTIONAL_TASK = 'permissive_optional_task_wf' + @Shared def TEST_WORKFLOW = 'integration_test_wf3' @@ -52,12 +58,14 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification { def WAIT_TIME_OUT_WORKFLOW = 'test_wait_timeout' def setup() { - //Register LINEAR_WORKFLOW_T1_T2, TEST_WORKFLOW, RTOWF, WORKFLOW_WITH_OPTIONAL_TASK + //Register LINEAR_WORKFLOW_T1_T2, TEST_WORKFLOW, RTOWF, WORKFLOW_WITH_OPTIONAL_TASK, WORKFLOW_WITH_PERMISSIVE_TASK, WORKFLOW_WITH_PERMISSIVE_OPTIONAL_TASK workflowTestUtil.registerWorkflows( 'simple_workflow_1_integration_test.json', 'simple_workflow_1_input_template_integration_test.json', 'simple_workflow_3_integration_test.json', 'simple_workflow_with_optional_task_integration_test.json', + 'simple_workflow_with_permissive_task_integration_test.json', + 'simple_workflow_with_permissive_optional_task_integration_test.json', 'simple_wait_task_workflow_integration_test.json') } @@ -133,6 +141,155 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification { } } + def "Test simple workflow which has a permissive task"() { + + given: "A input parameters for a workflow with a permissive task" + def correlationId = 'integration_test' + UUID.randomUUID().toString() + def workflowInput = new HashMap() + workflowInput['param1'] = 'p1 value' + workflowInput['param2'] = 'p2 value' + + when: "A permissive task workflow is started" + def workflowInstanceId = startWorkflow(WORKFLOW_WITH_PERMISSIVE_TASK, 1, + correlationId, workflowInput, + null) + + then: "verify that the workflow has started and the permissive task is in a scheduled state" + workflowInstanceId + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 1 + tasks[0].status == Task.Status.SCHEDULED + tasks[0].taskType == 'task_permissive' + } + + when: "The first permissive task is polled and failed" + Tuple polledAndFailedTaskTry1 = workflowTestUtil.pollAndFailTask('task_permissive', + 'task1.integration.worker', 'NETWORK ERROR') + + then: "Verify that the task_permissive was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndFailedTaskTry1) + + when: "A decide is executed on the workflow" + workflowExecutor.decide(workflowInstanceId) + + then: "verify that the workflow is still running and the first permissive task has failed and the retry has kicked in" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 2 + tasks[0].status == Task.Status.FAILED + tasks[0].taskType == 'task_permissive' + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'task_permissive' + } + + when: "The first permissive task is polled and failed" + Tuple polledAndFailedTaskTry2 = workflowTestUtil.pollAndFailTask('task_permissive', + 'task1.integration.worker', 'NETWORK ERROR') + + then: "Verify that the task_permissive was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndFailedTaskTry2) + + workflowExecutor.decide(workflowInstanceId) + + then: "Ensure that the workflow is updated" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 3 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'task_permissive' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + } + + when: "The second task 'integration_task_2' is polled and completed" + def task2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker') + + then: "Verify that the task was polled and acknowledged" + verifyPolledAndAcknowledgedTask(task2Try1) + + and: "Ensure that the workflow is in completed state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.FAILED + reasonForIncompletion == "Task ${tasks[1].taskId} failed with status: FAILED and reason: 'NETWORK ERROR'" + tasks.size() == 3 + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + } + } + + def "Test simple workflow which has a permissive optional task"() { + + given: "A input parameters for a workflow with a permissive optional task" + def correlationId = 'integration_test' + UUID.randomUUID().toString() + def workflowInput = new HashMap() + workflowInput['param1'] = 'p1 value' + workflowInput['param2'] = 'p2 value' + + when: "A permissive optional task workflow is started" + def workflowInstanceId = startWorkflow(WORKFLOW_WITH_PERMISSIVE_OPTIONAL_TASK, 1, + correlationId, workflowInput, + null) + + then: "verify that the workflow has started and the permissive optional task is in a scheduled state" + workflowInstanceId + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 1 + tasks[0].status == Task.Status.SCHEDULED + tasks[0].taskType == 'task_optional' + } + + when: "The first permissive optional task is polled and failed" + Tuple polledAndFailedTaskTry1 = workflowTestUtil.pollAndFailTask('task_optional', + 'task1.integration.worker', 'NETWORK ERROR') + + then: "Verify that the task_optional was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndFailedTaskTry1) + + when: "A decide is executed on the workflow" + workflowExecutor.decide(workflowInstanceId) + + then: "verify that the workflow is still running and the first permissive optional task has failed and the retry has kicked in" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 2 + tasks[0].status == Task.Status.FAILED + tasks[0].taskType == 'task_optional' + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'task_optional' + } + + when: "Poll the permissive optional task again and do not complete it and run decide" + workflowExecutionService.poll('task_optional', 'task1.integration.worker') + Thread.sleep(5000) + workflowExecutor.decide(workflowInstanceId) + + then: "Ensure that the workflow is updated" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 3 + tasks[1].status == Task.Status.COMPLETED_WITH_ERRORS + tasks[1].taskType == 'task_optional' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + } + + when: "The second task 'integration_task_2' is polled and completed" + def task2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker') + + then: "Verify that the task was polled and acknowledged" + verifyPolledAndAcknowledgedTask(task2Try1) + + and: "Ensure that the workflow is in completed state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.COMPLETED + tasks.size() == 3 + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + } + } + def "test workflow with input template parsing"() { given: "Input parameters for a workflow with input template" def correlationId = 'integration_test' + UUID.randomUUID().toString() diff --git a/test-harness/src/test/resources/fork_join_permissive_integration_test.json b/test-harness/src/test/resources/fork_join_permissive_integration_test.json new file mode 100644 index 000000000..6f65d4e11 --- /dev/null +++ b/test-harness/src/test/resources/fork_join_permissive_integration_test.json @@ -0,0 +1,109 @@ +{ + "name": "FanInOutPermissiveTest", + "description": "FanInOutPermissiveTest", + "version": 1, + "tasks": [ + { + "name": "fork", + "taskReferenceName": "fanouttask", + "inputParameters": {}, + "type": "FORK_JOIN", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [ + [ + { + "name": "integration_task_1", + "taskReferenceName": "t1", + "inputParameters": { + "p1": "workflow.input.param1", + "p2": "workflow.input.param2" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + [ + { + "name": "integration_task_2", + "taskReferenceName": "t2", + "inputParameters": { + "tp1": "workflow.input.param1" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ] + ], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "join", + "taskReferenceName": "fanouttask_join", + "inputParameters": {}, + "type": "JOIN", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [ + "t1", + "t2" + ], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "integration_task_3", + "taskReferenceName": "t3", + "inputParameters": { + "p1": "workflow.input.param1", + "p2": "workflow.input.param2" + }, + "type": "SIMPLE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + "inputParameters": [ + "param1", + "param2" + ], + "outputParameters": {}, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "ownerEmail": "test@harness.com" +} \ No newline at end of file diff --git a/test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json b/test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json new file mode 100644 index 000000000..8c1ebef77 --- /dev/null +++ b/test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json @@ -0,0 +1,190 @@ +{ + "name": "FanInOutPermissiveTest_2", + "description": "FanInOutPermissiveTest_2", + "version": 1, + "tasks": [ + { + "name": "fork", + "taskReferenceName": "fanouttask", + "inputParameters": {}, + "type": "FORK_JOIN", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [ + [ + { + "name": "integration_p_task_0_RT_1", + "taskReferenceName": "t1", + "inputParameters": { + "p1": "workflow.input.param1", + "p2": "workflow.input.param2" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "taskDefinition": { + "createdBy": "integration_app", + "name": "integration_p_task_0_RT_1", + "description": "integration_p_task_0_RT_1", + "retryCount": 0, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + } + }, + { + "name": "integration_p_task_0_RT_3", + "taskReferenceName": "t3", + "inputParameters": { + "p1": "workflow.input.param1", + "p2": "workflow.input.param2" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "taskDefinition": { + "createdBy": "integration_app", + "name": "integration_p_task_0_RT_3", + "description": "integration_p_task_0_RT_3", + "retryCount": 0, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + } + } + ], + [ + { + "name": "integration_p_task_0_RT_2", + "taskReferenceName": "t2", + "inputParameters": { + "tp1": "workflow.input.param1" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "taskDefinition": { + "createdBy": "integration_app", + "name": "integration_p_task_0_RT_2", + "description": "integration_p_task_0_RT_2", + "retryCount": 0, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + } + } + ] + ], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "join", + "taskReferenceName": "fanouttask_join", + "inputParameters": {}, + "type": "JOIN", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [ + "t3", + "t2" + ], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "integration_p_task_0_RT_4", + "taskReferenceName": "t4", + "inputParameters": { + "tp1": "workflow.input.param1" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "taskDefinition": { + "createdBy": "integration_app", + "name": "integration_p_task_0_RT_4", + "description": "integration_p_task_0_RT_4", + "retryCount": 0, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + } + } + ], + "inputParameters": [ + "param1", + "param2" + ], + "outputParameters": {}, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "ownerEmail": "test@harness.com" +} \ No newline at end of file diff --git a/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json b/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json new file mode 100644 index 000000000..84c6910ab --- /dev/null +++ b/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json @@ -0,0 +1,58 @@ +{ + "name": "permissive_optional_task_wf", + "description": "permissive_optional_task_wf", + "version": 1, + "tasks": [ + { + "name": "task_optional", + "taskReferenceName": "t1", + "inputParameters": { + "p1": "${workflow.input.param1}", + "p2": "${workflow.input.param2}" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": true, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "integration_task_2", + "taskReferenceName": "t2", + "inputParameters": { + "tp1": "${workflow.input.param1}", + "tp2": "${t1.output.op}" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + "inputParameters": [ + "param1", + "param2" + ], + "outputParameters": { + "o1": "${workflow.input.param1}", + "o2": "${t2.output.uuid}", + "o3": "${t1.output.op}" + }, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "ownerEmail": "test@harness.com" +} \ No newline at end of file diff --git a/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json b/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json new file mode 100644 index 000000000..2b8f4dbfe --- /dev/null +++ b/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json @@ -0,0 +1,92 @@ +{ + "name": "permissive_task_wf", + "description": "permissive_task_wf", + "version": 1, + "tasks": [ + { + "name": "task_permissive", + "taskReferenceName": "t1", + "inputParameters": { + "p1": "${workflow.input.param1}", + "p2": "${workflow.input.param2}" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "retryCount": 1, + "taskDefinition": { + "createdBy": "integration_app", + "name": "task_permissive", + "description": "task_permissive", + "retryCount": 1, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + }, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "integration_task_2", + "taskReferenceName": "t2", + "inputParameters": { + "tp1": "${workflow.input.param1}", + "tp2": "${t1.output.op}" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "retryCount": 1, + "taskDefinition": { + "createdBy": "integration_app", + "name": "integration_task_2", + "description": "integration_task_2", + "retryCount": 1, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + }, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + "inputParameters": [ + "param1", + "param2" + ], + "outputParameters": { + "o1": "${workflow.input.param1}", + "o2": "${t2.output.uuid}", + "o3": "${t1.output.op}" + }, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "ownerEmail": "test@harness.com" +} \ No newline at end of file From fdf5f5e73839e196b472dd2d15a178e6dea29fc9 Mon Sep 17 00:00:00 2001 From: Iva Koleva Date: Tue, 19 Dec 2023 21:38:19 +0200 Subject: [PATCH 2/2] WorkflowTask permissive property added, so various task types can be permissive. --- .../common/metadata/tasks/TaskType.java | 2 - .../metadata/workflow/WorkflowTask.java | 19 +++ .../core/execution/DeciderService.java | 10 +- .../core/execution/WorkflowExecutor.java | 2 +- .../mapper/PermissiveTaskMapper.java | 102 ---------------- .../core/execution/tasks/ExclusiveJoin.java | 3 +- .../conductor/core/execution/tasks/Join.java | 3 +- .../core/execution/TestDeciderOutcomes.java | 5 +- .../mapper/PermissiveTaskMapperTest.java | 114 ------------------ .../conductor/grpc/AbstractProtoMapper.java | 2 + grpc/src/main/proto/model/workflowtask.proto | 1 + .../workflow/def/tasks/PermissiveTask.java | 47 -------- .../workflow/executor/WorkflowExecutor.java | 1 - .../test/integration/ForkJoinSpec.groovy | 4 +- ...fork_join_permissive_integration_test.json | 6 +- ...ermissive_task_retry_integration_test.json | 12 +- ...issive_optional_task_integration_test.json | 6 +- ...with_permissive_task_integration_test.json | 6 +- 18 files changed, 52 insertions(+), 293 deletions(-) delete mode 100644 core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java delete mode 100644 core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java delete mode 100644 java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java index efc985acd..235a0ac91 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java @@ -23,7 +23,6 @@ public enum TaskType { DYNAMIC, FORK_JOIN, FORK_JOIN_DYNAMIC, - PERMISSIVE, DECISION, SWITCH, JOIN, @@ -71,7 +70,6 @@ public enum TaskType { public static final String TASK_TYPE_JSON_JQ_TRANSFORM = "JSON_JQ_TRANSFORM"; public static final String TASK_TYPE_SET_VARIABLE = "SET_VARIABLE"; public static final String TASK_TYPE_FORK = "FORK"; - public static final String TASK_TYPE_PERMISSIVE = "PERMISSIVE"; public static final String TASK_TYPE_NOOP = "NOOP"; private static final Set BUILT_IN_TASKS = new HashSet<>(); diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java index 492a61d33..43b3c8af2 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java @@ -153,6 +153,9 @@ public void setTasks(List tasks) { @ProtoField(id = 28) private String expression; + @ProtoField(id = 29) + private boolean permissive = false; + /** * @return the name */ @@ -548,6 +551,22 @@ public void setExpression(String expression) { this.expression = expression; } + /** + * @return If the task is permissive. When set to true, and the task is in failed status, + * fail-fast does not occur. The workflow execution continues until reaching join or end of + * workflow, allowing idempotent execution of other tasks. + */ + public boolean isPermissive() { + return this.permissive; + } + + /** + * @param permissive when set to true, the task is marked as permissive + */ + public void setPermissive(boolean permissive) { + this.permissive = permissive; + } + private Collection> children() { Collection> workflowTaskLists = new LinkedList<>(); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java index 52afc52c2..127440d60 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java @@ -44,7 +44,6 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; -import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE; import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE; import static com.netflix.conductor.common.metadata.tasks.TaskType.USER_DEFINED; import static com.netflix.conductor.model.TaskModel.Status.*; @@ -209,9 +208,7 @@ private DeciderOutcome decide(final WorkflowModel workflow, List preS executedTaskRefNames.remove(retryTask.get().getReferenceTaskName()); outcome.tasksToBeUpdated.add(pendingTask); } else if (!(pendingTask.getWorkflowTask() != null - && TaskType.PERMISSIVE - .name() - .equals(pendingTask.getWorkflowTask().getType()) + && pendingTask.getWorkflowTask().isPermissive() && !pendingTask.getWorkflowTask().isOptional())) { pendingTask.setStatus(COMPLETED_WITH_ERRORS); } @@ -262,7 +259,7 @@ private DeciderOutcome decide(final WorkflowModel workflow, List preS List permissiveTasksTerminalNonSuccessful = workflow.getTasks().stream() .filter(t -> t.getWorkflowTask() != null) - .filter(t -> PERMISSIVE.name().equals(t.getWorkflowTask().getType())) + .filter(t -> t.getWorkflowTask().isPermissive()) .filter(t -> !t.getWorkflowTask().isOptional()) .collect( Collectors.toMap( @@ -563,8 +560,7 @@ Optional retry( || TaskType.isBuiltIn(task.getTaskType()) || expectedRetryCount <= retryCount) { if (workflowTask != null - && (workflowTask.isOptional() - || TaskType.PERMISSIVE.name().equals(workflowTask.getType()))) { + && (workflowTask.isOptional() || workflowTask.isPermissive())) { return Optional.empty(); } WorkflowModel.Status status; diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 37bc358bb..8cb13f1b6 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -1832,7 +1832,7 @@ private static boolean isJoinOnFailedPermissive(List joinOn, WorkflowMod .map(workflow::getTaskByRefName) .anyMatch( t -> - TaskType.PERMISSIVE.name().equals(t.getWorkflowTask().getType()) + t.getWorkflowTask().isPermissive() && !t.getWorkflowTask().isOptional() && t.getStatus().equals(FAILED)); } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java deleted file mode 100644 index 124a9b2e0..000000000 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright 2023 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package com.netflix.conductor.core.execution.mapper; - -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import com.netflix.conductor.common.metadata.tasks.TaskDef; -import com.netflix.conductor.common.metadata.tasks.TaskType; -import com.netflix.conductor.common.metadata.workflow.WorkflowDef; -import com.netflix.conductor.common.metadata.workflow.WorkflowTask; -import com.netflix.conductor.core.exception.TerminateWorkflowException; -import com.netflix.conductor.core.utils.ParametersUtils; -import com.netflix.conductor.model.TaskModel; -import com.netflix.conductor.model.WorkflowModel; - -/** - * An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link - * TaskType#PERMISSIVE} to a {@link TaskModel} with status {@link TaskModel.Status#SCHEDULED}. - */ -@Component -public class PermissiveTaskMapper implements TaskMapper { - - public static final Logger LOGGER = LoggerFactory.getLogger(PermissiveTaskMapper.class); - private final ParametersUtils parametersUtils; - - public PermissiveTaskMapper(ParametersUtils parametersUtils) { - this.parametersUtils = parametersUtils; - } - - @Override - public String getTaskType() { - return TaskType.PERMISSIVE.name(); - } - - /** - * This method maps a {@link WorkflowTask} of type {@link TaskType#PERMISSIVE} to a {@link - * TaskModel} - * - * @param taskMapperContext: A wrapper class containing the {@link WorkflowTask}, {@link - * WorkflowDef}, {@link WorkflowModel} and a string representation of the TaskId - * @return a List with just one exclusive task - * @throws TerminateWorkflowException In case if the task definition does not exist - */ - @Override - public List getMappedTasks(TaskMapperContext taskMapperContext) - throws TerminateWorkflowException { - - LOGGER.debug("TaskMapperContext {} in PermissiveTaskMapper", taskMapperContext); - - WorkflowTask workflowTask = taskMapperContext.getWorkflowTask(); - WorkflowModel workflowModel = taskMapperContext.getWorkflowModel(); - int retryCount = taskMapperContext.getRetryCount(); - String retriedTaskId = taskMapperContext.getRetryTaskId(); - - TaskDef taskDefinition = - Optional.ofNullable(workflowTask.getTaskDefinition()) - .orElseThrow( - () -> { - String reason = - String.format( - "Invalid task. Task %s does not have a definition", - workflowTask.getName()); - return new TerminateWorkflowException(reason); - }); - - Map input = - parametersUtils.getTaskInput( - workflowTask.getInputParameters(), - workflowModel, - taskDefinition, - taskMapperContext.getTaskId()); - TaskModel permissiveTask = taskMapperContext.createTaskModel(); - permissiveTask.setTaskType(workflowTask.getName()); - permissiveTask.setStartDelayInSeconds(workflowTask.getStartDelay()); - permissiveTask.setInputData(input); - permissiveTask.setStatus(TaskModel.Status.SCHEDULED); - permissiveTask.setRetryCount(retryCount); - permissiveTask.setCallbackAfterSeconds(workflowTask.getStartDelay()); - permissiveTask.setResponseTimeoutSeconds(taskDefinition.getResponseTimeoutSeconds()); - permissiveTask.setRetriedTaskId(retriedTaskId); - permissiveTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency()); - permissiveTask.setRateLimitFrequencyInSeconds( - taskDefinition.getRateLimitFrequencyInSeconds()); - return List.of(permissiveTask); - } -} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java index 62a02c3c9..7484a0644 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java @@ -24,7 +24,6 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; -import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE; import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_EXCLUSIVE_JOIN; @Component(TASK_TYPE_EXCLUSIVE_JOIN) @@ -68,7 +67,7 @@ public boolean execute( foundExlusiveJoinOnTask = taskStatus.isTerminal(); hasFailures = !taskStatus.isSuccessful() - && (!PERMISSIVE.name().equals(exclusiveTask.getWorkflowTask().getType()) + && (!exclusiveTask.getWorkflowTask().isPermissive() || joinOn.stream() .map(workflow::getTaskByRefName) .allMatch(t -> t.getStatus().isTerminal())); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java index 14fdba6f1..e1646cf04 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java @@ -23,7 +23,6 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; -import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE; import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN; @Component(TASK_TYPE_JOIN) @@ -61,7 +60,7 @@ public boolean execute( hasFailures = !taskStatus.isSuccessful() && !forkedTask.getWorkflowTask().isOptional() - && (!PERMISSIVE.name().equals(forkedTask.getWorkflowTask().getType()) + && (!forkedTask.getWorkflowTask().isPermissive() || joinOn.stream() .map(workflow::getTaskByRefName) .allMatch(t -> t.getStatus().isTerminal())); diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java index ca06c8ea8..8362fded1 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java @@ -440,7 +440,6 @@ public void testOptional() { outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); } - /** Similar to {@link #testOptional} */ @Test public void testPermissive() { WorkflowDef def = new WorkflowDef(); @@ -448,14 +447,14 @@ public void testPermissive() { WorkflowTask task1 = new WorkflowTask(); task1.setName("task0"); - task1.setType("PERMISSIVE"); + task1.setPermissive(true); task1.setTaskReferenceName("t0"); task1.getInputParameters().put("taskId", "${CPEWF_TASK_ID}"); task1.setTaskDefinition(new TaskDef("task0")); WorkflowTask task2 = new WorkflowTask(); task2.setName("task1"); - task2.setType("PERMISSIVE"); + task2.setPermissive(true); task2.setTaskReferenceName("t1"); task2.setTaskDefinition(new TaskDef("task1")); diff --git a/core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java b/core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java deleted file mode 100644 index 97920a3f0..000000000 --- a/core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright 2023 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package com.netflix.conductor.core.execution.mapper; - -import java.util.HashMap; -import java.util.List; - -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import com.netflix.conductor.common.metadata.tasks.TaskDef; -import com.netflix.conductor.common.metadata.workflow.WorkflowDef; -import com.netflix.conductor.common.metadata.workflow.WorkflowTask; -import com.netflix.conductor.core.exception.TerminateWorkflowException; -import com.netflix.conductor.core.utils.IDGenerator; -import com.netflix.conductor.core.utils.ParametersUtils; -import com.netflix.conductor.model.TaskModel; -import com.netflix.conductor.model.WorkflowModel; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Mockito.*; - -public class PermissiveTaskMapperTest { - - private PermissiveTaskMapper permissiveTaskMapper; - - private IDGenerator idGenerator = new IDGenerator(); - - @Rule public ExpectedException expectedException = ExpectedException.none(); - - @Before - public void setUp() { - ParametersUtils parametersUtils = mock(ParametersUtils.class); - permissiveTaskMapper = new PermissiveTaskMapper(parametersUtils); - } - - @Test - public void getMappedTasks() { - - WorkflowTask workflowTask = new WorkflowTask(); - workflowTask.setName("permissive_task"); - workflowTask.setTaskDefinition(new TaskDef("permissive_task")); - - String taskId = idGenerator.generate(); - String retriedTaskId = idGenerator.generate(); - - WorkflowDef workflowDef = new WorkflowDef(); - WorkflowModel workflow = new WorkflowModel(); - workflow.setWorkflowDefinition(workflowDef); - - TaskMapperContext taskMapperContext = - TaskMapperContext.newBuilder() - .withWorkflowModel(workflow) - .withTaskDefinition(new TaskDef()) - .withWorkflowTask(workflowTask) - .withTaskInput(new HashMap<>()) - .withRetryCount(0) - .withRetryTaskId(retriedTaskId) - .withTaskId(taskId) - .build(); - - List mappedTasks = permissiveTaskMapper.getMappedTasks(taskMapperContext); - assertNotNull(mappedTasks); - assertEquals(1, mappedTasks.size()); - } - - @Test - public void getMappedTasksException() { - - // Given - WorkflowTask workflowTask = new WorkflowTask(); - workflowTask.setName("permissive_task"); - String taskId = idGenerator.generate(); - String retriedTaskId = idGenerator.generate(); - - WorkflowDef workflowDef = new WorkflowDef(); - WorkflowModel workflow = new WorkflowModel(); - workflow.setWorkflowDefinition(workflowDef); - - TaskMapperContext taskMapperContext = - TaskMapperContext.newBuilder() - .withWorkflowModel(workflow) - .withTaskDefinition(new TaskDef()) - .withWorkflowTask(workflowTask) - .withTaskInput(new HashMap<>()) - .withRetryCount(0) - .withRetryTaskId(retriedTaskId) - .withTaskId(taskId) - .build(); - - // then - expectedException.expect(TerminateWorkflowException.class); - expectedException.expectMessage( - String.format( - "Invalid task. Task %s does not have a definition", - workflowTask.getName())); - - // when - permissiveTaskMapper.getMappedTasks(taskMapperContext); - } -} diff --git a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java index c9cd06e38..113059aac 100644 --- a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java +++ b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java @@ -1351,6 +1351,7 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) { if (from.getExpression() != null) { to.setExpression( from.getExpression() ); } + to.setPermissive( from.isPermissive() ); return to.build(); } @@ -1396,6 +1397,7 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) { to.setRetryCount( from.getRetryCount() ); to.setEvaluatorType( from.getEvaluatorType() ); to.setExpression( from.getExpression() ); + to.setPermissive( from.getPermissive() ); return to; } diff --git a/grpc/src/main/proto/model/workflowtask.proto b/grpc/src/main/proto/model/workflowtask.proto index 8855a714f..2c35d56dd 100644 --- a/grpc/src/main/proto/model/workflowtask.proto +++ b/grpc/src/main/proto/model/workflowtask.proto @@ -41,4 +41,5 @@ message WorkflowTask { int32 retry_count = 26; string evaluator_type = 27; string expression = 28; + bool permissive = 29; } diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java deleted file mode 100644 index f767f8ea5..000000000 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2023 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package com.netflix.conductor.sdk.workflow.def.tasks; - -import com.netflix.conductor.common.metadata.tasks.TaskDef; -import com.netflix.conductor.common.metadata.tasks.TaskType; -import com.netflix.conductor.common.metadata.workflow.WorkflowTask; - -/* Workflow permissive task executed by a worker */ -public class PermissiveTask extends Task { - - private TaskDef taskDef; - - public PermissiveTask(String taskDefName, String taskReferenceName) { - super(taskReferenceName, TaskType.PERMISSIVE); - super.name(taskDefName); - } - - PermissiveTask(WorkflowTask workflowTask) { - super(workflowTask); - this.taskDef = workflowTask.getTaskDefinition(); - } - - public TaskDef getTaskDef() { - return taskDef; - } - - public PermissiveTask setTaskDef(TaskDef taskDef) { - this.taskDef = taskDef; - return this; - } - - @Override - protected void updateWorkflowTask(WorkflowTask workflowTask) { - workflowTask.setTaskDefinition(taskDef); - } -} diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java index 6601f1a23..ae75b6c83 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java @@ -72,7 +72,6 @@ public static void initTaskImplementations() { TaskRegistry.register(TaskType.DYNAMIC.name(), Dynamic.class); TaskRegistry.register(TaskType.FORK_JOIN_DYNAMIC.name(), DynamicFork.class); TaskRegistry.register(TaskType.FORK_JOIN.name(), ForkJoin.class); - TaskRegistry.register(TaskType.PERMISSIVE.name(), PermissiveTask.class); TaskRegistry.register(TaskType.HTTP.name(), Http.class); TaskRegistry.register(TaskType.INLINE.name(), Javascript.class); TaskRegistry.register(TaskType.JOIN.name(), Join.class); diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy index b6540b160..65cce3265 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy @@ -291,10 +291,10 @@ class ForkJoinSpec extends AbstractSpecification { tasks.size() == 4 tasks[0].status == Task.Status.COMPLETED tasks[0].taskType == 'FORK' - tasks[1].workflowTask.type == TaskType.PERMISSIVE.name() + tasks[1].workflowTask.permissive tasks[1].status == Task.Status.SCHEDULED tasks[1].taskType == 'integration_task_1' - tasks[2].workflowTask.type == TaskType.PERMISSIVE.name() + tasks[2].workflowTask.permissive tasks[2].status == Task.Status.SCHEDULED tasks[2].taskType == 'integration_task_2' tasks[3].status == Task.Status.IN_PROGRESS diff --git a/test-harness/src/test/resources/fork_join_permissive_integration_test.json b/test-harness/src/test/resources/fork_join_permissive_integration_test.json index 6f65d4e11..771230d20 100644 --- a/test-harness/src/test/resources/fork_join_permissive_integration_test.json +++ b/test-harness/src/test/resources/fork_join_permissive_integration_test.json @@ -19,13 +19,14 @@ "p1": "workflow.input.param1", "p2": "workflow.input.param2" }, - "type": "PERMISSIVE", + "type": "SIMPLE", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, + "permissive": true, "defaultExclusiveJoinTask": [], "asyncComplete": false, "loopOver": [] @@ -38,13 +39,14 @@ "inputParameters": { "tp1": "workflow.input.param1" }, - "type": "PERMISSIVE", + "type": "SIMPLE", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, + "permissive": true, "defaultExclusiveJoinTask": [], "asyncComplete": false, "loopOver": [] diff --git a/test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json b/test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json index 8c1ebef77..ead2a678c 100644 --- a/test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json +++ b/test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json @@ -19,13 +19,14 @@ "p1": "workflow.input.param1", "p2": "workflow.input.param2" }, - "type": "PERMISSIVE", + "type": "SIMPLE", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, + "permissive": true, "defaultExclusiveJoinTask": [], "asyncComplete": false, "loopOver": [], @@ -53,13 +54,14 @@ "p1": "workflow.input.param1", "p2": "workflow.input.param2" }, - "type": "PERMISSIVE", + "type": "SIMPLE", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, + "permissive": true, "defaultExclusiveJoinTask": [], "asyncComplete": false, "loopOver": [], @@ -88,13 +90,14 @@ "inputParameters": { "tp1": "workflow.input.param1" }, - "type": "PERMISSIVE", + "type": "SIMPLE", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, + "permissive": true, "defaultExclusiveJoinTask": [], "asyncComplete": false, "loopOver": [], @@ -148,13 +151,14 @@ "inputParameters": { "tp1": "workflow.input.param1" }, - "type": "PERMISSIVE", + "type": "SIMPLE", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, + "permissive": true, "defaultExclusiveJoinTask": [], "asyncComplete": false, "loopOver": [], diff --git a/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json b/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json index 84c6910ab..ac68e097c 100644 --- a/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json +++ b/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json @@ -10,13 +10,14 @@ "p1": "${workflow.input.param1}", "p2": "${workflow.input.param2}" }, - "type": "PERMISSIVE", + "type": "SIMPLE", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": true, + "permissive": true, "defaultExclusiveJoinTask": [], "asyncComplete": false, "loopOver": [] @@ -28,13 +29,14 @@ "tp1": "${workflow.input.param1}", "tp2": "${t1.output.op}" }, - "type": "PERMISSIVE", + "type": "SIMPLE", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, + "permissive": true, "defaultExclusiveJoinTask": [], "asyncComplete": false, "loopOver": [] diff --git a/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json b/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json index 2b8f4dbfe..9893d3a26 100644 --- a/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json +++ b/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json @@ -10,13 +10,14 @@ "p1": "${workflow.input.param1}", "p2": "${workflow.input.param2}" }, - "type": "PERMISSIVE", + "type": "SIMPLE", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, + "permissive": true, "retryCount": 1, "taskDefinition": { "createdBy": "integration_app", @@ -45,13 +46,14 @@ "tp1": "${workflow.input.param1}", "tp2": "${t1.output.op}" }, - "type": "PERMISSIVE", + "type": "SIMPLE", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, + "permissive": true, "retryCount": 1, "taskDefinition": { "createdBy": "integration_app",