Skip to content

Commit

Permalink
Merge pull request #3 from ivakoleva/permissive-task-feature
Browse files Browse the repository at this point in the history
Permissive task capability
  • Loading branch information
v1r3n authored Dec 23, 2023
2 parents 88f6e18 + fdf5f5e commit e60b0e6
Show file tree
Hide file tree
Showing 14 changed files with 1,070 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ public void setTasks(List<WorkflowTask> tasks) {
@ProtoField(id = 28)
private String expression;

@ProtoField(id = 29)
private boolean permissive = false;

/**
* @return the name
*/
Expand Down Expand Up @@ -547,6 +550,22 @@ public void setExpression(String expression) {
this.expression = expression;
}

/**
* @return If the task is permissive. When set to true, and the task is in failed status,
* fail-fast does not occur. The workflow execution continues until reaching join or end of
* workflow, allowing idempotent execution of other tasks.
*/
public boolean isPermissive() {
return this.permissive;
}

/**
* @param permissive when set to true, the task is marked as permissive
*/
public void setPermissive(boolean permissive) {
this.permissive = permissive;
}

private Collection<List<WorkflowTask>> children() {
Collection<List<WorkflowTask>> workflowTaskLists = new LinkedList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
tasksToBeScheduled.put(retryTask.get().getReferenceTaskName(), retryTask.get());
executedTaskRefNames.remove(retryTask.get().getReferenceTaskName());
outcome.tasksToBeUpdated.add(pendingTask);
} else {
} else if (!(pendingTask.getWorkflowTask() != null
&& pendingTask.getWorkflowTask().isPermissive()
&& !pendingTask.getWorkflowTask().isOptional())) {
pendingTask.setStatus(COMPLETED_WITH_ERRORS);
}
}
Expand Down Expand Up @@ -254,6 +256,39 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
if (hasSuccessfulTerminateTask
|| (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow))) {
LOGGER.debug("Marking workflow: {} as complete.", workflow);
List<TaskModel> permissiveTasksTerminalNonSuccessful =
workflow.getTasks().stream()
.filter(t -> t.getWorkflowTask() != null)
.filter(t -> t.getWorkflowTask().isPermissive())
.filter(t -> !t.getWorkflowTask().isOptional())
.collect(
Collectors.toMap(
TaskModel::getReferenceTaskName,
t -> t,
(t1, t2) ->
t1.getRetryCount() > t2.getRetryCount()
? t1
: t2))
.values()
.stream()
.filter(
t ->
t.getStatus().isTerminal()
&& !t.getStatus().isSuccessful())
.toList();
if (!permissiveTasksTerminalNonSuccessful.isEmpty()) {
final String errMsg =
permissiveTasksTerminalNonSuccessful.stream()
.map(
t ->
String.format(
"Task %s failed with status: %s and reason: '%s'",
t.getTaskId(),
t.getStatus(),
t.getReasonForIncompletion()))
.collect(Collectors.joining(". "));
throw new TerminateWorkflowException(errMsg);
}
outcome.isComplete = true;
}

Expand Down Expand Up @@ -437,11 +472,6 @@ public boolean checkForWorkflowCompletion(final WorkflowModel workflow)
if (status == null || !status.isTerminal()) {
return false;
}
// if we reach here, the task has been completed.
// Was the task successful in completion?
if (!status.isSuccessful()) {
return false;
}
}

