Skip to content

Commit

Permalink
add logic for continuing the pipeline execution when failable tasks fail
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Nov 14, 2023
1 parent 90e62c4 commit ac01ffb
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions src/engine/src/core/workflows/executors/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,12 @@ def _on_task_terminal_state(self, task, task_result):
self._deregister_executor(self.state.ctx.pipeline_run.uuid, task)

# Run the on_pipeline_terminal_state callback if all tasks are complete.
if (
len(self.state.tasks) == len(self.state.finished)
or task_result.status != 0
):
self._on_pipeline_terminal_state()
if len(self.state.tasks) == len(self.state.finished):
self._on_pipeline_terminal_state(event=PIPELINE_COMPLETED)
return []

if task_result.status != 0 and task.can_fail == False:
self._on_pipeline_terminal_state(event=PIPELINE_FAILED)
return []

# Execute all possible queued tasks
Expand Down Expand Up @@ -429,10 +430,15 @@ def _set_tasks(self, tasks):
# Build a mapping between each task and the tasks that depend on them.
# Doing this here saves us from having to perform the dependency
# look-ups when queueing tasks, improving performance
self.state.dependency_graph = {task.id: [] for task in self.state.tasks}
# self.state.dependency_graph = {task.id: [] for task in self.state.tasks}
self.state.dependency_graph = {}

for task in self.state.tasks:
self.state.dependency_graph = {task.id: []}
task.can_fail = True if len(task.depends_on) > 0 else False
for parent_task in task.depends_on:
if parent_task.can_fail == False:
task.can_fail == False
self.state.dependency_graph[parent_task.id].append(task.id)

# Detect loops in the graph
Expand Down Expand Up @@ -532,8 +538,10 @@ def _task_is_ready(self, task):
# All tasks without dependencies are ready immediately
if len(task.depends_on) == 0: return True

# We check the failed tasks list because some tasks are permitted to
# fail if all of their dependencies specify can_fail = True
for dep in task.depends_on:
if dep.id not in self.state.finished:
if dep.id not in self.state.finished or dep.id not in self.state.failed:
return False

return True
Expand Down

0 comments on commit ac01ffb

Please sign in to comment.