Skip to content

Commit

Permalink
move pipeline validation logic after staging step
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Nov 14, 2023
1 parent 36519e2 commit 2dc76a9
Showing 1 changed file with 9 additions and 15 deletions.
24 changes: 9 additions & 15 deletions src/engine/src/core/workflows/executors/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ def start(self, ctx, threads):
# middleware(notification and archivers), queue the tasks
self._staging(ctx)

# Validate the submission args against the pipeline's parameters
(validated, err) = params_validator(
self.state.ctx.pipeline.params,
self.state.ctx.args
)

if not validated:
self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=err)

# Get the first tasks
unstarted_threads = self._fetch_ready_tasks()

Expand All @@ -173,22 +182,7 @@ def _staging(self, ctx):
self._set_context(ctx)

# Prepare the file system for this pipeline and handle pipeline templating
print("BEFORE PREPARE PIPELINE")
self._prepare_pipeline()
print("AFTER PREPARE PIPELINE")

# Validate the pipeline submission args against the pipeline's parameters
print("BEFORE VALIDATION")
(validated, err) = params_validator(
self.state.ctx.pipeline.params,
self.state.ctx.args
)
print("AFTER VALIDATION")

if not validated:
print("NOT VALIDATED")
self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=err)
print("AFTER NOT VALIDATIED ON PIPELINE TERMINAL STATE")

# Publish the PIPELINE_STAGING event
# NOTE Events can only be published AFTER the '_prepare_pipeline' method is called
Expand Down

0 comments on commit 2dc76a9

Please sign in to comment.