diff --git a/src/engine/src/core/workflows/executors/WorkflowExecutor.py b/src/engine/src/core/workflows/executors/WorkflowExecutor.py index 6e3c01c6..717acb5f 100644 --- a/src/engine/src/core/workflows/executors/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/executors/WorkflowExecutor.py @@ -156,6 +156,7 @@ def start(self, ctx, threads): if not validated: self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=err) + return # Get the first tasks unstarted_threads = self._fetch_ready_tasks() @@ -170,7 +171,7 @@ def start(self, ctx, threads): self.state.ready_tasks += unstarted_threads except Exception as e: # Trigger the terminal state callback. - self._on_pipeline_terminal_state(event=PIPELINE_FAILED) + self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e)) @interceptable() def _staging(self, ctx): @@ -346,11 +347,8 @@ def _on_pipeline_terminal_state(self, event=None, message=""): if event == None: event = PIPELINE_FAILED if len(self.state.failed) > 0 else PIPELINE_COMPLETED - if event == PIPELINE_FAILED: - self.terminate() - self._deregister_all_executors() - msg = "COMPLETED" + if event == PIPELINE_FAILED: msg = "FAILED" + f" {message}" elif event == PIPELINE_TERMINATED: msg = "TERMINATED" + f" {message}" @@ -561,11 +559,6 @@ def _deregister_executor(self, run_uuid, task): # TODO use server logger below # self.state.ctx.logger.debug(self.t_str(task, "EXECUTOR DEREGISTERED")) - def _deregister_all_executors(self): - for key in self.state.executors: - self.state.executors[key].cleanup() - del self.state.executors[key] - @interceptable() def _get_executor(self, run_uuid, task): return self.state.executors[f"{run_uuid}.{task.id}"]