From 05df715947b9e7608ea0a8e1c973624e4ece3e8e Mon Sep 17 00:00:00 2001 From: junaidHussain-clari <91540528+junaidHussain-clari@users.noreply.github.com> Date: Tue, 27 Aug 2024 15:42:01 +0530 Subject: [PATCH 1/2] made changes to pass down the taskToDomain map to child workflow in case of events --- .../core/events/SimpleActionProcessor.java | 14 +++++- .../conductor/core/execution/tasks/Event.java | 1 + .../events/TestSimpleActionProcessor.java | 43 +++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java index 474d7cfa3..46fcd545d 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java @@ -32,6 +32,7 @@ import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; +import org.springframework.util.CollectionUtils; /** * Action Processor subscribes to the Event Actions queue and processes the actions (e.g. start @@ -196,10 +197,19 @@ private Map startWorkflow( Map workflowInput = parametersUtils.replace(inputParams, payload); Map paramsMap = new HashMap<>(); + // extracting taskToDomain map from the event payload + paramsMap.put("taskToDomain","${taskToDomain}"); Optional.ofNullable(params.getCorrelationId()) .ifPresent(value -> paramsMap.put("correlationId", value)); Map replaced = parametersUtils.replace(paramsMap, payload); + // if taskToDomain is absent from event handler definition, and taskDomain Map is passed as a part of payload + // then assign payload taskToDomain map to the new workflow instance + final Map taskToDomain = params.getTaskToDomain() != null ? + params.getTaskToDomain() : + (Map) replaced.get("taskToDomain"); + + workflowInput.put("conductor.event.messageId", messageId); workflowInput.put("conductor.event.name", event); @@ -212,7 +222,9 @@ private Map startWorkflow( .orElse(params.getCorrelationId())); startWorkflowInput.setWorkflowInput(workflowInput); startWorkflowInput.setEvent(event); - startWorkflowInput.setTaskToDomain(params.getTaskToDomain()); + if (!CollectionUtils.isEmpty(taskToDomain)) { + startWorkflowInput.setTaskToDomain(taskToDomain); + } String workflowId = workflowExecutor.startWorkflow(startWorkflowInput); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java index 9674efe07..a46cc69f1 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java @@ -62,6 +62,7 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workf payload.put("workflowType", workflow.getWorkflowName()); payload.put("workflowVersion", workflow.getWorkflowVersion()); payload.put("correlationId", workflow.getCorrelationId()); + payload.put("taskToDomain", workflow.getTaskToDomain()); task.setStatus(TaskModel.Status.IN_PROGRESS); task.addOutput(payload); diff --git a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java b/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java index eef93c734..afda4c769 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java +++ b/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java @@ -114,6 +114,49 @@ public void testStartWorkflow_correlationId() throws Exception { assertEquals(taskToDomain, capturedValue.getTaskToDomain()); } + @Test + public void testStartWorkflow_taskDomain() throws Exception { + StartWorkflow startWorkflow = new StartWorkflow(); + startWorkflow.setName("testWorkflow"); + startWorkflow.getInput().put("testInput", "${testId}"); + + Action action = new Action(); + action.setAction(Type.start_workflow); + action.setStart_workflow(startWorkflow); + + Object payload = + objectMapper.readValue( + "{ \"testId\": \"test_1\", \"taskToDomain\":{\"testTask\":\"testDomain\"} }", Object.class); + + Map taskToDomain = new HashMap<>(); + taskToDomain.put("testTask", "testDomain"); + + WorkflowDef workflowDef = new WorkflowDef(); + workflowDef.setName("testWorkflow"); + workflowDef.setVersion(1); + + when(workflowExecutor.startWorkflow(any())).thenReturn("workflow_1"); + + Map output = + actionProcessor.execute(action, payload, "testEvent", "testMessage"); + + assertNotNull(output); + assertEquals("workflow_1", output.get("workflowId")); + + ArgumentCaptor startWorkflowInputArgumentCaptor = + ArgumentCaptor.forClass(StartWorkflowInput.class); + + verify(workflowExecutor).startWorkflow(startWorkflowInputArgumentCaptor.capture()); + StartWorkflowInput capturedValue = startWorkflowInputArgumentCaptor.getValue(); + + assertEquals("test_1", capturedValue.getWorkflowInput().get("testInput")); + assertEquals(taskToDomain, capturedValue.getTaskToDomain()); + assertEquals( + "testMessage", capturedValue.getWorkflowInput().get("conductor.event.messageId")); + assertEquals("testEvent", capturedValue.getWorkflowInput().get("conductor.event.name")); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void testStartWorkflow() throws Exception { From 70b06c77510614aa3e4a0b855538137cbcd960a1 Mon Sep 17 00:00:00 2001 From: junaidHussain-clari <91540528+junaidHussain-clari@users.noreply.github.com> Date: Tue, 27 Aug 2024 15:46:31 +0530 Subject: [PATCH 2/2] ran gradlew spotlessApply for formatting --- .../core/events/SimpleActionProcessor.java | 15 ++++++++------- .../core/events/TestSimpleActionProcessor.java | 4 ++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java index 46fcd545d..84e723439 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java @@ -19,6 +19,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import com.netflix.conductor.common.metadata.events.EventHandler.Action; import com.netflix.conductor.common.metadata.events.EventHandler.StartWorkflow; @@ -32,7 +33,6 @@ import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; -import org.springframework.util.CollectionUtils; /** * Action Processor subscribes to the Event Actions queue and processes the actions (e.g. start @@ -198,17 +198,18 @@ private Map startWorkflow( Map paramsMap = new HashMap<>(); // extracting taskToDomain map from the event payload - paramsMap.put("taskToDomain","${taskToDomain}"); + paramsMap.put("taskToDomain", "${taskToDomain}"); Optional.ofNullable(params.getCorrelationId()) .ifPresent(value -> paramsMap.put("correlationId", value)); Map replaced = parametersUtils.replace(paramsMap, payload); - // if taskToDomain is absent from event handler definition, and taskDomain Map is passed as a part of payload + // if taskToDomain is absent from event handler definition, and taskDomain Map is passed + // as a part of payload // then assign payload taskToDomain map to the new workflow instance - final Map taskToDomain = params.getTaskToDomain() != null ? - params.getTaskToDomain() : - (Map) replaced.get("taskToDomain"); - + final Map taskToDomain = + params.getTaskToDomain() != null + ? params.getTaskToDomain() + : (Map) replaced.get("taskToDomain"); workflowInput.put("conductor.event.messageId", messageId); workflowInput.put("conductor.event.name", event); diff --git a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java b/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java index afda4c769..3e57142b7 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java +++ b/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java @@ -126,7 +126,8 @@ public void testStartWorkflow_taskDomain() throws Exception { Object payload = objectMapper.readValue( - "{ \"testId\": \"test_1\", \"taskToDomain\":{\"testTask\":\"testDomain\"} }", Object.class); + "{ \"testId\": \"test_1\", \"taskToDomain\":{\"testTask\":\"testDomain\"} }", + Object.class); Map taskToDomain = new HashMap<>(); taskToDomain.put("testTask", "testDomain"); @@ -156,7 +157,6 @@ public void testStartWorkflow_taskDomain() throws Exception { assertEquals("testEvent", capturedValue.getWorkflowInput().get("conductor.event.name")); } - @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void testStartWorkflow() throws Exception {