Skip to content

Commit

Permalink
Join task fixes (#117)
Browse files Browse the repository at this point in the history
* fix join completing with errors when a task has failed or some tasks are not terminal yet

* add more tests

* add more comments to join routine
  • Loading branch information
VerstraeteBert authored Apr 5, 2024
1 parent ff040fe commit 0f649b5
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.core.execution.tasks;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

Expand All @@ -36,9 +37,6 @@ public Join() {
@SuppressWarnings("unchecked")
public boolean execute(
WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {

boolean allDone = true;
boolean hasFailures = false;
StringBuilder failureReason = new StringBuilder();
StringBuilder optionalTaskFailures = new StringBuilder();
List<String> joinOn = (List<String>) task.getInputData().get("joinOn");
Expand All @@ -47,41 +45,47 @@ public boolean execute(
joinOn =
joinOn.stream()
.map(name -> TaskUtils.appendIteration(name, task.getIteration()))
.collect(Collectors.toList());
.toList();
}

boolean allTasksTerminal =
joinOn.stream()
.map(workflow::getTaskByRefName)
.allMatch(t -> t != null && t.getStatus().isTerminal());

for (String joinOnRef : joinOn) {
TaskModel forkedTask = workflow.getTaskByRefName(joinOnRef);
if (forkedTask == null) {
// Task is not even scheduled yet
allDone = false;
break;
// Continue checking other tasks if a referenced task is not yet scheduled
continue;
}

TaskModel.Status taskStatus = forkedTask.getStatus();
hasFailures =

// Only add to task output if it's not empty
if (!forkedTask.getOutputData().isEmpty()) {
task.addOutput(joinOnRef, forkedTask.getOutputData());
}

// Determine if the join task fails immediately due to a non-optional, non-permissive
// task failure,
// or waits for all tasks to be terminal if the failed task is permissive.
var isJoinFailure =
!taskStatus.isSuccessful()
&& !forkedTask.getWorkflowTask().isOptional()
&& (!forkedTask.getWorkflowTask().isPermissive()
|| joinOn.stream()
.map(workflow::getTaskByRefName)
.allMatch(t -> t.getStatus().isTerminal()));
if (hasFailures) {
&& (!forkedTask.getWorkflowTask().isPermissive() || allTasksTerminal);
if (isJoinFailure) {
final String failureReasons =
joinOn.stream()
.map(workflow::getTaskByRefName)
.filter(Objects::nonNull)
.filter(t -> !t.getStatus().isSuccessful())
.map(TaskModel::getReasonForIncompletion)
.collect(Collectors.joining(" "));
failureReason.append(failureReasons);
}
// Only add to task output if it's not empty
if (!forkedTask.getOutputData().isEmpty()) {
task.addOutput(joinOnRef, forkedTask.getOutputData());
}
if (!taskStatus.isTerminal()) {
allDone = false;
}
if (hasFailures) {
break;
task.setReasonForIncompletion(failureReason.toString());
task.setStatus(TaskModel.Status.FAILED);
return true;
}

// check for optional task failures
Expand All @@ -95,11 +99,10 @@ public boolean execute(
.append(" ");
}
}
if (allDone || hasFailures || optionalTaskFailures.length() > 0) {
if (hasFailures) {
task.setReasonForIncompletion(failureReason.toString());
task.setStatus(TaskModel.Status.FAILED);
} else if (optionalTaskFailures.length() > 0) {

// Finalize the join task's status based on the outcomes of all referenced tasks.
if (allTasksTerminal) {
if (!optionalTaskFailures.isEmpty()) {
task.setStatus(TaskModel.Status.COMPLETED_WITH_ERRORS);
optionalTaskFailures.append("completed with errors");
task.setReasonForIncompletion(optionalTaskFailures.toString());
Expand All @@ -108,6 +111,8 @@ public boolean execute(
}
return true;
}

// Task execution not complete, waiting on more tasks to reach terminal state.
return false;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.tasks;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;

import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;

public class TestJoin {
private final WorkflowExecutor executor = mock(WorkflowExecutor.class);

private TaskModel createTask(
String referenceName,
TaskModel.Status status,
boolean isOptional,
boolean isPermissive) {
TaskModel task = new TaskModel();
task.setStatus(status);
task.setReferenceTaskName(referenceName);
WorkflowTask workflowTask = new WorkflowTask();
workflowTask.setOptional(isOptional);
workflowTask.setPermissive(isPermissive);
task.setWorkflowTask(workflowTask);
return task;
}

private Pair<WorkflowModel, TaskModel> createJoinWorkflow(
List<TaskModel> tasks, String... extraTaskRefNames) {
WorkflowModel workflow = new WorkflowModel();
var join = new TaskModel();
join.setReferenceTaskName("join");
var taskRefNames =
tasks.stream().map(TaskModel::getReferenceTaskName).collect(Collectors.toList());
taskRefNames.addAll(List.of(extraTaskRefNames));
join.getInputData().put("joinOn", taskRefNames);
workflow.getTasks().addAll(tasks);
workflow.getTasks().add(join);
return Pair.of(workflow, join);
}

@Test
public void testShouldNotMarkJoinAsCompletedWithErrorsWhenNotDone() {
var task1 = createTask("task1", TaskModel.Status.COMPLETED_WITH_ERRORS, true, false);

// task2 is not scheduled yet, so the join is not completed
var wfJoinPair = createJoinWorkflow(List.of(task1), "task2");

var join = new Join();
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertFalse(result);
}

@Test
public void testJoinCompletesSuccessfullyWhenAllTasksSucceed() {
var task1 = createTask("task1", TaskModel.Status.COMPLETED, false, false);
var task2 = createTask("task2", TaskModel.Status.COMPLETED, false, false);

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertTrue("Join task should execute successfully when all tasks succeed", result);
assertEquals(
"Join task status should be COMPLETED when all tasks succeed",
TaskModel.Status.COMPLETED,
wfJoinPair.getRight().getStatus());
}

@Test
public void testJoinWaitsWhenAnyTaskIsNotTerminal() {
var task1 = createTask("task1", TaskModel.Status.IN_PROGRESS, false, false);
var task2 = createTask("task2", TaskModel.Status.COMPLETED, false, false);

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertFalse("Join task should wait when any task is not in terminal state", result);
}

@Test
public void testJoinFailsWhenMandatoryTaskFails() {
// Mandatory task fails
var task1 = createTask("task1", TaskModel.Status.FAILED, false, false);
// Optional task completes with errors
var task2 = createTask("task2", TaskModel.Status.COMPLETED_WITH_ERRORS, true, false);

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertTrue("Join task should be executed when a mandatory task fails", result);
assertEquals(
"Join task status should be FAILED when a mandatory task fails",
TaskModel.Status.FAILED,
wfJoinPair.getRight().getStatus());
}

@Test
public void testJoinCompletesWithErrorsWhenOnlyOptionalTasksFail() {
// Mandatory task succeeds
var task1 = createTask("task1", TaskModel.Status.COMPLETED, false, false);
// Optional task completes with errors
var task2 = createTask("task2", TaskModel.Status.COMPLETED_WITH_ERRORS, true, false);

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertTrue("Join task should be executed when only optional tasks fail", result);
assertEquals(
"Join task status should be COMPLETED_WITH_ERRORS when only optional tasks fail",
TaskModel.Status.COMPLETED_WITH_ERRORS,
wfJoinPair.getRight().getStatus());
}

@Test
public void testJoinAggregatesFailureReasonsCorrectly() {
var task1 = createTask("task1", TaskModel.Status.FAILED, false, false);
task1.setReasonForIncompletion("Task1 failed");
var task2 = createTask("task2", TaskModel.Status.FAILED, false, false);
task2.setReasonForIncompletion("Task2 failed");

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertTrue("Join task should be executed when tasks fail", result);
assertEquals(
"Join task status should be FAILED when tasks fail",
TaskModel.Status.FAILED,
wfJoinPair.getRight().getStatus());
assertTrue(
"Join task reason for incompletion should aggregate failure reasons",
wfJoinPair.getRight().getReasonForIncompletion().contains("Task1 failed")
&& wfJoinPair
.getRight()
.getReasonForIncompletion()
.contains("Task2 failed"));
}

@Test
public void testJoinWaitsForAllTasksBeforeFailingDueToPermissiveTaskFailure() {
// Task 1 is a permissive task that fails.
var task1 = createTask("task1", TaskModel.Status.FAILED, false, true);
// Task 2 is a non-permissive task that eventually succeeds.
var task2 =
createTask(
"task2",
TaskModel.Status.IN_PROGRESS,
false,
false); // Initially not in a terminal state.

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

// First execution: Task 2 is not yet terminal.
var join = new Join();
boolean result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertFalse("Join task should wait as not all tasks are terminal", result);

// Simulate Task 2 reaching a terminal state.
task2.setStatus(TaskModel.Status.COMPLETED);

// Second execution: Now all tasks are terminal.
result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertTrue("Join task should proceed as now all tasks are terminal", result);
assertEquals(
"Join task should be marked as FAILED due to permissive task failure",
TaskModel.Status.FAILED,
wfJoinPair.getRight().getStatus());
}
}

0 comments on commit 0f649b5

Please sign in to comment.