Skip to content

Commit

Permalink
Merge pull request #250 from junaidHussain-clari/PassingTaskDomainVia…
Browse files Browse the repository at this point in the history
…Events

made changes to pass down the taskToDomain map to child workflow in case of events
  • Loading branch information
v1r3n authored Sep 3, 2024
2 parents 62a79e8 + 70b06c7 commit 9fef8fd
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import com.netflix.conductor.common.metadata.events.EventHandler.Action;
import com.netflix.conductor.common.metadata.events.EventHandler.StartWorkflow;
Expand Down Expand Up @@ -196,10 +197,20 @@ private Map<String, Object> startWorkflow(
Map<String, Object> workflowInput = parametersUtils.replace(inputParams, payload);

Map<String, Object> paramsMap = new HashMap<>();
// extracting taskToDomain map from the event payload
paramsMap.put("taskToDomain", "${taskToDomain}");
Optional.ofNullable(params.getCorrelationId())
.ifPresent(value -> paramsMap.put("correlationId", value));
Map<String, Object> replaced = parametersUtils.replace(paramsMap, payload);

// if taskToDomain is absent from event handler definition, and taskDomain Map is passed
// as a part of payload
// then assign payload taskToDomain map to the new workflow instance
final Map<String, String> taskToDomain =
params.getTaskToDomain() != null
? params.getTaskToDomain()
: (Map<String, String>) replaced.get("taskToDomain");

workflowInput.put("conductor.event.messageId", messageId);
workflowInput.put("conductor.event.name", event);

Expand All @@ -212,7 +223,9 @@ private Map<String, Object> startWorkflow(
.orElse(params.getCorrelationId()));
startWorkflowInput.setWorkflowInput(workflowInput);
startWorkflowInput.setEvent(event);
startWorkflowInput.setTaskToDomain(params.getTaskToDomain());
if (!CollectionUtils.isEmpty(taskToDomain)) {
startWorkflowInput.setTaskToDomain(taskToDomain);
}

String workflowId = workflowExecutor.startWorkflow(startWorkflowInput);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workf
payload.put("workflowType", workflow.getWorkflowName());
payload.put("workflowVersion", workflow.getWorkflowVersion());
payload.put("correlationId", workflow.getCorrelationId());
payload.put("taskToDomain", workflow.getTaskToDomain());

task.setStatus(TaskModel.Status.IN_PROGRESS);
task.addOutput(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,49 @@ public void testStartWorkflow_correlationId() throws Exception {
assertEquals(taskToDomain, capturedValue.getTaskToDomain());
}

@Test
public void testStartWorkflow_taskDomain() throws Exception {
StartWorkflow startWorkflow = new StartWorkflow();
startWorkflow.setName("testWorkflow");
startWorkflow.getInput().put("testInput", "${testId}");

Action action = new Action();
action.setAction(Type.start_workflow);
action.setStart_workflow(startWorkflow);

Object payload =
objectMapper.readValue(
"{ \"testId\": \"test_1\", \"taskToDomain\":{\"testTask\":\"testDomain\"} }",
Object.class);

Map<String, String> taskToDomain = new HashMap<>();
taskToDomain.put("testTask", "testDomain");

WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("testWorkflow");
workflowDef.setVersion(1);

when(workflowExecutor.startWorkflow(any())).thenReturn("workflow_1");

Map<String, Object> output =
actionProcessor.execute(action, payload, "testEvent", "testMessage");

assertNotNull(output);
assertEquals("workflow_1", output.get("workflowId"));

ArgumentCaptor<StartWorkflowInput> startWorkflowInputArgumentCaptor =
ArgumentCaptor.forClass(StartWorkflowInput.class);

verify(workflowExecutor).startWorkflow(startWorkflowInputArgumentCaptor.capture());
StartWorkflowInput capturedValue = startWorkflowInputArgumentCaptor.getValue();

assertEquals("test_1", capturedValue.getWorkflowInput().get("testInput"));
assertEquals(taskToDomain, capturedValue.getTaskToDomain());
assertEquals(
"testMessage", capturedValue.getWorkflowInput().get("conductor.event.messageId"));
assertEquals("testEvent", capturedValue.getWorkflowInput().get("conductor.event.name"));
}

@SuppressWarnings({"unchecked", "rawtypes"})
@Test
public void testStartWorkflow() throws Exception {
Expand Down

0 comments on commit 9fef8fd

Please sign in to comment.