diff --git a/src/engine/src/core/workflows/executors/WorkflowExecutor.py b/src/engine/src/core/workflows/executors/WorkflowExecutor.py index fa6a882b..e8fd72a9 100644 --- a/src/engine/src/core/workflows/executors/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/executors/WorkflowExecutor.py @@ -345,6 +345,9 @@ def _on_pipeline_terminal_state(self, event=None, message=""): # of failed tasks if event == None: event = PIPELINE_FAILED if len(self.state.failed) > 0 else PIPELINE_COMPLETED + + if event == PIPELINE_FAILED: + self._deregister_all_executors() msg = "COMPLETED" if event == PIPELINE_FAILED: msg = "FAILED" + f" {message}" @@ -557,6 +560,11 @@ def _deregister_executor(self, run_uuid, task): # TODO use server logger below # self.state.ctx.logger.debug(self.t_str(task, "EXECUTOR DEREGISTERED")) + def _deregister_all_executors(self): + for key in self.state.executors: + self.state.executors[key].cleanup() + del self.state.executors[key] + @interceptable() def _get_executor(self, run_uuid, task): return self.state.executors[f"{run_uuid}.{task.id}"]