Skip to content

Commit

Permalink
Fix flaky test which is failing because integration_task_2 is not COM…
Browse files Browse the repository at this point in the history
…PLETED immediately

 tasks[2].status == Task.Status.COMPLETED
             |      |       |
             |      false   class com.netflix.conductor.common.metadata.tasks.Task$Status
             SCHEDULED
  • Loading branch information
jmigueprieto committed Sep 11, 2024
1 parent e72500c commit 75763cc
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 34 deletions.
1 change: 1 addition & 0 deletions test-harness/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ dependencies {
testImplementation "org.junit.vintage:junit-vintage-engine"
testImplementation "jakarta.ws.rs:jakarta.ws.rs-api:${revJAXRS}"
testImplementation "org.glassfish.jersey.core:jersey-common:${revJerseyCommon}"
testImplementation "org.awaitility:awaitility:${revAwaitility}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package com.netflix.conductor.test.integration

import java.util.concurrent.TimeUnit

import org.springframework.beans.factory.annotation.Autowired

import com.netflix.conductor.common.metadata.tasks.Task
Expand All @@ -30,6 +32,8 @@ import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOI
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW
import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask

import static org.awaitility.Awaitility.await

class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {

@Shared
Expand Down Expand Up @@ -217,17 +221,18 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
workflowExecutor.rerun(reRunWorkflowRequest)

then: "verify that the root workflow created a new execution"
with(workflowExecutionService.getExecutionStatus(rootWorkflowId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 4
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == TASK_TYPE_JOIN
tasks[3].status == Task.Status.IN_PROGRESS
await().atMost(10, TimeUnit.SECONDS).until {
def executionStatus = workflowExecutionService.getExecutionStatus(rootWorkflowId, true)
executionStatus.status == Workflow.WorkflowStatus.RUNNING &&
executionStatus.tasks.size() == 4 &&
executionStatus.tasks[0].taskType == TASK_TYPE_FORK &&
executionStatus.tasks[0].status == Task.Status.COMPLETED &&
executionStatus.tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW &&
executionStatus.tasks[1].status == Task.Status.IN_PROGRESS &&
executionStatus.tasks[2].taskType == 'integration_task_2' &&
executionStatus.tasks[2].status == Task.Status.SCHEDULED &&
executionStatus.tasks[3].taskType == TASK_TYPE_JOIN &&
executionStatus.tasks[3].status == Task.Status.IN_PROGRESS
}

when: "poll and complete the integration_task_2 task in the root workflow"
Expand All @@ -238,17 +243,18 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
then: "verify that a new mid level workflow is created and is in RUNNING state"
newMidLevelWorkflowId != midLevelWorkflowId
workflowExecutor.decide(newMidLevelWorkflowId)
with(workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 4
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == TASK_TYPE_JOIN
tasks[3].status == Task.Status.IN_PROGRESS
await().atMost(10, TimeUnit.SECONDS).until {
def executionStatus = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true)
executionStatus.status == Workflow.WorkflowStatus.RUNNING &&
executionStatus.tasks.size() == 4 &&
executionStatus.tasks[0].taskType == TASK_TYPE_FORK &&
executionStatus.tasks[0].status == Task.Status.COMPLETED &&
executionStatus.tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW &&
executionStatus.tasks[1].status == Task.Status.IN_PROGRESS &&
executionStatus.tasks[2].taskType == 'integration_task_2' &&
executionStatus.tasks[2].status == Task.Status.COMPLETED &&
executionStatus.tasks[3].taskType == TASK_TYPE_JOIN &&
executionStatus.tasks[3].status == Task.Status.IN_PROGRESS
}

when: "poll and complete the integration_task_2 task in the root-level workflow"
Expand All @@ -258,25 +264,27 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {

then: "verify that a new leaf workflow is created and is in RUNNING state"
newLeafWorkflowId != leafWorkflowId
with(workflowExecutionService.getExecutionStatus(newLeafWorkflowId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 1
tasks[0].taskType == 'integration_task_1'
tasks[0].status == Task.Status.SCHEDULED
await().atMost(10, TimeUnit.SECONDS).until {
def executionStatus = workflowExecutionService.getExecutionStatus(newLeafWorkflowId, true)
executionStatus.status == Workflow.WorkflowStatus.RUNNING &&
executionStatus.tasks.size() == 1 &&
executionStatus.tasks[0].taskType == 'integration_task_1' &&
executionStatus.tasks[0].status == Task.Status.SCHEDULED
}

when: "poll and complete the two tasks in the leaf workflow"
workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done'])
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])

then: "the new leaf workflow is in COMPLETED state"
with(workflowExecutionService.getExecutionStatus(newLeafWorkflowId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 2
tasks[0].taskType == 'integration_task_1'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == 'integration_task_2'
tasks[1].status == Task.Status.COMPLETED
await().atMost(10, TimeUnit.SECONDS).until {
def executionStatus = workflowExecutionService.getExecutionStatus(newLeafWorkflowId, true)
executionStatus.status == Workflow.WorkflowStatus.COMPLETED &&
executionStatus.tasks.size() == 2 &&
executionStatus.tasks[0].taskType == 'integration_task_1' &&
executionStatus.tasks[0].status == Task.Status.COMPLETED &&
executionStatus.tasks[1].taskType == 'integration_task_2' &&
executionStatus.tasks[1].status == Task.Status.COMPLETED
}

when: "the new mid level and root workflows are 'decided'"
Expand Down

0 comments on commit 75763cc

Please sign in to comment.