From 05df715947b9e7608ea0a8e1c973624e4ece3e8e Mon Sep 17 00:00:00 2001 From: junaidHussain-clari <91540528+junaidHussain-clari@users.noreply.github.com> Date: Tue, 27 Aug 2024 15:42:01 +0530 Subject: [PATCH] made changes to pass down the taskToDomain map to child workflow in case of events --- .../core/events/SimpleActionProcessor.java | 14 +++++- .../conductor/core/execution/tasks/Event.java | 1 + .../events/TestSimpleActionProcessor.java | 43 +++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java index 474d7cfa3..46fcd545d 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java @@ -32,6 +32,7 @@ import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; +import org.springframework.util.CollectionUtils; /** * Action Processor subscribes to the Event Actions queue and processes the actions (e.g. start @@ -196,10 +197,19 @@ private Map startWorkflow( Map workflowInput = parametersUtils.replace(inputParams, payload); Map paramsMap = new HashMap<>(); + // extracting taskToDomain map from the event payload + paramsMap.put("taskToDomain","${taskToDomain}"); Optional.ofNullable(params.getCorrelationId()) .ifPresent(value -> paramsMap.put("correlationId", value)); Map replaced = parametersUtils.replace(paramsMap, payload); + // if taskToDomain is absent from event handler definition, and taskDomain Map is passed as a part of payload + // then assign payload taskToDomain map to the new workflow instance + final Map taskToDomain = params.getTaskToDomain() != null ? + params.getTaskToDomain() : + (Map) replaced.get("taskToDomain"); + + workflowInput.put("conductor.event.messageId", messageId); workflowInput.put("conductor.event.name", event); @@ -212,7 +222,9 @@ private Map startWorkflow( .orElse(params.getCorrelationId())); startWorkflowInput.setWorkflowInput(workflowInput); startWorkflowInput.setEvent(event); - startWorkflowInput.setTaskToDomain(params.getTaskToDomain()); + if (!CollectionUtils.isEmpty(taskToDomain)) { + startWorkflowInput.setTaskToDomain(taskToDomain); + } String workflowId = workflowExecutor.startWorkflow(startWorkflowInput); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java index 9674efe07..a46cc69f1 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java @@ -62,6 +62,7 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workf payload.put("workflowType", workflow.getWorkflowName()); payload.put("workflowVersion", workflow.getWorkflowVersion()); payload.put("correlationId", workflow.getCorrelationId()); + payload.put("taskToDomain", workflow.getTaskToDomain()); task.setStatus(TaskModel.Status.IN_PROGRESS); task.addOutput(payload); diff --git a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java b/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java index eef93c734..afda4c769 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java +++ b/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java @@ -114,6 +114,49 @@ public void testStartWorkflow_correlationId() throws Exception { assertEquals(taskToDomain, capturedValue.getTaskToDomain()); } + @Test + public void testStartWorkflow_taskDomain() throws Exception { + StartWorkflow startWorkflow = new StartWorkflow(); + startWorkflow.setName("testWorkflow"); + startWorkflow.getInput().put("testInput", "${testId}"); + + Action action = new Action(); + action.setAction(Type.start_workflow); + action.setStart_workflow(startWorkflow); + + Object payload = + objectMapper.readValue( + "{ \"testId\": \"test_1\", \"taskToDomain\":{\"testTask\":\"testDomain\"} }", Object.class); + + Map taskToDomain = new HashMap<>(); + taskToDomain.put("testTask", "testDomain"); + + WorkflowDef workflowDef = new WorkflowDef(); + workflowDef.setName("testWorkflow"); + workflowDef.setVersion(1); + + when(workflowExecutor.startWorkflow(any())).thenReturn("workflow_1"); + + Map output = + actionProcessor.execute(action, payload, "testEvent", "testMessage"); + + assertNotNull(output); + assertEquals("workflow_1", output.get("workflowId")); + + ArgumentCaptor startWorkflowInputArgumentCaptor = + ArgumentCaptor.forClass(StartWorkflowInput.class); + + verify(workflowExecutor).startWorkflow(startWorkflowInputArgumentCaptor.capture()); + StartWorkflowInput capturedValue = startWorkflowInputArgumentCaptor.getValue(); + + assertEquals("test_1", capturedValue.getWorkflowInput().get("testInput")); + assertEquals(taskToDomain, capturedValue.getTaskToDomain()); + assertEquals( + "testMessage", capturedValue.getWorkflowInput().get("conductor.event.messageId")); + assertEquals("testEvent", capturedValue.getWorkflowInput().get("conductor.event.name")); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void testStartWorkflow() throws Exception {