Skip to content

Commit

Permalink
The assertions made by these tests were wrong. We have no guarantees …
Browse files Browse the repository at this point in the history
…on which integration_task_2 will be polled and completed first.
  • Loading branch information
jmigueprieto committed Sep 11, 2024
1 parent 10a5565 commit fdc5758
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 8 deletions.
1 change: 1 addition & 0 deletions test-harness/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ dependencies {
testImplementation "org.junit.vintage:junit-vintage-engine"
testImplementation "jakarta.ws.rs:jakarta.ws.rs-api:${revJAXRS}"
testImplementation "org.glassfish.jersey.core:jersey-common:${revJerseyCommon}"
testImplementation "org.awaitility:awaitility:${revAwaitility}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ import com.netflix.conductor.test.base.AbstractSpecification

import spock.lang.Shared

import java.util.concurrent.TimeUnit

import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_FORK
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW
import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask
import static org.awaitility.Awaitility.await

class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {

Expand Down Expand Up @@ -230,10 +233,23 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "poll and complete the integration_task_2 task in the root workflow"
when: "poll and complete integration_task_2 in root and mid level workflow"
def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId
// The root workflow has an integration_task_2. Its subworkflow also has an integration_task_2.
// We have NO guarantees which will be polled and completed first, so the assertions done in previous versions of this test were wrong.
await().atMost(10, TimeUnit.SECONDS).until {
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def rootWf = workflowExecutionService.getExecutionStatus(rootWorkflowId, true)
def midWf = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true)

rootWf.status == Workflow.WorkflowStatus.RUNNING &&
rootWf.tasks[2].taskType == 'integration_task_2' &&
rootWf.tasks[2].status == Task.Status.COMPLETED &&
midWf.status == Workflow.WorkflowStatus.RUNNING &&
midWf.tasks[2].taskType == 'integration_task_2' &&
midWf.tasks[2].status == Task.Status.COMPLETED
}

then: "verify that a new mid level workflow is created and is in RUNNING state"
newMidLevelWorkflowId != midLevelWorkflowId
Expand All @@ -251,9 +267,8 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "poll and complete the integration_task_2 task in the root-level workflow"
when: "mid level workflow is in RUNNING state"
def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new leaf workflow is created and is in RUNNING state"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ import com.netflix.conductor.test.base.AbstractSpecification

import spock.lang.Shared

import java.util.concurrent.TimeUnit

import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_FORK
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW
import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask
import static org.awaitility.Awaitility.await

class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification {

Expand Down Expand Up @@ -231,10 +234,23 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification {
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "poll and complete the integration_task_2 task in the root workflow"
when: "poll and complete integration_task_2 in root and mid level workflow"
def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId
// The root workflow has an integration_task_2. Its subworkflow also has an integration_task_2.
// We have NO guarantees which will be polled and completed first, so the assertions done in previous versions of this test were wrong.
await().atMost(10, TimeUnit.SECONDS).until {
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def rootWf = workflowExecutionService.getExecutionStatus(rootWorkflowId, true)
def midWf = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true)

rootWf.status == Workflow.WorkflowStatus.RUNNING &&
rootWf.tasks[2].taskType == 'integration_task_2' &&
rootWf.tasks[2].status == Task.Status.COMPLETED &&
midWf.status == Workflow.WorkflowStatus.RUNNING &&
midWf.tasks[2].taskType == 'integration_task_2' &&
midWf.tasks[2].status == Task.Status.COMPLETED
}

then: "verify that a new mid level workflow is created and is in RUNNING state"
newMidLevelWorkflowId != midLevelWorkflowId
Expand All @@ -251,9 +267,8 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification {
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "poll and complete the integration_task_2 task in the mid-level workflow"
when: "mid level workflow is in RUNNING state"
def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])
def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new leaf workflow is created and is in RUNNING state"
Expand Down

0 comments on commit fdc5758

Please sign in to comment.