Skip to content

Commit

Permalink
return after calling on pipeline terminal state hook
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Nov 14, 2023
1 parent ea82a2d commit 90e62c4
Showing 1 changed file with 3 additions and 10 deletions.
13 changes: 3 additions & 10 deletions src/engine/src/core/workflows/executors/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def start(self, ctx, threads):

if not validated:
self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=err)
return

# Get the first tasks
unstarted_threads = self._fetch_ready_tasks()
Expand All @@ -170,7 +171,7 @@ def start(self, ctx, threads):
self.state.ready_tasks += unstarted_threads
except Exception as e:
# Trigger the terminal state callback.
self._on_pipeline_terminal_state(event=PIPELINE_FAILED)
self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e))

@interceptable()
def _staging(self, ctx):
Expand Down Expand Up @@ -346,11 +347,8 @@ def _on_pipeline_terminal_state(self, event=None, message=""):
if event == None:
event = PIPELINE_FAILED if len(self.state.failed) > 0 else PIPELINE_COMPLETED

if event == PIPELINE_FAILED:
self.terminate()
self._deregister_all_executors()

msg = "COMPLETED"

if event == PIPELINE_FAILED: msg = "FAILED" + f" {message}"
elif event == PIPELINE_TERMINATED: msg = "TERMINATED" + f" {message}"

Expand Down Expand Up @@ -561,11 +559,6 @@ def _deregister_executor(self, run_uuid, task):
# TODO use server logger below
# self.state.ctx.logger.debug(self.t_str(task, "EXECUTOR DEREGISTERED"))

def _deregister_all_executors(self):
for key in self.state.executors:
self.state.executors[key].cleanup()
del self.state.executors[key]

@interceptable()
def _get_executor(self, run_uuid, task):
return self.state.executors[f"{run_uuid}.{task.id}"]
Expand Down

0 comments on commit 90e62c4

Please sign in to comment.