Skip to content

Commit

Permalink
made changes to pass down the taskToDomain map to child workflow in c…
Browse files Browse the repository at this point in the history
…ase of events
  • Loading branch information
junaidHussain-clari committed Aug 27, 2024
1 parent 29f5a9a commit 05df715
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
import org.springframework.util.CollectionUtils;

/**
* Action Processor subscribes to the Event Actions queue and processes the actions (e.g. start
Expand Down Expand Up @@ -196,10 +197,19 @@ private Map<String, Object> startWorkflow(
Map<String, Object> workflowInput = parametersUtils.replace(inputParams, payload);

Map<String, Object> paramsMap = new HashMap<>();
// extracting taskToDomain map from the event payload
paramsMap.put("taskToDomain","${taskToDomain}");
Optional.ofNullable(params.getCorrelationId())
.ifPresent(value -> paramsMap.put("correlationId", value));
Map<String, Object> replaced = parametersUtils.replace(paramsMap, payload);

// if taskToDomain is absent from event handler definition, and taskDomain Map is passed as a part of payload
// then assign payload taskToDomain map to the new workflow instance
final Map<String, String> taskToDomain = params.getTaskToDomain() != null ?
params.getTaskToDomain() :
(Map<String, String>) replaced.get("taskToDomain");


workflowInput.put("conductor.event.messageId", messageId);
workflowInput.put("conductor.event.name", event);

Expand All @@ -212,7 +222,9 @@ private Map<String, Object> startWorkflow(
.orElse(params.getCorrelationId()));
startWorkflowInput.setWorkflowInput(workflowInput);
startWorkflowInput.setEvent(event);
startWorkflowInput.setTaskToDomain(params.getTaskToDomain());
if (!CollectionUtils.isEmpty(taskToDomain)) {
startWorkflowInput.setTaskToDomain(taskToDomain);
}

String workflowId = workflowExecutor.startWorkflow(startWorkflowInput);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workf
payload.put("workflowType", workflow.getWorkflowName());
payload.put("workflowVersion", workflow.getWorkflowVersion());
payload.put("correlationId", workflow.getCorrelationId());
payload.put("taskToDomain", workflow.getTaskToDomain());

task.setStatus(TaskModel.Status.IN_PROGRESS);
task.addOutput(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,49 @@ public void testStartWorkflow_correlationId() throws Exception {
assertEquals(taskToDomain, capturedValue.getTaskToDomain());
}

@Test
public void testStartWorkflow_taskDomain() throws Exception {
StartWorkflow startWorkflow = new StartWorkflow();
startWorkflow.setName("testWorkflow");
startWorkflow.getInput().put("testInput", "${testId}");

Action action = new Action();
action.setAction(Type.start_workflow);
action.setStart_workflow(startWorkflow);

Object payload =
objectMapper.readValue(
"{ \"testId\": \"test_1\", \"taskToDomain\":{\"testTask\":\"testDomain\"} }", Object.class);

Map<String, String> taskToDomain = new HashMap<>();
taskToDomain.put("testTask", "testDomain");

WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("testWorkflow");
workflowDef.setVersion(1);

when(workflowExecutor.startWorkflow(any())).thenReturn("workflow_1");

Map<String, Object> output =
actionProcessor.execute(action, payload, "testEvent", "testMessage");

assertNotNull(output);
assertEquals("workflow_1", output.get("workflowId"));

ArgumentCaptor<StartWorkflowInput> startWorkflowInputArgumentCaptor =
ArgumentCaptor.forClass(StartWorkflowInput.class);

verify(workflowExecutor).startWorkflow(startWorkflowInputArgumentCaptor.capture());
StartWorkflowInput capturedValue = startWorkflowInputArgumentCaptor.getValue();

assertEquals("test_1", capturedValue.getWorkflowInput().get("testInput"));
assertEquals(taskToDomain, capturedValue.getTaskToDomain());
assertEquals(
"testMessage", capturedValue.getWorkflowInput().get("conductor.event.messageId"));
assertEquals("testEvent", capturedValue.getWorkflowInput().get("conductor.event.name"));
}


@SuppressWarnings({"unchecked", "rawtypes"})
@Test
public void testStartWorkflow() throws Exception {
Expand Down

0 comments on commit 05df715

Please sign in to comment.