Skip to content

Commit

Permalink
dereg all executors on pipeline fail
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Nov 14, 2023
1 parent 2dc76a9 commit 9097280
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/engine/src/core/workflows/executors/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ def _on_pipeline_terminal_state(self, event=None, message=""):
# of failed tasks
if event == None:
event = PIPELINE_FAILED if len(self.state.failed) > 0 else PIPELINE_COMPLETED

if event == PIPELINE_FAILED:
self._deregister_all_executors()

msg = "COMPLETED"
if event == PIPELINE_FAILED: msg = "FAILED" + f" {message}"
Expand Down Expand Up @@ -557,6 +560,11 @@ def _deregister_executor(self, run_uuid, task):
# TODO use server logger below
# self.state.ctx.logger.debug(self.t_str(task, "EXECUTOR DEREGISTERED"))

def _deregister_all_executors(self):
for key in self.state.executors:
self.state.executors[key].cleanup()
del self.state.executors[key]

@interceptable()
def _get_executor(self, run_uuid, task):
return self.state.executors[f"{run_uuid}.{task.id}"]
Expand Down

0 comments on commit 9097280

Please sign in to comment.