diff --git a/src/engine/src/core/workflows/executors/WorkflowExecutor.py b/src/engine/src/core/workflows/executors/WorkflowExecutor.py index 717acb5f..6975351f 100644 --- a/src/engine/src/core/workflows/executors/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/executors/WorkflowExecutor.py @@ -330,11 +330,12 @@ def _on_task_terminal_state(self, task, task_result): self._deregister_executor(self.state.ctx.pipeline_run.uuid, task) # Run the on_pipeline_terminal_state callback if all tasks are complete. - if ( - len(self.state.tasks) == len(self.state.finished) - or task_result.status != 0 - ): - self._on_pipeline_terminal_state() + if len(self.state.tasks) == len(self.state.finished): + self._on_pipeline_terminal_state(event=PIPELINE_COMPLETED) + return [] + + if task_result.status != 0 and task.can_fail == False: + self._on_pipeline_terminal_state(event=PIPELINE_FAILED) return [] # Execute all possible queued tasks @@ -429,10 +430,15 @@ def _set_tasks(self, tasks): # Build a mapping between each task and the tasks that depend on them. # Doing this here saves us from having to perform the dependency # look-ups when queueing tasks, improving performance - self.state.dependency_graph = {task.id: [] for task in self.state.tasks} + # self.state.dependency_graph = {task.id: [] for task in self.state.tasks} + self.state.dependency_graph = {} for task in self.state.tasks: + self.state.dependency_graph = {task.id: []} + task.can_fail = True if len(task.depends_on) > 0 else False for parent_task in task.depends_on: + if parent_task.can_fail == False: + task.can_fail == False self.state.dependency_graph[parent_task.id].append(task.id) # Detect loops in the graph @@ -532,8 +538,10 @@ def _task_is_ready(self, task): # All tasks without dependencies are ready immediately if len(task.depends_on) == 0: return True + # We check the failed tasks list because some tasks are permitted to + # fail if all of their dependencies specify can_fail = True for dep in task.depends_on: - if dep.id not in self.state.finished: + if dep.id not in self.state.finished or dep.id not in self.state.failed: return False return True