boolean noPendingSchedule =
Expand Down Expand Up @@ -529,7 +559,8 @@ Optional<TaskModel> retry(
if (!task.getStatus().isRetriable()
|| TaskType.isBuiltIn(task.getTaskType())
|| expectedRetryCount <= retryCount) {
if (workflowTask != null && workflowTask.isOptional()) {
if (workflowTask != null
&& (workflowTask.isOptional() || workflowTask.isPermissive())) {
return Optional.empty();
}
WorkflowModel.Status status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,18 @@ private void retry(WorkflowModel workflow) {
for (TaskModel task : workflow.getTasks()) {
switch (task.getStatus()) {
case FAILED:
if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString())
|| task.getTaskType()
.equalsIgnoreCase(TaskType.EXCLUSIVE_JOIN.toString())) {
@SuppressWarnings("unchecked")
List<String> joinOn = (List<String>) task.getInputData().get("joinOn");
boolean joinOnFailedPermissive = isJoinOnFailedPermissive(joinOn, workflow);
if (joinOnFailedPermissive) {
task.setStatus(IN_PROGRESS);
addTaskToQueue(task);
break;
}
}
case FAILED_WITH_TERMINAL_ERROR:
case TIMED_OUT:
retriableMap.put(task.getReferenceTaskName(), task);
Expand Down Expand Up @@ -1814,4 +1826,14 @@ private void expediteLazyWorkflowEvaluation(String workflowId) {

LOGGER.info("Pushed workflow {} to {} for expedited evaluation", workflowId, DECIDER_QUEUE);
}

private static boolean isJoinOnFailedPermissive(List<String> joinOn, WorkflowModel workflow) {
return joinOn.stream()
.map(workflow::getTaskByRefName)
.anyMatch(
t ->
t.getWorkflowTask().isPermissive()
&& !t.getWorkflowTask().isOptional()
&& t.getStatus().equals(FAILED));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,20 @@ public boolean execute(
}
taskStatus = exclusiveTask.getStatus();
foundExlusiveJoinOnTask = taskStatus.isTerminal();
hasFailures = !taskStatus.isSuccessful();
hasFailures =
!taskStatus.isSuccessful()
&& (!exclusiveTask.getWorkflowTask().isPermissive()
|| joinOn.stream()
.map(workflow::getTaskByRefName)
.allMatch(t -> t.getStatus().isTerminal()));
if (hasFailures) {
failureReason.append(exclusiveTask.getReasonForIncompletion()).append(" ");
final String failureReasons =
joinOn.stream()
.map(workflow::getTaskByRefName)
.filter(t -> !t.getStatus().isSuccessful())
.map(TaskModel::getReasonForIncompletion)
.collect(Collectors.joining(" "));
failureReason.append(failureReasons);
}

break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,21 @@ public boolean execute(
break;
}
TaskModel.Status taskStatus = forkedTask.getStatus();
hasFailures = !taskStatus.isSuccessful() && !forkedTask.getWorkflowTask().isOptional();
hasFailures =
!taskStatus.isSuccessful()
&& !forkedTask.getWorkflowTask().isOptional()
&& (!forkedTask.getWorkflowTask().isPermissive()
|| joinOn.stream()
.map(workflow::getTaskByRefName)
.allMatch(t -> t.getStatus().isTerminal()));
if (hasFailures) {
failureReason.append(forkedTask.getReasonForIncompletion()).append(" ");
final String failureReasons =
joinOn.stream()
.map(workflow::getTaskByRefName)
.filter(t -> !t.getStatus().isSuccessful())
.map(TaskModel::getReasonForIncompletion)
.collect(Collectors.joining(" "));
failureReason.append(failureReasons);
}
// Only add to task output if it's not empty
if (!forkedTask.getOutputData().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,79 @@ public void testOptional() {
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
}

@Test
public void testPermissive() {
WorkflowDef def = new WorkflowDef();
def.setName("test-permissive");

WorkflowTask task1 = new WorkflowTask();
task1.setName("task0");
task1.setPermissive(true);
task1.setTaskReferenceName("t0");
task1.getInputParameters().put("taskId", "${CPEWF_TASK_ID}");
task1.setTaskDefinition(new TaskDef("task0"));

WorkflowTask task2 = new WorkflowTask();
task2.setName("task1");
task2.setPermissive(true);
task2.setTaskReferenceName("t1");
task2.setTaskDefinition(new TaskDef("task1"));

def.getTasks().add(task1);
def.getTasks().add(task2);
def.setSchemaVersion(2);

WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowDefinition(def);
workflow.setCreateTime(System.currentTimeMillis());
DeciderOutcome outcome = deciderService.decide(workflow);
assertNotNull(outcome);
assertEquals(1, outcome.tasksToBeScheduled.size());
assertEquals(
task1.getTaskReferenceName(),
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());

for (int i = 0; i < 3; i++) {
String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId();
assertEquals(task1Id, outcome.tasksToBeScheduled.get(0).getInputData().get("taskId"));

workflow.getTasks().clear();
workflow.getTasks().addAll(outcome.tasksToBeScheduled);
workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED);

outcome = deciderService.decide(workflow);

assertNotNull(outcome);
assertEquals(1, outcome.tasksToBeUpdated.size());
assertEquals(1, outcome.tasksToBeScheduled.size());

assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus());
assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId());
assertEquals(
task1.getTaskReferenceName(),
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
assertEquals(i + 1, outcome.tasksToBeScheduled.get(0).getRetryCount());
}

String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId();

workflow.getTasks().clear();
workflow.getTasks().addAll(outcome.tasksToBeScheduled);
workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED);

outcome = deciderService.decide(workflow);

assertNotNull(outcome);
assertEquals(1, outcome.tasksToBeUpdated.size());
assertEquals(1, outcome.tasksToBeScheduled.size());

assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus());
assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId());
assertEquals(
task2.getTaskReferenceName(),
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
}

@Test
public void testOptionalWithDynamicFork() {
WorkflowDef def = new WorkflowDef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1351,6 +1351,7 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) {
if (from.getExpression() != null) {
to.setExpression( from.getExpression() );
}
to.setPermissive( from.isPermissive() );
return to.build();
}

Expand Down Expand Up @@ -1396,6 +1397,7 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) {
to.setRetryCount( from.getRetryCount() );
to.setEvaluatorType( from.getEvaluatorType() );
to.setExpression( from.getExpression() );
to.setPermissive( from.getPermissive() );
return to;
}

Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/proto/model/workflowtask.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ message WorkflowTask {
int32 retry_count = 26;
string evaluator_type = 27;
string expression = 28;
bool permissive = 29;
}
Loading

0 comments on commit e60b0e6

Please sign in to comment.