Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
manan164 committed Jun 10, 2024
1 parent d4d1bb3 commit 135837a
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification {
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == TASK_TYPE_JOIN
Expand Down Expand Up @@ -118,9 +118,9 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification {
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == TASK_TYPE_JOIN
tasks[3].status == Task.Status.IN_PROGRESS
}
Expand Down Expand Up @@ -218,7 +218,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification {
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == TASK_TYPE_JOIN
Expand All @@ -228,11 +228,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification {
when: "poll and complete the integration_task_2 task in the root workflow"
def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])

and: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call"
def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId
def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new mid level workflow is created and is in RUNNING state"
newMidLevelWorkflowId != midLevelWorkflowId
Expand All @@ -242,21 +238,17 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification {
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == TASK_TYPE_JOIN
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "poll and complete the integration_task_2 task in the mid-level workflow"
def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])

and: "poll and execute the sub workflow task"
polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId
def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new leaf workflow is created and is in RUNNING state"
newLeafWorkflowId != leafWorkflowId
Expand Down Expand Up @@ -318,7 +310,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification {
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == TASK_TYPE_JOIN
Expand All @@ -344,11 +336,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification {
def midJoinId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])

and: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call"
def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId
def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new leaf workflow is created and is in RUNNING state"
newLeafWorkflowId != leafWorkflowId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification {
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == TASK_TYPE_JOIN
Expand Down Expand Up @@ -118,19 +118,15 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification {
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == TASK_TYPE_JOIN
tasks[3].status == Task.Status.IN_PROGRESS
}

and: "poll and complete the integration_task_1 task in the mid-level workflow"
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])

when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call"
polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def midLevelWorkflowInstance = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true)

then: "verify that the leaf workflow is RUNNING, and first task is in SCHEDULED state"
Expand Down Expand Up @@ -225,14 +221,12 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification {
tasks[3].taskType == TASK_TYPE_JOIN
tasks[3].status == Task.Status.IN_PROGRESS
tasks[4].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[4].status == Task.Status.SCHEDULED
tasks[4].status == Task.Status.IN_PROGRESS
tasks[4].retriedTaskId == tasks[1].taskId
}

when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call"
def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId
def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(4).subWorkflowId
def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId

then: "verify that a new mid level workflow is created and is in RUNNING state"
Expand All @@ -243,7 +237,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification {
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == TASK_TYPE_JOIN
Expand All @@ -253,11 +247,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification {
when: "poll and complete the integration_task_1 task in the mid-level workflow"
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId

and: "poll and execute the sub workflow task"
polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId
def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new leaf workflow is created and is in RUNNING state"
newLeafWorkflowId != leafWorkflowId
Expand Down Expand Up @@ -326,7 +316,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification {
tasks[3].taskType == TASK_TYPE_JOIN
tasks[3].status == Task.Status.IN_PROGRESS
tasks[4].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[4].status == Task.Status.SCHEDULED
tasks[4].status == Task.Status.IN_PROGRESS
tasks[4].retriedTaskId == tasks[1].taskId
}

Expand All @@ -346,9 +336,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification {
}

when: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call"
def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId
def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new leaf workflow is created and is in RUNNING state"
newLeafWorkflowId != leafWorkflowId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class LambdaAndTerminateTaskSpec extends AbstractSpecification {
tasks[3].status == Task.Status.IN_PROGRESS
tasks[3].taskType == 'JOIN'
tasks[3].seq == 4
tasks[4].status == Task.Status.SCHEDULED
tasks[4].status == Task.Status.IN_PROGRESS
tasks[4].taskType == 'SUB_WORKFLOW'
tasks[4].seq == 5
tasks[5].status == Task.Status.IN_PROGRESS
Expand All @@ -149,8 +149,6 @@ class LambdaAndTerminateTaskSpec extends AbstractSpecification {

when: "subworkflow is retrieved"
def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowTaskId = workflow.getTaskByRefName("test_terminate_subworkflow").getTaskId()
asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId)
workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowId = workflow.getTaskByRefName("test_terminate_subworkflow").subWorkflowId

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification {
tasks[1].taskType == TASK_TYPE_FORK
tasks[1].status == Task.Status.COMPLETED
tasks[2].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[2].status == Task.Status.SCHEDULED
tasks[2].status == Task.Status.IN_PROGRESS
tasks[3].taskType == 'integration_task_2'
tasks[3].status == Task.Status.SCHEDULED
tasks[4].taskType == TASK_TYPE_JOIN
Expand All @@ -101,10 +101,6 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification {
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task1.integration.worker', ['op': 'task1.done'])
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task1.integration.worker', ['op': 'task1.done'])

and: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call"
List<String> polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds.get(0))

then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state"
def parentWorkflowInstance = workflowExecutionService.getExecutionStatus(parentWorkflowId, true)
with(parentWorkflowInstance) {
Expand Down Expand Up @@ -314,7 +310,7 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification {
tasks[1].taskType == TASK_TYPE_FORK
tasks[1].status == Task.Status.COMPLETED
tasks[2].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[2].status == Task.Status.SCHEDULED
tasks[2].status == Task.Status.IN_PROGRESS
tasks[3].taskType == 'integration_task_2'
tasks[3].status == Task.Status.SCHEDULED
tasks[4].taskType == TASK_TYPE_JOIN
Expand All @@ -326,8 +322,6 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification {
}

when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call"
List<String> polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds.get(0))
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def workflow = workflowExecutionService.getExecutionStatus(parentWorkflowId, true)
Expand Down Expand Up @@ -444,14 +438,10 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification {
tasks[6].taskType == TASK_TYPE_JOIN
tasks[6].status == Task.Status.IN_PROGRESS
tasks[7].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[7].status == Task.Status.SCHEDULED
tasks[7].status == Task.Status.IN_PROGRESS
tasks[7].retriedTaskId == tasks[2].taskId
}

when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call"
List<String> polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds.get(0))

then: "verify that SUB_WORKFLOW task in in progress"
def parentWorkflowInstance = workflowExecutionService.getExecutionStatus(parentWorkflowId, true)
with(workflowExecutionService.getExecutionStatus(parentWorkflowId, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,9 @@ class SubWorkflowRestartSpec extends AbstractSpecification {
tasks[0].taskType == 'integration_task_1'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
}

when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call"
def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId
def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new mid level workflow is created and is in RUNNING state"
newMidLevelWorkflowId != midLevelWorkflowId
Expand All @@ -219,11 +215,7 @@ class SubWorkflowRestartSpec extends AbstractSpecification {

when: "poll and complete the integration_task_1 task in the mid-level workflow"
workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done'])

and: "poll and execute the sub workflow task"
polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId
def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new leaf workflow is created and is in RUNNING state"
newLeafWorkflowId != leafWorkflowId
Expand Down Expand Up @@ -307,11 +299,7 @@ class SubWorkflowRestartSpec extends AbstractSpecification {

when: "poll and complete the task in the mid level workflow"
workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done'])

and: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call"
def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId
def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new leaf workflow is created and is in RUNNING state"
newLeafWorkflowId != leafWorkflowId
Expand Down
Loading

0 comments on commit 135837a

Please sign in to comment.