Skip to content

Commit

Permalink
Merge pull request #257 from conductor-oss/fix/flaky-rerun-restart-tests
Browse files Browse the repository at this point in the history
Fix flaky tests `HierarchicalForkJoinSubworkflowRerunSpec` & `HierarchicalForkJoinSubworkflowRestartSpec`
  • Loading branch information
jmigueprieto authored Sep 12, 2024
2 parents 10a5565 + 4524c9c commit 927e93d
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 8 deletions.
1 change: 1 addition & 0 deletions test-harness/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ dependencies {
testImplementation "org.junit.vintage:junit-vintage-engine"
testImplementation "jakarta.ws.rs:jakarta.ws.rs-api:${revJAXRS}"
testImplementation "org.glassfish.jersey.core:jersey-common:${revJerseyCommon}"
testImplementation "org.awaitility:awaitility:${revAwaitility}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package com.netflix.conductor.test.integration

import java.util.concurrent.TimeUnit

import org.springframework.beans.factory.annotation.Autowired

import com.netflix.conductor.common.metadata.tasks.Task
Expand All @@ -30,6 +32,8 @@ import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOI
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW
import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask

import static org.awaitility.Awaitility.await

class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {

@Shared
Expand Down Expand Up @@ -230,10 +234,23 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "poll and complete the integration_task_2 task in the root workflow"
when: "poll and complete integration_task_2 in root and mid level workflow"
def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId
// The root workflow has an integration_task_2. Its subworkflow also has an integration_task_2.
// We have NO guarantees which will be polled and completed first, so the assertions done in previous versions of this test were wrong.
await().atMost(10, TimeUnit.SECONDS).until {
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def rootWf = workflowExecutionService.getExecutionStatus(rootWorkflowId, true)
def midWf = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true)

rootWf.status == Workflow.WorkflowStatus.RUNNING &&
rootWf.tasks[2].taskType == 'integration_task_2' &&
rootWf.tasks[2].status == Task.Status.COMPLETED &&
midWf.status == Workflow.WorkflowStatus.RUNNING &&
midWf.tasks[2].taskType == 'integration_task_2' &&
midWf.tasks[2].status == Task.Status.COMPLETED
}

then: "verify that a new mid level workflow is created and is in RUNNING state"
newMidLevelWorkflowId != midLevelWorkflowId
Expand All @@ -251,9 +268,8 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "poll and complete the integration_task_2 task in the root-level workflow"
when: "mid level workflow is in RUNNING state"
def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new leaf workflow is created and is in RUNNING state"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package com.netflix.conductor.test.integration

import java.util.concurrent.TimeUnit

import org.springframework.beans.factory.annotation.Autowired

import com.netflix.conductor.common.metadata.tasks.Task
Expand All @@ -29,6 +31,8 @@ import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOI
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW
import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask

import static org.awaitility.Awaitility.await

class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification {

@Shared
Expand Down Expand Up @@ -231,10 +235,23 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification {
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "poll and complete the integration_task_2 task in the root workflow"
when: "poll and complete integration_task_2 in root and mid level workflow"
def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId
// The root workflow has an integration_task_2. Its subworkflow also has an integration_task_2.
// We have NO guarantees which will be polled and completed first, so the assertions done in previous versions of this test were wrong.
await().atMost(10, TimeUnit.SECONDS).until {
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def rootWf = workflowExecutionService.getExecutionStatus(rootWorkflowId, true)
def midWf = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true)

rootWf.status == Workflow.WorkflowStatus.RUNNING &&
rootWf.tasks[2].taskType == 'integration_task_2' &&
rootWf.tasks[2].status == Task.Status.COMPLETED &&
midWf.status == Workflow.WorkflowStatus.RUNNING &&
midWf.tasks[2].taskType == 'integration_task_2' &&
midWf.tasks[2].status == Task.Status.COMPLETED
}

then: "verify that a new mid level workflow is created and is in RUNNING state"
newMidLevelWorkflowId != midLevelWorkflowId
Expand All @@ -251,9 +268,8 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification {
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "poll and complete the integration_task_2 task in the mid-level workflow"
when: "mid level workflow is in RUNNING state"
def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new leaf workflow is created and is in RUNNING state"
Expand Down

0 comments on commit 927e93d

Please sign in to comment.