diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java index e0acd5b9f..8eed7a00d 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java @@ -31,9 +31,11 @@ import com.netflix.conductor.annotations.VisibleForTesting; import com.netflix.conductor.common.metadata.tasks.TaskType; import com.netflix.conductor.common.metadata.workflow.DynamicForkJoinTaskList; +import com.netflix.conductor.common.metadata.workflow.SubWorkflowParams; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; import com.netflix.conductor.core.exception.TerminateWorkflowException; +import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry; import com.netflix.conductor.core.utils.IDGenerator; import com.netflix.conductor.core.utils.ParametersUtils; import com.netflix.conductor.dao.MetadataDAO; @@ -43,6 +45,9 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import static com.netflix.conductor.common.metadata.tasks.TaskType.SUB_WORKFLOW; +import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SIMPLE; + /** * An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link * TaskType#FORK_JOIN_DYNAMIC} to a LinkedList of {@link TaskModel} beginning with a {@link @@ -58,6 +63,8 @@ public class ForkJoinDynamicTaskMapper implements TaskMapper { private final ParametersUtils parametersUtils; private final ObjectMapper objectMapper; private final MetadataDAO metadataDAO; + private final SystemTaskRegistry systemTaskRegistry; + private static final TypeReference> ListOfWorkflowTasks = new TypeReference<>() {}; @@ -66,11 +73,13 @@ public ForkJoinDynamicTaskMapper( IDGenerator idGenerator, ParametersUtils parametersUtils, ObjectMapper objectMapper, - MetadataDAO metadataDAO) { + MetadataDAO metadataDAO, + SystemTaskRegistry systemTaskRegistry) { this.idGenerator = idGenerator; this.parametersUtils = parametersUtils; this.objectMapper = objectMapper; this.metadataDAO = metadataDAO; + this.systemTaskRegistry = systemTaskRegistry; } @Override @@ -126,92 +135,123 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) WorkflowTask workflowTask = taskMapperContext.getWorkflowTask(); WorkflowModel workflowModel = taskMapperContext.getWorkflowModel(); int retryCount = taskMapperContext.getRetryCount(); + Map input = + parametersUtils.getTaskInput( + workflowTask.getInputParameters(), workflowModel, null, null); List mappedTasks = new LinkedList<>(); // Get the list of dynamic tasks and the input for the tasks + Pair, Map>> workflowTasksAndInputPair = - Optional.ofNullable(workflowTask.getDynamicForkTasksParam()) - .map( - dynamicForkTaskParam -> - getDynamicForkTasksAndInput( - workflowTask, workflowModel, dynamicForkTaskParam)) - .orElseGet( - () -> getDynamicForkJoinTasksAndInput(workflowTask, workflowModel)); + getDynamicTasksSimple(workflowTask, input); + + if (workflowTasksAndInputPair == null) { + workflowTasksAndInputPair = + Optional.ofNullable(workflowTask.getDynamicForkTasksParam()) + .map( + dynamicForkTaskParam -> + getDynamicForkTasksAndInput( + workflowTask, + workflowModel, + dynamicForkTaskParam, + input)) + .orElseGet( + () -> + getDynamicForkJoinTasksAndInput( + workflowTask, workflowModel, input)); + } List dynForkTasks = workflowTasksAndInputPair.getLeft(); Map> tasksInput = workflowTasksAndInputPair.getRight(); // Create Fork Task which needs to be followed by the dynamic tasks TaskModel forkDynamicTask = createDynamicForkTask(taskMapperContext, dynForkTasks); + forkDynamicTask.getInputData().putAll(taskMapperContext.getTaskInput()); mappedTasks.add(forkDynamicTask); + Optional exists = + workflowModel.getTasks().stream() + .filter( + task -> + task.getReferenceTaskName() + .equals( + taskMapperContext + .getWorkflowTask() + .getTaskReferenceName())) + .findAny(); List joinOnTaskRefs = new LinkedList<>(); // Add each dynamic task to the mapped tasks and also get the last dynamic task in the list, // which indicates that the following task after that needs to be a join task - for (WorkflowTask dynForkTask : - dynForkTasks) { // TODO this is a cyclic dependency, break it out using function - // composition - List forkedTasks = - taskMapperContext - .getDeciderService() - .getTasksToBeScheduled(workflowModel, dynForkTask, retryCount); - - // It's an error state if no forkedTasks can be decided upon. In the cases where we've - // seen - // this happen is when a dynamic task is attempting to be created here, but a task with - // the - // same reference name has already been created in the Workflow. - if (forkedTasks == null || forkedTasks.isEmpty()) { - Optional existingTaskRefName = - workflowModel.getTasks().stream() - .filter( - runningTask -> - runningTask - .getStatus() - .equals( - TaskModel.Status - .IN_PROGRESS) - || runningTask.getStatus().isTerminal()) - .map(TaskModel::getReferenceTaskName) - .filter( - refTaskName -> - refTaskName.equals( - dynForkTask.getTaskReferenceName())) - .findAny(); - - // Construct an informative error message - String terminateMessage = - "No dynamic tasks could be created for the Workflow: " - + workflowModel.toShortString() - + ", Dynamic Fork Task: " - + dynForkTask; - if (existingTaskRefName.isPresent()) { - terminateMessage += - "Attempted to create a duplicate task reference name: " - + existingTaskRefName.get(); + if (!exists.isPresent()) { + // Add each dynamic task to the mapped tasks and also get the last dynamic task in the + // list, + // which indicates that the following task after that needs to be a join task + for (WorkflowTask dynForkTask : dynForkTasks) { + // composition + + List forkedTasks = + taskMapperContext + .getDeciderService() + .getTasksToBeScheduled(workflowModel, dynForkTask, retryCount); + if (forkedTasks == null || forkedTasks.isEmpty()) { + Optional existingTaskRefName = + workflowModel.getTasks().stream() + .filter( + runningTask -> + runningTask + .getStatus() + .equals( + TaskModel.Status + .IN_PROGRESS) + || runningTask.getStatus().isTerminal()) + .map(TaskModel::getReferenceTaskName) + .filter( + refTaskName -> + refTaskName.equals( + dynForkTask.getTaskReferenceName())) + .findAny(); + + // Construct an informative error message + String terminateMessage = + "No dynamic tasks could be created for the Workflow: " + + workflowModel.toShortString() + + ", Dynamic Fork Task: " + + dynForkTask; + if (existingTaskRefName.isPresent()) { + terminateMessage += + " attempted to create a duplicate task reference name: " + + existingTaskRefName.get(); + } + throw new TerminateWorkflowException(terminateMessage); } - throw new TerminateWorkflowException(terminateMessage); - } - for (TaskModel forkedTask : forkedTasks) { - try { - Map forkedTaskInput = - tasksInput.get(forkedTask.getReferenceTaskName()); - forkedTask.addInput(forkedTaskInput); - } catch (Exception e) { - String reason = - String.format( - "Tasks could not be dynamically forked due to invalid input: %s", - e.getMessage()); - throw new TerminateWorkflowException(reason); + for (TaskModel forkedTask : forkedTasks) { + try { + Map forkedTaskInput = + tasksInput.get(forkedTask.getReferenceTaskName()); + if (forkedTask.getInputData() == null) { + forkedTask.setInputData(new HashMap<>()); + } + if (forkedTaskInput == null) { + forkedTaskInput = new HashMap<>(); + } + forkedTask.getInputData().putAll(forkedTaskInput); + } catch (Exception e) { + String reason = + String.format( + "Tasks could not be dynamically forked due to invalid input: %s", + e.getMessage()); + throw new TerminateWorkflowException(reason); + } } + mappedTasks.addAll(forkedTasks); + // Get the last of the dynamic tasks so that the join can be performed once this + // task is + // done + TaskModel last = forkedTasks.get(forkedTasks.size() - 1); + joinOnTaskRefs.add(last.getReferenceTaskName()); } - mappedTasks.addAll(forkedTasks); - // Get the last of the dynamic tasks so that the join can be performed once this task is - // done - TaskModel last = forkedTasks.get(forkedTasks.size() - 1); - joinOnTaskRefs.add(last.getReferenceTaskName()); } // From the workflow definition get the next task and make sure that it is a JOIN task. @@ -319,15 +359,14 @@ TaskModel createJoinTask( @SuppressWarnings("unchecked") @VisibleForTesting Pair, Map>> getDynamicForkTasksAndInput( - WorkflowTask workflowTask, WorkflowModel workflowModel, String dynamicForkTaskParam) + WorkflowTask workflowTask, + WorkflowModel workflowModel, + String dynamicForkTaskParam, + Map input) throws TerminateWorkflowException { - Map input = - parametersUtils.getTaskInput( - workflowTask.getInputParameters(), workflowModel, null, null); - Object dynamicForkTasksJson = input.get(dynamicForkTaskParam); List dynamicForkWorkflowTasks = - objectMapper.convertValue(dynamicForkTasksJson, ListOfWorkflowTasks); + getDynamicForkWorkflowTasks(dynamicForkTaskParam, input); if (dynamicForkWorkflowTasks == null) { dynamicForkWorkflowTasks = new ArrayList<>(); } @@ -348,6 +387,144 @@ Pair, Map>> getDynamicForkTasksAn dynamicForkWorkflowTasks, (Map>) dynamicForkTasksInput); } + private List getDynamicForkWorkflowTasks( + String dynamicForkTaskParam, Map input) { + Object dynamicForkTasksJson = input.get(dynamicForkTaskParam); + try { + List tasks = + objectMapper.convertValue(dynamicForkTasksJson, ListOfWorkflowTasks); + for (var task : tasks) { + if (task.getTaskReferenceName() == null) { + throw new RuntimeException( + "One of the tasks had a null/missing taskReferenceName"); + } + } + return tasks; + } catch (Exception e) { + LOGGER.warn("IllegalArgumentException in getDynamicForkTasksAndInput", e); + throw new TerminateWorkflowException( + String.format( + "Input '%s' is invalid. Cannot deserialize a list of Workflow Tasks from '%s'", + dynamicForkTaskParam, dynamicForkTasksJson)); + } + } + + Pair, Map>> getDynamicTasksSimple( + WorkflowTask workflowTask, Map input) + throws TerminateWorkflowException { + + String forkSubWorkflowName = (String) input.get("forkTaskWorkflow"); + String forkSubWorkflowVersionStr = (String) input.get("forkTaskWorkflowVersion"); + Integer forkSubWorkflowVersion = null; + try { + forkSubWorkflowVersion = Integer.parseInt(forkSubWorkflowVersionStr); + } catch (NumberFormatException nfe) { + } + + String forkTaskType = (String) input.get("forkTaskType"); + String forkTaskName = (String) input.get("forkTaskName"); + if (forkTaskType != null + && (systemTaskRegistry.isSystemTask(forkTaskType)) + && forkTaskName == null) { + forkTaskName = forkTaskType; + } + if (forkTaskName == null) { + forkTaskName = workflowTask.getTaskReferenceName(); + // or we can ban using just forkTaskWorkflow without forkTaskName + } + + if (forkTaskType == null) { + forkTaskType = TASK_TYPE_SIMPLE; + } + + // This should be a list + Object forkTaskInputs = input.get("forkTaskInputs"); + if (forkTaskInputs == null || !(forkTaskInputs instanceof List)) { + LOGGER.warn( + "fork_task_name is present but the inputs are NOT a list is empty {}", + forkTaskInputs); + return null; + } + List inputs = (List) forkTaskInputs; + + List dynamicForkWorkflowTasks = new ArrayList<>(inputs.size()); + Map> dynamicForkTasksInput = new HashMap<>(); + int i = 0; + for (Object forkTaskInput : inputs) { + WorkflowTask forkTask = null; + if (forkSubWorkflowName != null) { + forkTask = + generateSubWorkflowWorkflowTask( + forkSubWorkflowName, forkSubWorkflowVersion, forkTaskInput); + forkTask.setTaskReferenceName("_" + forkTaskName + "_" + i); + } else { + forkTask = generateWorkflowTask(forkTaskName, forkTaskType, forkTaskInput); + forkTask.setTaskReferenceName("_" + forkTaskName + "_" + i); + } + forkTask.getInputParameters().put("__index", i++); + if (workflowTask.isOptional()) { + forkTask.setOptional(true); + } + + dynamicForkWorkflowTasks.add(forkTask); + dynamicForkTasksInput.put( + forkTask.getTaskReferenceName(), forkTask.getInputParameters()); + } + return new ImmutablePair<>(dynamicForkWorkflowTasks, dynamicForkTasksInput); + } + + private WorkflowTask generateWorkflowTask( + String forkTaskName, String forkTaskType, Object forkTaskInput) { + WorkflowTask forkTask = new WorkflowTask(); + + try { + forkTask = objectMapper.convertValue(forkTaskInput, WorkflowTask.class); + } catch (Exception ignored) { + } + + forkTask.setName(forkTaskName); + forkTask.setType(forkTaskType); + Map inputParameters = new HashMap<>(); + + if (forkTaskInput instanceof Map) { + inputParameters.putAll((Map) forkTaskInput); + } else { + inputParameters.put("input", forkTaskInput); + } + forkTask.setInputParameters(inputParameters); + forkTask.setTaskDefinition(metadataDAO.getTaskDef(forkTaskName)); + return forkTask; + } + + private WorkflowTask generateSubWorkflowWorkflowTask( + String name, Integer version, Object forkTaskInput) { + WorkflowTask forkTask = new WorkflowTask(); + + try { + forkTask = objectMapper.convertValue(forkTaskInput, WorkflowTask.class); + } catch (Exception ignored) { + } + + forkTask.setName(name); + forkTask.setType(SUB_WORKFLOW.toString()); + Map inputParameters = new HashMap<>(); + SubWorkflowParams subWorkflowParams = new SubWorkflowParams(); + subWorkflowParams.setName(name); + subWorkflowParams.setVersion(version); + forkTask.setSubWorkflowParam(subWorkflowParams); + + if (forkTaskInput instanceof Map) { + inputParameters.putAll((Map) forkTaskInput); + Map forkTaskInputMap = (Map) forkTaskInput; + subWorkflowParams.setTaskToDomain( + (Map) forkTaskInputMap.get("taskToDomain")); + } else { + inputParameters.put("input", forkTaskInput); + } + forkTask.setInputParameters(inputParameters); + return forkTask; + } + /** * This method is used to get the List of dynamic workflow tasks and their input based on the * {@link WorkflowTask#getDynamicForkJoinTasksParam()} @@ -366,12 +543,9 @@ Pair, Map>> getDynamicForkTasksAn */ @VisibleForTesting Pair, Map>> getDynamicForkJoinTasksAndInput( - WorkflowTask workflowTask, WorkflowModel workflowModel) + WorkflowTask workflowTask, WorkflowModel workflowModel, Map input) throws TerminateWorkflowException { String dynamicForkJoinTaskParam = workflowTask.getDynamicForkJoinTasksParam(); - Map input = - parametersUtils.getTaskInput( - workflowTask.getInputParameters(), workflowModel, null, null); Object paramValue = input.get(dynamicForkJoinTaskParam); DynamicForkJoinTaskList dynamicForkJoinTaskList = objectMapper.convertValue(paramValue, DynamicForkJoinTaskList.class); diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java index d06029020..494ec2985 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java @@ -14,6 +14,7 @@ import java.io.InputStream; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -139,6 +140,7 @@ public SystemTaskRegistry systemTaskRegistry(Set tasks) { @Before public void init() { MetadataDAO metadataDAO = mock(MetadataDAO.class); + systemTaskRegistry = mock(SystemTaskRegistry.class); ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class); @@ -162,7 +164,11 @@ public void init() { taskMappers.put( FORK_JOIN_DYNAMIC.name(), new ForkJoinDynamicTaskMapper( - new IDGenerator(), parametersUtils, objectMapper, metadataDAO)); + new IDGenerator(), + parametersUtils, + objectMapper, + metadataDAO, + systemTaskRegistry)); taskMappers.put( USER_DEFINED.name(), new UserDefinedTaskMapper(parametersUtils, metadataDAO)); taskMappers.put(SIMPLE.name(), new SimpleTaskMapper(parametersUtils)); @@ -206,8 +212,8 @@ public void testWorkflowWithNoTasks() throws Exception { outcome = deciderService.decide(workflow); assertFalse(outcome.isComplete); assertEquals(outcome.tasksToBeUpdated.toString(), 3, outcome.tasksToBeUpdated.size()); - assertEquals(1, outcome.tasksToBeScheduled.size()); - assertEquals("junit_task_3", outcome.tasksToBeScheduled.get(0).getTaskDefName()); + assertEquals(2, outcome.tasksToBeScheduled.size()); + assertEquals("DECISION", outcome.tasksToBeScheduled.get(0).getTaskDefName()); } @Test @@ -234,8 +240,8 @@ public void testWorkflowWithNoTasksWithSwitch() throws Exception { outcome = deciderService.decide(workflow); assertFalse(outcome.isComplete); assertEquals(outcome.tasksToBeUpdated.toString(), 3, outcome.tasksToBeUpdated.size()); - assertEquals(1, outcome.tasksToBeScheduled.size()); - assertEquals("junit_task_3", outcome.tasksToBeScheduled.get(0).getTaskDefName()); + assertEquals(2, outcome.tasksToBeScheduled.size()); + assertEquals("SWITCH", outcome.tasksToBeScheduled.get(0).getTaskDefName()); } @Test @@ -471,6 +477,7 @@ public void testOptionalWithDynamicFork() { for (int i = 0; i < 3; i++) { WorkflowTask workflowTask = new WorkflowTask(); workflowTask.setName("f" + i); + workflowTask.getInputParameters().put("joinOn", new ArrayList<>()); workflowTask.setTaskReferenceName("f" + i); workflowTask.setWorkflowTaskType(TaskType.SIMPLE); workflowTask.setOptional(true); @@ -490,7 +497,7 @@ public void testOptionalWithDynamicFork() { assertEquals(TASK_TYPE_FORK, outcome.tasksToBeScheduled.get(0).getTaskType()); assertEquals(TaskModel.Status.COMPLETED, outcome.tasksToBeScheduled.get(0).getStatus()); - for (int retryCount = 0; retryCount < 4; retryCount++) { + for (int retryCount = 0; retryCount < 3; retryCount++) { for (TaskModel taskToBeScheduled : outcome.tasksToBeScheduled) { if (taskToBeScheduled.getTaskDefName().equals("join0")) { @@ -506,20 +513,17 @@ public void testOptionalWithDynamicFork() { outcome = deciderService.decide(workflow); assertNotNull(outcome); } - assertEquals(TASK_TYPE_JOIN, outcome.tasksToBeScheduled.get(0).getTaskType()); + assertEquals("f0", outcome.tasksToBeScheduled.get(0).getTaskType()); for (int i = 0; i < 3; i++) { - assertEquals( - TaskModel.Status.COMPLETED_WITH_ERRORS, - outcome.tasksToBeUpdated.get(i).getStatus()); + assertEquals(TaskModel.Status.FAILED, outcome.tasksToBeUpdated.get(i).getStatus()); assertEquals("f" + (i), outcome.tasksToBeUpdated.get(i).getTaskDefName()); } - assertEquals(TaskModel.Status.IN_PROGRESS, outcome.tasksToBeScheduled.get(0).getStatus()); + assertEquals(TaskModel.Status.SCHEDULED, outcome.tasksToBeScheduled.get(0).getStatus()); + System.out.println(outcome.tasksToBeScheduled.get(0)); new Join().execute(workflow, outcome.tasksToBeScheduled.get(0), null); - assertEquals( - TaskModel.Status.COMPLETED_WITH_ERRORS, - outcome.tasksToBeScheduled.get(0).getStatus()); + assertEquals(TaskModel.Status.COMPLETED, outcome.tasksToBeScheduled.get(0).getStatus()); } @Test diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index b974c5c95..cf1ad131b 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -178,7 +178,11 @@ public void init() { taskMappers.put( FORK_JOIN_DYNAMIC.name(), new ForkJoinDynamicTaskMapper( - idGenerator, parametersUtils, objectMapper, metadataDAO)); + idGenerator, + parametersUtils, + objectMapper, + metadataDAO, + mock(SystemTaskRegistry.class))); taskMappers.put( USER_DEFINED.name(), new UserDefinedTaskMapper(parametersUtils, metadataDAO)); taskMappers.put(SIMPLE.name(), new SimpleTaskMapper(parametersUtils)); diff --git a/core/src/test/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapperTest.java b/core/src/test/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapperTest.java index 83d5bac06..d6ba5bb25 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapperTest.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapperTest.java @@ -28,6 +28,7 @@ import com.netflix.conductor.common.metadata.workflow.WorkflowTask; import com.netflix.conductor.core.exception.TerminateWorkflowException; import com.netflix.conductor.core.execution.DeciderService; +import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry; import com.netflix.conductor.core.utils.IDGenerator; import com.netflix.conductor.core.utils.ParametersUtils; import com.netflix.conductor.dao.MetadataDAO; @@ -55,6 +56,7 @@ public class ForkJoinDynamicTaskMapperTest { private ObjectMapper objectMapper; private DeciderService deciderService; private ForkJoinDynamicTaskMapper forkJoinDynamicTaskMapper; + private SystemTaskRegistry systemTaskRegistry; @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -65,10 +67,15 @@ public void setUp() { parametersUtils = Mockito.mock(ParametersUtils.class); objectMapper = Mockito.mock(ObjectMapper.class); deciderService = Mockito.mock(DeciderService.class); + systemTaskRegistry = Mockito.mock(SystemTaskRegistry.class); forkJoinDynamicTaskMapper = new ForkJoinDynamicTaskMapper( - idGenerator, parametersUtils, objectMapper, metadataDAO); + idGenerator, + parametersUtils, + objectMapper, + metadataDAO, + systemTaskRegistry); } @Test @@ -142,6 +149,7 @@ public void getMappedTasksException() { TaskMapperContext taskMapperContext = TaskMapperContext.newBuilder() + .withTaskInput(Map.of()) .withWorkflowModel(workflowModel) .withWorkflowTask(dynamicForkJoinToSchedule) .withRetryCount(0) @@ -227,6 +235,7 @@ public void getMappedTasks() { .withWorkflowModel(workflowModel) .withWorkflowTask(dynamicForkJoinToSchedule) .withRetryCount(0) + .withTaskInput(Map.of()) .withTaskId(taskId) .withDeciderService(deciderService) .build(); @@ -278,7 +287,7 @@ public void getDynamicForkJoinTasksAndInput() { Pair, Map>> dynamicForkJoinTasksAndInput = forkJoinDynamicTaskMapper.getDynamicForkJoinTasksAndInput( - dynamicForkJoinToSchedule, new WorkflowModel()); + dynamicForkJoinToSchedule, new WorkflowModel(), Map.of()); // then assertNotNull(dynamicForkJoinTasksAndInput.getLeft()); assertEquals(2, dynamicForkJoinTasksAndInput.getLeft().size()); @@ -323,7 +332,7 @@ public void getDynamicForkJoinTasksAndInputException() { expectedException.expect(TerminateWorkflowException.class); forkJoinDynamicTaskMapper.getDynamicForkJoinTasksAndInput( - dynamicForkJoinToSchedule, new WorkflowModel()); + dynamicForkJoinToSchedule, new WorkflowModel(), Map.of()); } @Test @@ -354,7 +363,7 @@ public void getDynamicForkTasksAndInput() { wt3.setName("junit_task_3"); wt3.setTaskReferenceName("xdt2"); - HashMap dynamicTasksInput = new HashMap<>(); + Map dynamicTasksInput = new HashMap<>(); dynamicTasksInput.put("xdt1", input1); dynamicTasksInput.put("xdt2", input2); dynamicTasksInput.put("dynamicTasks", Arrays.asList(wt2, wt3)); @@ -369,7 +378,10 @@ public void getDynamicForkTasksAndInput() { Pair, Map>> dynamicTasks = forkJoinDynamicTaskMapper.getDynamicForkTasksAndInput( - dynamicForkJoinToSchedule, new WorkflowModel(), "dynamicTasks"); + dynamicForkJoinToSchedule, + new WorkflowModel(), + "dynamicTasks", + dynamicTasksInput); // then assertNotNull(dynamicTasks.getLeft()); @@ -419,7 +431,7 @@ public void getDynamicForkTasksAndInputException() { expectedException.expect(TerminateWorkflowException.class); // when forkJoinDynamicTaskMapper.getDynamicForkTasksAndInput( - dynamicForkJoinToSchedule, new WorkflowModel(), "dynamicTasks"); + dynamicForkJoinToSchedule, new WorkflowModel(), "dynamicTasks", Map.of()); } @Test @@ -487,6 +499,7 @@ public void testDynamicTaskDuplicateTaskRefName() { String taskId = idGenerator.generate(); TaskMapperContext taskMapperContext = TaskMapperContext.newBuilder() + .withTaskInput(Map.of()) .withWorkflowModel(workflowModel) .withWorkflowTask(dynamicForkJoinToSchedule) .withRetryCount(0) diff --git a/docs/documentation/configuration/workflowdef/operators/dynamic-fork-task.md b/docs/documentation/configuration/workflowdef/operators/dynamic-fork-task.md index c95d9ebf9..abeef87b4 100644 --- a/docs/documentation/configuration/workflowdef/operators/dynamic-fork-task.md +++ b/docs/documentation/configuration/workflowdef/operators/dynamic-fork-task.md @@ -38,78 +38,270 @@ To use the `DYNAMIC_FORK` task, you need to provide the following attributes at | *dynamicForkTasksParam | This is a JSON array of tasks or sub-workflow objects that needs to be forked and run in parallel (Note: This has a different format for ```SUB_WORKFLOW``` compared to ```SIMPLE``` tasks.) | | *dynamicForkTasksInputParamName | A JSON map, where the keys are task or sub-workflow names, and the values are the `inputParameters` to be passed into the corresponding spawned tasks or sub-workflows. | + Note: * means the de-referenced name. ## Examples + + ### Example 1 -Here is an example of a `FORK_JOIN_DYNAMIC` task followed by a `JOIN` task +Here is an example of a `FORK_JOIN_DYNAMIC` task followed by a `JOIN` task to run a `SIMPLE` task + +(default value of `forkTaskType` is `SIMPLE`) + +The `SIMPLE` task will resize the images and store the resized images into specified `location` as part of input below + +| Attribute | Description | +|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| forkTaskName | Specify the name of the simple task to execute. | +| forkTaskType | Type of the fork Task. | +| forkTaskInputs | A JSON map, where the keys are task or sub-workflow names, and the values are the `inputParameters` to be passed into the corresponding spawned tasks or sub-workflows. | ```json -[ - { - "inputParameters": { - "dynamicTasks": "${fooBarTask.output.dynamicTasksJSON}", - "dynamicTasksInput": "${fooBarTask.output.dynamicTasksInputJSON}" +{ + "name": "image_multiple_convert_resize_fork", + "description": "Image multiple convert resize example", + "version": 1, + "tasks": [ + { + "name": "image_multiple_convert_resize_dynamic_task", + "taskReferenceName": "image_multiple_convert_resize_dynamic_task_ref", + "inputParameters": { + "forkTaskName": "fork_task", + "forkTaskInputs": "Images" : [ + { + "image" : "url1", + "location" : "location url", + "width" : 100, + "height" : 200 + }, + { + "image" : "url2", + "location" : "location url", + "width" : 300, + "height" : 400 + } + ], + "forkTaskType": "SIMPLE" + }, + "type": "FORK_JOIN_DYNAMIC", + "decisionCases": {}, + "dynamicForkTasksParam": "dynamicTasks", + "dynamicForkTasksInputParamName": "dynamicTasksInput", + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "onStateChange": {} }, - "type": "FORK_JOIN_DYNAMIC", - "dynamicForkTasksParam": "dynamicTasks", - "dynamicForkTasksInputParamName": "dynamicTasksInput" + { + "name": "image_multiple_convert_resize_join", + "taskReferenceName": "image_multiple_convert_resize_join_ref", + "inputParameters": {}, + "type": "JOIN", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "onStateChange": {} + } + ], + "inputParameters": [ + "Images" + ], + "outputParameters": { + "Output": "${join_task_ref.output}" }, - { - "name": "image_multiple_convert_resize_join", - "taskReferenceName": "image_multiple_convert_resize_join_ref", - "type": "JOIN" - } -] + "failureWorkflow": "", + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "ownerEmail": "example@gmail.com", + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "variables": {}, + "inputTemplate": {}, + "onStateChange": {} +} ``` -Dissecting into this example above, let's look at the three things that are needed to configured for -the `FORK_JOIN_DYNAMIC` task - -`dynamicForkTasksParam` This is a JSON array of task or sub-workflow objects that specifies the list of tasks or -sub-workflows that needs to be forked and run in parallel `dynamicForkTasksInputParamName` This is a JSON map of task or -sub-workflow objects that specifies the list of tasks or sub-workflows that needs to be forked and run in parallel -fooBarTask This is a task that is defined prior to the FORK_JOIN_DYNAMIC in the workflow definition. This task will need -to output (outputParameters) 1 and 2 above so that it can be wired into inputParameters of the FORK_JOIN_DYNAMIC -tasks. (dynamicTasks and dynamicTasksInput) - ### Example 2 -Let's say we have a task that resizes an image, and we need to create a workflow that will resize an image into multiple sizes. In this case, a task can be created prior to -the `FORK_JOIN_DYNAMIC` task that will prepare the input that needs to be passed into the `FORK_JOIN_DYNAMIC` task. These will be: +Here is an example of a `FORK_JOIN_DYNAMIC` task followed by a `JOIN` task to run an `HTTP` task -* ```dynamicForkTasksParam``` the JSON array of tasks/subworkflows to be run in parallel. Each JSON object will have: - * A unique ```taskReferenceName```. - * The name of the Task/Subworkflow to be called (note - the location of this key:value is different for a subworkflow). - * The type of the task (This is optional for SIMPLE tasks). -* ```dynamicForkTasksInputParamName``` a JSON map of input parameters for each task. The keys will be the unique ```taskReferenceName``` defined in the first JSON array, and the values will be the specific input parameters for the task/subworkflow. +Call to `HTTP` uri will resize the images and store the resized images into specified `location` as part of input below. -The ```image_resize``` task works to resize just one image. The `FORK_JOIN_DYNAMIC` and the following `JOIN` will manage the multiple invocations of the single ```image_resize``` task. The responsibilities are clearly broken out, where the individual ```image_resize``` -tasks do the core job and `FORK_JOIN_DYNAMIC` manages the orchestration and fault tolerance aspects of handling multiple invocations of the task. +```json +{ + "name": "image_multiple_convert_resize_fork", + "description": "Image multiple convert resize example", + "tasks": [ + { + "name": "image_multiple_convert_resize_dynamic_task_http", + "taskReferenceName": "image_multiple_convert_resize_dynamic_task_http_ref", + "inputParameters": { + "forkTaskName": "http_task_name", + "forkTaskType": "HTTP", + "forkTaskInputs": [ + { + "uri" : "https://imageResizeUrl/imageName1///" + }, + { + "uri" : "https://imageResizeUrl/imageName2///", + "method" : "GET" + } + ] + }, + "type": "FORK_JOIN_DYNAMIC", + "dynamicForkTasksParam": "dynamicTasks", + "dynamicForkTasksInputParamName": "dynamicTasksInput" + }, + { + "name": "dynamic_workflow_array_http_join", + "taskReferenceName": "dynamic_workflow_array_http_join_ref", + "type": "JOIN" + } + ] +} +``` + +### Example 3 +Here is an example of a `FORK_JOIN_DYNAMIC` task followed by a `JOIN` task to run `SUBWORKFLOW` -#### Workflow Definition - Task Configuration +`SUBWORKFLOW` will resize the images and store the resized images into specified `location` as part of input below. -Here is an example of a `FORK_JOIN_DYNAMIC` task followed by a `JOIN` task. The fork is named and given a taskReferenceName, but all of the input parameters are JSON variables that we will discuss next: +| Attribute | Description | +|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| forkTaskWorkflow | Specify the name of the sub-workflow to be executed. | +| forkTaskWorkflowVersion | Optional version of the workflow to run. | +| forkTaskInputs | Array of inputs - a task will be executed for each input. | ```json -[ - { - "name": "image_multiple_convert_resize_fork", - "taskReferenceName": "image_multiple_convert_resize_fork_ref", - "inputParameters": { - "dynamicTasks": "${fooBarTask.output.dynamicTasksJSON}", - "dynamicTasksInput": "${fooBarTask.output.dynamicTasksInputJSON}" +{ + "name": "image_multiple_convert_resize_fork", + "description": "Image multiple convert resize example", + "tasks": [ + { + "name": "image_multiple_convert_resize_dynamic_task_subworkflow", + "taskReferenceName": "image_multiple_convert_resize_dynamic_task_subworkflow_ref", + "inputParameters": { + "forkTaskWorkflow": "image_resize_subworkflow", + "forkTaskInputs": [ + { + 'image' : 'url1', + 'location' : 'location url', + 'width' : 100, + 'height' : 200 + }, + { + 'image' : 'url2', + 'location' : 'locationurl', + 'width' : 300, + 'height' : 400 + } + ] + }, + "type": "FORK_JOIN_DYNAMIC", + "dynamicForkTasksParam": "dynamicTasks", + "dynamicForkTasksInputParamName": "dynamicTasksInput" }, - "type": "FORK_JOIN_DYNAMIC", - "dynamicForkTasksParam": "dynamicTasks", - "dynamicForkTasksInputParamName": "dynamicTasksInput" + { + "name": "dynamic_workflow_array_http_subworkflow", + "taskReferenceName": "dynamic_workflow_array_http_subworkflow_ref", + "type": "JOIN" + } + ] +} +``` + +### Example 4 +Here is an example of a `FORK_JOIN_DYNAMIC` task followed by a `JOIN` using input parameters + +#### Using JSON Task +```json +{ + "name": "image_multiple_convert_resize_fork", + "description": "Image multiple convert resize example", + "inputParameters": { + "dynamicTasks": [ + { + "name":"image_convert_resize", + "taskReferenceName": "image_convert_resize_png_300x300_0" + // ... + }, + { + "name":"image_convert_resize", + "taskReferenceName": "image_convert_resize_png_200x200_1" + // ... + } + ], + "dynamicTasksInput": { + "image_convert_resize_png_300x300_0" : { + "outputWidth": 300, + "outputHeight": 300 + }, + "image_convert_resize_png_200x200_1" : { + "outputWidth": 200, + "outputHeight": 200 + } + } }, - { - "name": "image_multiple_convert_resize_join", - "taskReferenceName": "image_multiple_convert_resize_join_ref", - "type": "JOIN" - } -] + "type": "FORK_JOIN_DYNAMIC", + "dynamicForkTasksParam": "dynamicTasks", + "dynamicForkTasksInputParamName": "dynamicTasksInput" +} +``` + +#### Using JSON Subworkflow +```json +{ + "name": "dynamic", + "taskReferenceName": "dynamic_ref", + "inputParameters": { + "dynamicTasks": [ + { + "subWorkflowParam" : { + "name": :"image_convert_resize_subworkflow", + "version": "1" + }, + "type" : "SUB_WORKFLOW", + "taskReferenceName": "image_convert_resize_subworkflow_png_300x300_0", + // ... + }, + { + "subWorkflowParam" : { + "name": :"image_convert_resize_subworkflow", + "version": "1" + }, + "type" : "SUB_WORKFLOW", + "taskReferenceName": "image_convert_resize_subworkflow_png_200x200_1", + // ... + } + ], + "dynamicTasksInput": { + "image_convert_resize_png_300x300_0" : { + "outputWidth": 300, + "outputHeight": 300 + }, + "image_convert_resize_png_200x200_1" : { + "outputWidth": 200, + "outputHeight": 200 + } + } + }, + "type": "FORK_JOIN_DYNAMIC", + "dynamicForkTasksParam": "dynamicTasks", + "dynamicForkTasksInputParamName": "dynamicTasksInput" +} ``` This appears in the UI as follows: @@ -135,116 +327,12 @@ Let's assume this data is sent to the workflow: With 2 file formats and 2 sizes in the input, we'll be creating 4 images total. The first task will generate the tasks and the parameters for these tasks: -* `dynamicForkTasksParam` This is a JSON array of task or sub-workflow objects that specifies the list of tasks or sub-workflows that needs to be forked and run in parallel. This JSON varies depeding oon the type of task. - - -#### ```dynamicForkTasksParam``` Simple task -In this case, our fork is running a SIMPLE task: ```image_convert_resize```: - -``` -{ "dynamicTasks": [ - { - "name": :"image_convert_resize", - "taskReferenceName": "image_convert_resize_png_300x300_0", - ... - }, - { - "name": :"image_convert_resize", - "taskReferenceName": "image_convert_resize_png_200x200_1", - ... - }, - { - "name": :"image_convert_resize", - "taskReferenceName": "image_convert_resize_jpg_300x300_2", - ... - }, - { - "name": :"image_convert_resize", - "taskReferenceName": "image_convert_resize_jpg_200x200_3", - ... - } -]} -``` -#### ```dynamicForkTasksParam``` SubWorkflow task -In this case, our Dynamic fork is running a SUB_WORKFLOW task: ```image_convert_resize_subworkflow``` - -``` -{ "dynamicTasks": [ - { - "subWorkflowParam" : { - "name": :"image_convert_resize_subworkflow", - "version": "1" - }, - "type" : "SUB_WORKFLOW", - "taskReferenceName": "image_convert_resize_subworkflow_png_300x300_0", - ... - }, - { - "subWorkflowParam" : { - "name": :"image_convert_resize_subworkflow", - "version": "1" - }, - "type" : "SUB_WORKFLOW", - "taskReferenceName": "image_convert_resize_subworkflow_png_200x200_1", - ... - }, - { - "subWorkflowParam" : { - "name": :"image_convert_resize_subworkflow", - "version": "1" - }, - "type" : "SUB_WORKFLOW", - "taskReferenceName": "image_convert_resize_subworkflow_jpg_300x300_2", - ... - }, - { - "subWorkflowParam" : { - "name": :"image_convert_resize_subworkflow", - "version": "1" - }, - "type" : "SUB_WORKFLOW", - "taskReferenceName": "image_convert_resize_subworkflow_jpg_200x200_3", - ... - } -]} -``` * `dynamicForkTasksInputParamName` This is a JSON map of task or sub-workflow objects and all the input parameters that these tasks will need to run. -``` -"dynamicTasksInput":{ -"image_convert_resize_jpg_300x300_2":{ -"outputWidth":300 -"outputHeight":300 -"fileLocation":"https://pbs.twimg.com/media/FJY7ud0XEAYVCS8?format=png&name=900x900" -"outputFormat":"jpg" -"maintainAspectRatio":true -} -"image_convert_resize_jpg_200x200_3":{ -"outputWidth":200 -"outputHeight":200 -"fileLocation":"https://pbs.twimg.com/media/FJY7ud0XEAYVCS8?format=png&name=900x900" -"outputFormat":"jpg" -"maintainAspectRatio":true -} -"image_convert_resize_png_200x200_1":{ -"outputWidth":200 -"outputHeight":200 -"fileLocation":"https://pbs.twimg.com/media/FJY7ud0XEAYVCS8?format=png&name=900x900" -"outputFormat":"png" -"maintainAspectRatio":true -} -"image_convert_resize_png_300x300_0":{ -"outputWidth":300 -"outputHeight":300 -"fileLocation":"https://pbs.twimg.com/media/FJY7ud0XEAYVCS8?format=png&name=900x900" -"outputFormat":"png" -"maintainAspectRatio":true -} -``` #### Join Task