diff --git a/src/engine/src/core/workflows/executors/WorkflowExecutor.py b/src/engine/src/core/workflows/executors/WorkflowExecutor.py index e8fa65be..fa6a882b 100644 --- a/src/engine/src/core/workflows/executors/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/executors/WorkflowExecutor.py @@ -148,6 +148,15 @@ def start(self, ctx, threads): # middleware(notification and archivers), queue the tasks self._staging(ctx) + # Validate the submission args against the pipeline's parameters + (validated, err) = params_validator( + self.state.ctx.pipeline.params, + self.state.ctx.args + ) + + if not validated: + self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=err) + # Get the first tasks unstarted_threads = self._fetch_ready_tasks() @@ -173,22 +182,7 @@ def _staging(self, ctx): self._set_context(ctx) # Prepare the file system for this pipeline and handle pipeline templating - print("BEFORE PREPARE PIPELINE") self._prepare_pipeline() - print("AFTER PREPARE PIPELINE") - - # Validate the pipeline submission args against the pipeline's parameters - print("BEFORE VALIDATION") - (validated, err) = params_validator( - self.state.ctx.pipeline.params, - self.state.ctx.args - ) - print("AFTER VALIDATION") - - if not validated: - print("NOT VALIDATED") - self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=err) - print("AFTER NOT VALIDATIED ON PIPELINE TERMINAL STATE") # Publish the PIPELINE_STAGING event # NOTE Events can only be published AFTER the '_prepare_pipeline' method is called