From 3d287a44fcb3b463e38f03965c6db9f7a9bb3d8f Mon Sep 17 00:00:00 2001 From: manan164 <1897158+manan164@users.noreply.github.com> Date: Mon, 10 Jun 2024 11:16:29 +0530 Subject: [PATCH 1/8] Make sub workflow task sync --- .../netflix/conductor/core/execution/tasks/SubWorkflow.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java index 6e29dd64d..3c9dcb769 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java @@ -147,11 +147,6 @@ public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor work workflowExecutor.terminateWorkflow(subWorkflow, reason, null); } - @Override - public boolean isAsync() { - return true; - } - /** * Keep Subworkflow task asyncComplete. The Subworkflow task will be executed once * asynchronously to move to IN_PROGRESS state, and will move to termination by Subworkflow's From 2de6993430b1ae52c0c67c04e05c365fbd07d1d1 Mon Sep 17 00:00:00 2001 From: manan164 <1897158+manan164@users.noreply.github.com> Date: Mon, 10 Jun 2024 11:25:41 +0530 Subject: [PATCH 2/8] assertion --- .../netflix/conductor/core/execution/tasks/TestSubWorkflow.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java index 2d5fcccd8..15a975f41 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java @@ -415,7 +415,7 @@ public void testCancelWithoutWorkflowId() { @Test public void testIsAsync() { - assertTrue(subWorkflow.isAsync()); + assertFalse(subWorkflow.isAsync()); } @Test From b62b70997a2ee3be93ebee195f2c6134a5e0108f Mon Sep 17 00:00:00 2001 From: manan164 <1897158+manan164@users.noreply.github.com> Date: Mon, 10 Jun 2024 17:06:24 +0530 Subject: [PATCH 3/8] fix sub workflow tests --- .../conductor/test/integration/SubWorkflowSpec.groovy | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowSpec.groovy index 8bc6bbff9..d5b8cdee7 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowSpec.groovy @@ -103,13 +103,11 @@ class SubWorkflowSpec extends AbstractSpecification { tasks[0].taskType == 'integration_task_1' tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == 'SUB_WORKFLOW' - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS } when: "the subworkflow is started by issuing a system task call" - List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - String subworkflowTaskId = polledTaskIds.get(0) - asyncSystemTaskExecutor.execute(subWorkflowTask, subworkflowTaskId) + String subworkflowTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTasks().get(1).getTaskId() then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { @@ -251,7 +249,7 @@ class SubWorkflowSpec extends AbstractSpecification { tasks[0].taskType == 'integration_task_1' tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == 'SUB_WORKFLOW' - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS } when: "Polled for and executed subworkflow task" @@ -351,7 +349,7 @@ class SubWorkflowSpec extends AbstractSpecification { tasks[0].taskType == 'integration_task_1' tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == 'SUB_WORKFLOW' - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS } when: "the subworkflow is started by issuing a system task call" From 2b7073ce37312bef4acd86e2fd63f7b0728f9253 Mon Sep 17 00:00:00 2001 From: manan164 <1897158+manan164@users.noreply.github.com> Date: Mon, 10 Jun 2024 17:27:14 +0530 Subject: [PATCH 4/8] remove not required pop --- .../conductor/test/integration/SubWorkflowSpec.groovy | 6 ------ 1 file changed, 6 deletions(-) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowSpec.groovy index d5b8cdee7..c897c7ed8 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowSpec.groovy @@ -253,8 +253,6 @@ class SubWorkflowSpec extends AbstractSpecification { } when: "Polled for and executed subworkflow task" - List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowId = workflow.tasks[1].subWorkflowId @@ -352,10 +350,6 @@ class SubWorkflowSpec extends AbstractSpecification { tasks[1].status == Task.Status.IN_PROGRESS } - when: "the subworkflow is started by issuing a system task call" - List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING From 8c73f1896358744c3b7d6a9136b2aaaaf66ee43c Mon Sep 17 00:00:00 2001 From: manan164 <1897158+manan164@users.noreply.github.com> Date: Mon, 10 Jun 2024 18:34:33 +0530 Subject: [PATCH 5/8] rerun test fix. --- .../integration/SubWorkflowRerunSpec.groovy | 33 +++++++------------ 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRerunSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRerunSpec.groovy index 350a896f2..8acbd1351 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRerunSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRerunSpec.groovy @@ -201,13 +201,11 @@ class SubWorkflowRerunSpec extends AbstractSpecification { tasks[0].taskType == 'integration_task_1' tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS } when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new mid level workflow is created and is in RUNNING state" newMidLevelWorkflowId != midLevelWorkflowId @@ -219,12 +217,13 @@ class SubWorkflowRerunSpec extends AbstractSpecification { } when: "poll and complete the integration_task_1 task in the mid-level workflow" - workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done']) + def polledAndCompletedTry1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done'], 5) + + then: "verify that the 'integration_task_1' was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndCompletedTry1) and: "poll and execute the sub workflow task" - polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId @@ -303,13 +302,11 @@ class SubWorkflowRerunSpec extends AbstractSpecification { tasks[0].taskType == 'integration_task_1' tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS } when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new mid level workflow is created and is in RUNNING state" newMidLevelWorkflowId != midLevelWorkflowId @@ -324,9 +321,7 @@ class SubWorkflowRerunSpec extends AbstractSpecification { workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done']) and: "poll and execute the sub workflow task" - polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId @@ -414,9 +409,7 @@ class SubWorkflowRerunSpec extends AbstractSpecification { workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done']) and: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId @@ -506,9 +499,7 @@ class SubWorkflowRerunSpec extends AbstractSpecification { workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done']) and: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId From d4d1bb3a23883e123be94acf0ae91b1c53e9fe28 Mon Sep 17 00:00:00 2001 From: manan164 <1897158+manan164@users.noreply.github.com> Date: Mon, 10 Jun 2024 19:00:32 +0530 Subject: [PATCH 6/8] more fixes. --- .../test/integration/DoWhileSpec.groovy | 7 +--- .../integration/DynamicForkJoinSpec.groovy | 5 +-- .../ExternalPayloadStorageSpec.groovy | 5 +-- .../integration/FailureWorkflowSpec.groovy | 20 +-------- .../test/integration/ForkJoinSpec.groovy | 20 +++------ ...rchicalForkJoinSubworkflowRerunSpec.groovy | 42 ++++++------------- 6 files changed, 23 insertions(+), 76 deletions(-) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy index 5b3327088..51de75a43 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy @@ -421,14 +421,9 @@ class DoWhileSpec extends AbstractSpecification { tasks[5].taskType == 'JOIN' tasks[5].status == Task.Status.COMPLETED tasks[6].taskType == 'SUB_WORKFLOW' - tasks[6].status == Task.Status.SCHEDULED + tasks[6].status == Task.Status.IN_PROGRESS } - when: "the sub workflow is started by issuing a system task call" - def parentWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) - def subWorkflowTaskId = parentWorkflow.getTaskByRefName('st1__1').taskId - asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) - then: "verify that the sub workflow task is in a IN PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy index 723748fe4..bb720f7e6 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy @@ -436,7 +436,7 @@ class DynamicForkJoinSpec extends AbstractSpecification { tasks[1].taskType == 'FORK' tasks[1].status == Task.Status.COMPLETED tasks[2].taskType == 'SUB_WORKFLOW' - tasks[2].status == Task.Status.SCHEDULED + tasks[2].status == Task.Status.IN_PROGRESS tasks[3].taskType == 'integration_task_10' tasks[3].status == Task.Status.SCHEDULED tasks[4].taskType == 'JOIN' @@ -446,9 +446,6 @@ class DynamicForkJoinSpec extends AbstractSpecification { when: "the subworkflow is started by issuing a system task call" def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("dynamicfanouttask_join").taskId - List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - String subworkflowTaskId = polledTaskIds.get(0) - asyncSystemTaskExecutor.execute(subWorkflowTask, subworkflowTaskId) then: "verify that the sub workflow task is in a IN_PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy index 2edbc6099..02d5658e7 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy @@ -448,7 +448,7 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { tasks[0].outputData.isEmpty() tasks[1].taskType == TaskType.SUB_WORKFLOW.name() - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[1].inputData.isEmpty() } @@ -456,7 +456,6 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { when: "the subworkflow is started by issuing a system task call" def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowTaskId = workflow.getTaskByRefName('swt').taskId - asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) then: "verify that the sub workflow task is in a IN_PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { @@ -756,7 +755,7 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { tasks[2].taskType == 'SUB_WORKFLOW' !tasks[2].inputData - tasks[2].status == Task.Status.SCHEDULED + tasks[2].status == Task.Status.IN_PROGRESS tasks[3].taskType == 'JOIN' tasks[3].status == Task.Status.IN_PROGRESS tasks[3].referenceTaskName == 'dynamicfanouttask_join' diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/FailureWorkflowSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/FailureWorkflowSpec.groovy index 26f1955e5..b58bc2765 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/FailureWorkflowSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/FailureWorkflowSpec.groovy @@ -87,27 +87,9 @@ class FailureWorkflowSpec extends AbstractSpecification { def workflowInstanceId = startWorkflow(PARENT_WORKFLOW_WITH_FAILURE_TASK, 1, '', workflowInput, null) - then: "verify that the workflow has started and the tasks are as expected" - with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { - status == Workflow.WorkflowStatus.RUNNING - tasks.size() == 2 - tasks[0].status == Task.Status.COMPLETED - tasks[0].taskType == 'LAMBDA' - tasks[0].referenceTaskName == 'lambdaTask1' - tasks[0].seq == 1 - tasks[1].status == Task.Status.SCHEDULED - tasks[1].taskType == 'SUB_WORKFLOW' - tasks[1].seq == 2 - } - - when: "subworkflow is retrieved" + then: "verify that the sub workflow has failed" def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) - def subWorkflowTaskId = workflow.getTaskByRefName("test_task_failed_sub_wf").getTaskId() - asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) - workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowId = workflow.getTaskByRefName("test_task_failed_sub_wf").subWorkflowId - - then: "verify that the sub workflow has failed" with(workflowExecutionService.getExecutionStatus(subWorkflowId, true)) { status == Workflow.WorkflowStatus.FAILED tasks.size() == 2 diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy index 1274eb99b..5c17e816c 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy @@ -865,7 +865,7 @@ class ForkJoinSpec extends AbstractSpecification { tasks[5].inputData['joinOn'] == ['t14', 't20'] tasks[6].taskType == 'SUB_WORKFLOW' - tasks[6].status == Task.Status.SCHEDULED + tasks[6].status == Task.Status.IN_PROGRESS tasks[7].taskType == 'JOIN' tasks[7].status == Task.Status.IN_PROGRESS tasks[7].inputData['joinOn'] == ['t11', 'join2', 'sw1'] @@ -905,7 +905,7 @@ class ForkJoinSpec extends AbstractSpecification { tasks[5].inputData['joinOn'] == ['t14', 't20'] tasks[6].taskType == 'SUB_WORKFLOW' - tasks[6].status == Task.Status.SCHEDULED + tasks[6].status == Task.Status.IN_PROGRESS tasks[7].taskType == 'JOIN' tasks[7].status == Task.Status.IN_PROGRESS tasks[7].inputData['joinOn'] == ['t11', 'join2', 'sw1'] @@ -923,8 +923,6 @@ class ForkJoinSpec extends AbstractSpecification { and: "Get the sub workflow id associated with the SubWorkflow Task sw1 and start the system task" def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) - def subWorkflowTaskId = workflow.getTaskByRefName("sw1").getTaskId() - asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) def updatedWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowInstanceId = updatedWorkflow.getTaskByRefName('sw1').subWorkflowId @@ -1079,19 +1077,15 @@ class ForkJoinSpec extends AbstractSpecification { tasks[0].taskType == 'FORK' tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == 'SUB_WORKFLOW' - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'SUB_WORKFLOW' - tasks[2].status == Task.Status.SCHEDULED + tasks[2].status == Task.Status.IN_PROGRESS tasks[3].taskType == 'JOIN' tasks[3].status == Task.Status.IN_PROGRESS } when: "both the sub workflows are started by issuing a system task call" def workflowWithScheduledSubWorkflows = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) - def subWorkflowTaskId1 = workflowWithScheduledSubWorkflows.getTaskByRefName('st1').taskId - asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId1) - def subWorkflowTaskId2 = workflowWithScheduledSubWorkflows.getTaskByRefName('st2').taskId - asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId2) def joinTaskId = workflowWithScheduledSubWorkflows.getTaskByRefName("fanouttask_join").taskId then: "verify that the sub workflow tasks are in a IN PROGRESS state" @@ -1212,7 +1206,7 @@ class ForkJoinSpec extends AbstractSpecification { tasks[0].taskType == 'FORK' tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == 'SUB_WORKFLOW' - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' tasks[2].status == Task.Status.SCHEDULED tasks[3].taskType == 'JOIN' @@ -1224,7 +1218,6 @@ class ForkJoinSpec extends AbstractSpecification { def parentWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowTaskId = parentWorkflow.getTaskByRefName('st1').taskId def jointaskId = parentWorkflow.getTaskByRefName("fanouttask_join").taskId - asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) then: "verify that the sub workflow task is in a IN_PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { @@ -1283,13 +1276,12 @@ class ForkJoinSpec extends AbstractSpecification { tasks[3].inputData['joinOn'] == ['st1', 't2'] tasks[3].status == Task.Status.IN_PROGRESS tasks[4].taskType == 'SUB_WORKFLOW' - tasks[4].status == Task.Status.SCHEDULED + tasks[4].status == Task.Status.IN_PROGRESS } when: "the sub workflow is started by issuing a system task call" parentWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) subWorkflowTaskId = parentWorkflow.getTaskByRefName('st1').taskId - asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) then: "verify that the sub workflow task is in a IN PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy index 03d077ea0..21f582307 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy @@ -87,7 +87,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { tasks[0].taskType == TASK_TYPE_FORK tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' tasks[2].status == Task.Status.SCHEDULED tasks[3].taskType == TASK_TYPE_JOIN @@ -100,10 +100,6 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { then: "verify that the 'integration_task_2' was polled and acknowledged" verifyPolledAndAcknowledgedTask(pollAndCompleteTask) - when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - List polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state" def rootWorkflowInstance = workflowExecutionService.getExecutionStatus(rootWorkflowId, true) with(rootWorkflowInstance) { @@ -119,19 +115,17 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { tasks[0].taskType == TASK_TYPE_FORK tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' - tasks[2].status == Task.Status.SCHEDULED + tasks[2].status == Task.Status.COMPLETED tasks[3].taskType == TASK_TYPE_JOIN tasks[3].status == Task.Status.IN_PROGRESS } - and: "poll and complete the integration_task_2 task in the mid-level workflow" + and: "poll and complete the integration_task_2 task in the root-level workflow" workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def midLevelWorkflowInstance = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true) then: "verify that the leaf workflow is RUNNING, and first task is in SCHEDULED state" @@ -221,7 +215,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { tasks[0].taskType == TASK_TYPE_FORK tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' tasks[2].status == Task.Status.SCHEDULED tasks[3].taskType == TASK_TYPE_JOIN @@ -231,11 +225,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { when: "poll and complete the integration_task_2 task in the root workflow" def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) - - and: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new mid level workflow is created and is in RUNNING state" newMidLevelWorkflowId != midLevelWorkflowId @@ -245,21 +235,17 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { tasks[0].taskType == TASK_TYPE_FORK tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' - tasks[2].status == Task.Status.SCHEDULED + tasks[2].status == Task.Status.COMPLETED tasks[3].taskType == TASK_TYPE_JOIN tasks[3].status == Task.Status.IN_PROGRESS } - when: "poll and complete the integration_task_2 task in the mid-level workflow" + when: "poll and complete the integration_task_2 task in the root-level workflow" def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) - - and: "poll and execute the sub workflow task" - polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId @@ -323,7 +309,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { tasks[0].taskType == TASK_TYPE_FORK tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' tasks[2].status == Task.Status.SCHEDULED tasks[3].taskType == TASK_TYPE_JOIN @@ -349,11 +335,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { def midJoinId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) - - and: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId From 135837ae20defa1157ab149992b4c255fd529345 Mon Sep 17 00:00:00 2001 From: manan164 <1897158+manan164@users.noreply.github.com> Date: Mon, 10 Jun 2024 19:57:36 +0530 Subject: [PATCH 7/8] fixes --- ...hicalForkJoinSubworkflowRestartSpec.groovy | 32 ++++++------------- ...rchicalForkJoinSubworkflowRetrySpec.groovy | 30 ++++++----------- .../LambdaAndTerminateTaskSpec.groovy | 4 +-- .../NestedForkJoinSubWorkflowSpec.groovy | 16 ++-------- .../integration/SubWorkflowRestartSpec.groovy | 20 +++--------- .../integration/SubWorkflowRetrySpec.groovy | 16 +++------- 6 files changed, 31 insertions(+), 87 deletions(-) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy index 279a48fb2..b490ba5c7 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy @@ -86,7 +86,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { tasks[0].taskType == TASK_TYPE_FORK tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' tasks[2].status == Task.Status.SCHEDULED tasks[3].taskType == TASK_TYPE_JOIN @@ -118,9 +118,9 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { tasks[0].taskType == TASK_TYPE_FORK tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' - tasks[2].status == Task.Status.SCHEDULED + tasks[2].status == Task.Status.COMPLETED tasks[3].taskType == TASK_TYPE_JOIN tasks[3].status == Task.Status.IN_PROGRESS } @@ -218,7 +218,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { tasks[0].taskType == TASK_TYPE_FORK tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' tasks[2].status == Task.Status.SCHEDULED tasks[3].taskType == TASK_TYPE_JOIN @@ -228,11 +228,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { when: "poll and complete the integration_task_2 task in the root workflow" def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) - - and: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new mid level workflow is created and is in RUNNING state" newMidLevelWorkflowId != midLevelWorkflowId @@ -242,9 +238,9 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { tasks[0].taskType == TASK_TYPE_FORK tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' - tasks[2].status == Task.Status.SCHEDULED + tasks[2].status == Task.Status.COMPLETED tasks[3].taskType == TASK_TYPE_JOIN tasks[3].status == Task.Status.IN_PROGRESS } @@ -252,11 +248,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { when: "poll and complete the integration_task_2 task in the mid-level workflow" def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) - - and: "poll and execute the sub workflow task" - polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId @@ -318,7 +310,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { tasks[0].taskType == TASK_TYPE_FORK tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' tasks[2].status == Task.Status.SCHEDULED tasks[3].taskType == TASK_TYPE_JOIN @@ -344,11 +336,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { def midJoinId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) - - and: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy index 56d78bfc2..c403b3a14 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy @@ -86,7 +86,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { tasks[0].taskType == TASK_TYPE_FORK tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' tasks[2].status == Task.Status.SCHEDULED tasks[3].taskType == TASK_TYPE_JOIN @@ -118,19 +118,15 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { tasks[0].taskType == TASK_TYPE_FORK tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' - tasks[2].status == Task.Status.SCHEDULED + tasks[2].status == Task.Status.COMPLETED tasks[3].taskType == TASK_TYPE_JOIN tasks[3].status == Task.Status.IN_PROGRESS } and: "poll and complete the integration_task_1 task in the mid-level workflow" workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) - - when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def midLevelWorkflowInstance = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true) then: "verify that the leaf workflow is RUNNING, and first task is in SCHEDULED state" @@ -225,14 +221,12 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { tasks[3].taskType == TASK_TYPE_JOIN tasks[3].status == Task.Status.IN_PROGRESS tasks[4].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[4].status == Task.Status.SCHEDULED + tasks[4].status == Task.Status.IN_PROGRESS tasks[4].retriedTaskId == tasks[1].taskId } when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(4).subWorkflowId def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId then: "verify that a new mid level workflow is created and is in RUNNING state" @@ -243,7 +237,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { tasks[0].taskType == TASK_TYPE_FORK tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS tasks[2].taskType == 'integration_task_2' tasks[2].status == Task.Status.SCHEDULED tasks[3].taskType == TASK_TYPE_JOIN @@ -253,11 +247,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { when: "poll and complete the integration_task_1 task in the mid-level workflow" workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId - - and: "poll and execute the sub workflow task" - polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId @@ -326,7 +316,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { tasks[3].taskType == TASK_TYPE_JOIN tasks[3].status == Task.Status.IN_PROGRESS tasks[4].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[4].status == Task.Status.SCHEDULED + tasks[4].status == Task.Status.IN_PROGRESS tasks[4].retriedTaskId == tasks[1].taskId } @@ -346,9 +336,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { } when: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/LambdaAndTerminateTaskSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/LambdaAndTerminateTaskSpec.groovy index fd6c6cb14..d6b37890a 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/LambdaAndTerminateTaskSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/LambdaAndTerminateTaskSpec.groovy @@ -139,7 +139,7 @@ class LambdaAndTerminateTaskSpec extends AbstractSpecification { tasks[3].status == Task.Status.IN_PROGRESS tasks[3].taskType == 'JOIN' tasks[3].seq == 4 - tasks[4].status == Task.Status.SCHEDULED + tasks[4].status == Task.Status.IN_PROGRESS tasks[4].taskType == 'SUB_WORKFLOW' tasks[4].seq == 5 tasks[5].status == Task.Status.IN_PROGRESS @@ -149,8 +149,6 @@ class LambdaAndTerminateTaskSpec extends AbstractSpecification { when: "subworkflow is retrieved" def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) - def subWorkflowTaskId = workflow.getTaskByRefName("test_terminate_subworkflow").getTaskId() - asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowId = workflow.getTaskByRefName("test_terminate_subworkflow").subWorkflowId diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/NestedForkJoinSubWorkflowSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/NestedForkJoinSubWorkflowSpec.groovy index 548418531..041bfbc67 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/NestedForkJoinSubWorkflowSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/NestedForkJoinSubWorkflowSpec.groovy @@ -86,7 +86,7 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { tasks[1].taskType == TASK_TYPE_FORK tasks[1].status == Task.Status.COMPLETED tasks[2].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[2].status == Task.Status.SCHEDULED + tasks[2].status == Task.Status.IN_PROGRESS tasks[3].taskType == 'integration_task_2' tasks[3].status == Task.Status.SCHEDULED tasks[4].taskType == TASK_TYPE_JOIN @@ -101,10 +101,6 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task1.integration.worker', ['op': 'task1.done']) workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task1.integration.worker', ['op': 'task1.done']) - and: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds.get(0)) - then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state" def parentWorkflowInstance = workflowExecutionService.getExecutionStatus(parentWorkflowId, true) with(parentWorkflowInstance) { @@ -314,7 +310,7 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { tasks[1].taskType == TASK_TYPE_FORK tasks[1].status == Task.Status.COMPLETED tasks[2].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[2].status == Task.Status.SCHEDULED + tasks[2].status == Task.Status.IN_PROGRESS tasks[3].taskType == 'integration_task_2' tasks[3].status == Task.Status.SCHEDULED tasks[4].taskType == TASK_TYPE_JOIN @@ -326,8 +322,6 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { } when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds.get(0)) workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) def workflow = workflowExecutionService.getExecutionStatus(parentWorkflowId, true) @@ -444,14 +438,10 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { tasks[6].taskType == TASK_TYPE_JOIN tasks[6].status == Task.Status.IN_PROGRESS tasks[7].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[7].status == Task.Status.SCHEDULED + tasks[7].status == Task.Status.IN_PROGRESS tasks[7].retriedTaskId == tasks[2].taskId } - when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds.get(0)) - then: "verify that SUB_WORKFLOW task in in progress" def parentWorkflowInstance = workflowExecutionService.getExecutionStatus(parentWorkflowId, true) with(workflowExecutionService.getExecutionStatus(parentWorkflowId, true)) { diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRestartSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRestartSpec.groovy index 519d01a18..88589291d 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRestartSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRestartSpec.groovy @@ -200,13 +200,9 @@ class SubWorkflowRestartSpec extends AbstractSpecification { tasks[0].taskType == 'integration_task_1' tasks[0].status == Task.Status.COMPLETED tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.SCHEDULED + tasks[1].status == Task.Status.IN_PROGRESS } - - when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new mid level workflow is created and is in RUNNING state" newMidLevelWorkflowId != midLevelWorkflowId @@ -219,11 +215,7 @@ class SubWorkflowRestartSpec extends AbstractSpecification { when: "poll and complete the integration_task_1 task in the mid-level workflow" workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done']) - - and: "poll and execute the sub workflow task" - polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId @@ -307,11 +299,7 @@ class SubWorkflowRestartSpec extends AbstractSpecification { when: "poll and complete the task in the mid level workflow" workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done']) - - and: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRetrySpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRetrySpec.groovy index 4cab35a5a..99f882323 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRetrySpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRetrySpec.groovy @@ -198,14 +198,10 @@ class SubWorkflowRetrySpec extends AbstractSpecification { tasks[1].status == Task.Status.FAILED tasks[1].retried tasks[2].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[2].status == Task.Status.SCHEDULED + tasks[2].status == Task.Status.IN_PROGRESS tasks[2].retriedTaskId == tasks[1].taskId } - - when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new mid level workflow is created and is in RUNNING state" newMidLevelWorkflowId != midLevelWorkflowId @@ -407,7 +403,7 @@ class SubWorkflowRetrySpec extends AbstractSpecification { tasks[1].status == Task.Status.FAILED tasks[1].retried tasks[2].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[2].status == Task.Status.SCHEDULED + tasks[2].status == Task.Status.IN_PROGRESS tasks[2].retriedTaskId == tasks[1].taskId } @@ -419,11 +415,7 @@ class SubWorkflowRetrySpec extends AbstractSpecification { tasks[1].status == Task.Status.IN_PROGRESS tasks[1].subworkflowChanged } - - when: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" - def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId From d481f6c55d381d321bf5b72e428079f1d08d2b88 Mon Sep 17 00:00:00 2001 From: manan164 <1897158+manan164@users.noreply.github.com> Date: Tue, 11 Jun 2024 19:40:43 +0530 Subject: [PATCH 8/8] fixes --- ...rchicalForkJoinSubworkflowRetrySpec.groovy | 6 +----- .../integration/SubWorkflowRetrySpec.groovy | 20 +++++++------------ 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy index c403b3a14..c272e8637 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy @@ -99,10 +99,6 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { then: "verify that the 'integration_task_1' was polled and acknowledged" verifyPolledAndAcknowledgedTask(pollAndCompleteTask) - when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" - List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state" def rootWorkflowInstance = workflowExecutionService.getExecutionStatus(rootWorkflowId, true) with(rootWorkflowInstance) { @@ -336,7 +332,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { } when: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" - def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(1).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(4).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRetrySpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRetrySpec.groovy index 99f882323..17907d407 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRetrySpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRetrySpec.groovy @@ -23,6 +23,7 @@ import com.netflix.conductor.test.base.AbstractSpecification import spock.lang.Shared +import static com.netflix.conductor.common.metadata.tasks.TaskType.SUB_WORKFLOW import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask @@ -201,24 +202,17 @@ class SubWorkflowRetrySpec extends AbstractSpecification { tasks[2].status == Task.Status.IN_PROGRESS tasks[2].retriedTaskId == tasks[1].taskId } - def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId + def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(2).subWorkflowId then: "verify that a new mid level workflow is created and is in RUNNING state" newMidLevelWorkflowId != midLevelWorkflowId with(workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true)) { status == Workflow.WorkflowStatus.RUNNING - tasks.size() == 1 - tasks[0].taskType == 'integration_task_1' - tasks[0].status == Task.Status.SCHEDULED + tasks.size() == 2 + tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW + tasks[1].status == Task.Status.IN_PROGRESS } - - when: "poll and complete the integration_task_1 task in the mid-level workflow" - workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done']) - - and: "poll and execute the sub workflow task" - polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) - def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId @@ -415,7 +409,7 @@ class SubWorkflowRetrySpec extends AbstractSpecification { tasks[1].status == Task.Status.IN_PROGRESS tasks[1].subworkflowChanged } - def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(1).subWorkflowId + def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(2).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId