From 4524c9cf0bc59997ccbb910532daece85f34b77e Mon Sep 17 00:00:00 2001 From: Miguel Prieto Date: Wed, 11 Sep 2024 19:37:02 -0400 Subject: [PATCH] The assertions made by these tests were wrong. We have no guarantees on which integration_task_2 will be polled and completed first. --- test-harness/build.gradle | 1 + ...rchicalForkJoinSubworkflowRerunSpec.groovy | 24 +++++++++++++++---- ...hicalForkJoinSubworkflowRestartSpec.groovy | 24 +++++++++++++++---- 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/test-harness/build.gradle b/test-harness/build.gradle index db73b2706..5f8a64a59 100644 --- a/test-harness/build.gradle +++ b/test-harness/build.gradle @@ -40,4 +40,5 @@ dependencies { testImplementation "org.junit.vintage:junit-vintage-engine" testImplementation "jakarta.ws.rs:jakarta.ws.rs-api:${revJAXRS}" testImplementation "org.glassfish.jersey.core:jersey-common:${revJerseyCommon}" + testImplementation "org.awaitility:awaitility:${revAwaitility}" } diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy index 1a8f1dff4..8496eef94 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy @@ -12,6 +12,8 @@ */ package com.netflix.conductor.test.integration +import java.util.concurrent.TimeUnit + import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task @@ -30,6 +32,8 @@ import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOI import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask +import static org.awaitility.Awaitility.await + class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { @Shared @@ -230,10 +234,23 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { tasks[3].status == Task.Status.IN_PROGRESS } - when: "poll and complete the integration_task_2 task in the root workflow" + when: "poll and complete integration_task_2 in root and mid level workflow" def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId - workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId + // The root workflow has an integration_task_2. Its subworkflow also has an integration_task_2. + // We have NO guarantees which will be polled and completed first, so the assertions done in previous versions of this test were wrong. + await().atMost(10, TimeUnit.SECONDS).until { + workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) + def rootWf = workflowExecutionService.getExecutionStatus(rootWorkflowId, true) + def midWf = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true) + + rootWf.status == Workflow.WorkflowStatus.RUNNING && + rootWf.tasks[2].taskType == 'integration_task_2' && + rootWf.tasks[2].status == Task.Status.COMPLETED && + midWf.status == Workflow.WorkflowStatus.RUNNING && + midWf.tasks[2].taskType == 'integration_task_2' && + midWf.tasks[2].status == Task.Status.COMPLETED + } then: "verify that a new mid level workflow is created and is in RUNNING state" newMidLevelWorkflowId != midLevelWorkflowId @@ -251,9 +268,8 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { tasks[3].status == Task.Status.IN_PROGRESS } - when: "poll and complete the integration_task_2 task in the root-level workflow" + when: "mid level workflow is in RUNNING state" def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId - workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy index 1d9cc0b11..c50670bf0 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy @@ -12,6 +12,8 @@ */ package com.netflix.conductor.test.integration +import java.util.concurrent.TimeUnit + import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task @@ -29,6 +31,8 @@ import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOI import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask +import static org.awaitility.Awaitility.await + class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { @Shared @@ -231,10 +235,23 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { tasks[3].status == Task.Status.IN_PROGRESS } - when: "poll and complete the integration_task_2 task in the root workflow" + when: "poll and complete integration_task_2 in root and mid level workflow" def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId - workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId + // The root workflow has an integration_task_2. Its subworkflow also has an integration_task_2. + // We have NO guarantees which will be polled and completed first, so the assertions done in previous versions of this test were wrong. + await().atMost(10, TimeUnit.SECONDS).until { + workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) + def rootWf = workflowExecutionService.getExecutionStatus(rootWorkflowId, true) + def midWf = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true) + + rootWf.status == Workflow.WorkflowStatus.RUNNING && + rootWf.tasks[2].taskType == 'integration_task_2' && + rootWf.tasks[2].status == Task.Status.COMPLETED && + midWf.status == Workflow.WorkflowStatus.RUNNING && + midWf.tasks[2].taskType == 'integration_task_2' && + midWf.tasks[2].status == Task.Status.COMPLETED + } then: "verify that a new mid level workflow is created and is in RUNNING state" newMidLevelWorkflowId != midLevelWorkflowId @@ -251,9 +268,8 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { tasks[3].status == Task.Status.IN_PROGRESS } - when: "poll and complete the integration_task_2 task in the mid-level workflow" + when: "mid level workflow is in RUNNING state" def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId - workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state"