diff --git a/src/engine/src/core/tasks/TaskInputFileStagingService.py b/src/engine/src/core/tasks/TaskInputFileStagingService.py index 136c6592..21cf0763 100644 --- a/src/engine/src/core/tasks/TaskInputFileStagingService.py +++ b/src/engine/src/core/tasks/TaskInputFileStagingService.py @@ -41,7 +41,7 @@ def stage(self, task: Task): ) except Exception as e: if input_.required: - raise TaskInputStagingError(f"No output found for task '{value_from[key].task_id}' with output id of '{value_from[key].output_id}' | {e}") + raise TaskInputStagingError(f"No output found for task '{value_from[key].task_id}' with output id of '{value_from[key].output_id}'") if key == "args": try: value = self._value_from_service.get_arg_value_by_key( @@ -49,7 +49,7 @@ def stage(self, task: Task): ) except Exception as e: if input_.required: - raise TaskInputStagingError(f"Error attempting to fetch value from args at key '{value_from[key]}' | {e}") + raise TaskInputStagingError(f"Error attempting to fetch value from args at key '{value_from[key]}'") if key == "env": try: value = self._value_from_service.get_env_value_by_key( @@ -57,7 +57,7 @@ def stage(self, task: Task): ) except Exception as e: if input_.required: - raise TaskInputStagingError(f"Error attempting to fetch value from env at key '{value_from[key]}' | {e}") + raise TaskInputStagingError(f"Error attempting to fetch value from env at key '{value_from[key]}'") self._create_input_(task, input_id, value) diff --git a/src/engine/src/core/workflows/WorkflowExecutor.py b/src/engine/src/core/workflows/WorkflowExecutor.py index f64c5f15..a3390729 100644 --- a/src/engine/src/core/workflows/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/WorkflowExecutor.py @@ -332,13 +332,13 @@ def _start_task(self, task): try: task_input_file_staging_service.stage(task) except TaskInputStagingError as e: - self.state.ctx.logger.info(self.t_str(task, "FAILED")) - self.publish(Event(TASK_FAILED, self.state.ctx, task=task)) # Get the next queued tasks if any unstarted_threads = self._on_task_terminal_state( task, TaskResult(1, errors=[str(e)]) ) + self.state.ctx.logger.info(self.t_str(task, "FAILED")) + self.publish(Event(TASK_FAILED, self.state.ctx, task=task)) # NOTE Triggers hook _on_change_ready_task self.state.ready_tasks += unstarted_threads