From c679dcf6f9655bdb404aceac450a182fb69e5ba0 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Mon, 19 Feb 2024 14:49:12 -0600 Subject: [PATCH] Fix fail on task staging error. Remove details about file location from error --- src/engine/src/core/tasks/TaskInputFileStagingService.py | 6 +++--- src/engine/src/core/workflows/WorkflowExecutor.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/engine/src/core/tasks/TaskInputFileStagingService.py b/src/engine/src/core/tasks/TaskInputFileStagingService.py index 136c6592..21cf0763 100644 --- a/src/engine/src/core/tasks/TaskInputFileStagingService.py +++ b/src/engine/src/core/tasks/TaskInputFileStagingService.py @@ -41,7 +41,7 @@ def stage(self, task: Task): ) except Exception as e: if input_.required: - raise TaskInputStagingError(f"No output found for task '{value_from[key].task_id}' with output id of '{value_from[key].output_id}' | {e}") + raise TaskInputStagingError(f"No output found for task '{value_from[key].task_id}' with output id of '{value_from[key].output_id}'") if key == "args": try: value = self._value_from_service.get_arg_value_by_key( @@ -49,7 +49,7 @@ def stage(self, task: Task): ) except Exception as e: if input_.required: - raise TaskInputStagingError(f"Error attempting to fetch value from args at key '{value_from[key]}' | {e}") + raise TaskInputStagingError(f"Error attempting to fetch value from args at key '{value_from[key]}'") if key == "env": try: value = self._value_from_service.get_env_value_by_key( @@ -57,7 +57,7 @@ def stage(self, task: Task): ) except Exception as e: if input_.required: - raise TaskInputStagingError(f"Error attempting to fetch value from env at key '{value_from[key]}' | {e}") + raise TaskInputStagingError(f"Error attempting to fetch value from env at key '{value_from[key]}'") self._create_input_(task, input_id, value) diff --git a/src/engine/src/core/workflows/WorkflowExecutor.py b/src/engine/src/core/workflows/WorkflowExecutor.py index f64c5f15..a3390729 100644 --- a/src/engine/src/core/workflows/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/WorkflowExecutor.py @@ -332,13 +332,13 @@ def _start_task(self, task): try: task_input_file_staging_service.stage(task) except TaskInputStagingError as e: - self.state.ctx.logger.info(self.t_str(task, "FAILED")) - self.publish(Event(TASK_FAILED, self.state.ctx, task=task)) # Get the next queued tasks if any unstarted_threads = self._on_task_terminal_state( task, TaskResult(1, errors=[str(e)]) ) + self.state.ctx.logger.info(self.t_str(task, "FAILED")) + self.publish(Event(TASK_FAILED, self.state.ctx, task=task)) # NOTE Triggers hook _on_change_ready_task self.state.ready_tasks += unstarted_threads