Skip to content

Commit

Permalink
Fix fail on task staging error. Remove details about file location fr…
Browse files Browse the repository at this point in the history
…om error
  • Loading branch information
nathandf committed Feb 19, 2024
1 parent 6d339c3 commit c679dcf
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
6 changes: 3 additions & 3 deletions src/engine/src/core/tasks/TaskInputFileStagingService.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,23 @@ def stage(self, task: Task):
)
except Exception as e:
if input_.required:
raise TaskInputStagingError(f"No output found for task '{value_from[key].task_id}' with output id of '{value_from[key].output_id}' | {e}")
raise TaskInputStagingError(f"No output found for task '{value_from[key].task_id}' with output id of '{value_from[key].output_id}'")
if key == "args":
try:
value = self._value_from_service.get_arg_value_by_key(
value_from[key]
)
except Exception as e:
if input_.required:
raise TaskInputStagingError(f"Error attempting to fetch value from args at key '{value_from[key]}' | {e}")
raise TaskInputStagingError(f"Error attempting to fetch value from args at key '{value_from[key]}'")
if key == "env":
try:
value = self._value_from_service.get_env_value_by_key(
value_from[key]
)
except Exception as e:
if input_.required:
raise TaskInputStagingError(f"Error attempting to fetch value from env at key '{value_from[key]}' | {e}")
raise TaskInputStagingError(f"Error attempting to fetch value from env at key '{value_from[key]}'")

self._create_input_(task, input_id, value)

Expand Down
4 changes: 2 additions & 2 deletions src/engine/src/core/workflows/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,13 @@ def _start_task(self, task):
try:
task_input_file_staging_service.stage(task)
except TaskInputStagingError as e:
self.state.ctx.logger.info(self.t_str(task, "FAILED"))
self.publish(Event(TASK_FAILED, self.state.ctx, task=task))
# Get the next queued tasks if any
unstarted_threads = self._on_task_terminal_state(
task,
TaskResult(1, errors=[str(e)])
)
self.state.ctx.logger.info(self.t_str(task, "FAILED"))
self.publish(Event(TASK_FAILED, self.state.ctx, task=task))

# NOTE Triggers hook _on_change_ready_task
self.state.ready_tasks += unstarted_threads
Expand Down

0 comments on commit c679dcf

Please sign in to comment.