diff --git a/src/engine/src/core/workflows/WorkflowExecutor.py b/src/engine/src/core/workflows/WorkflowExecutor.py index 09e4a9eb..fa38bb58 100644 --- a/src/engine/src/core/workflows/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/WorkflowExecutor.py @@ -26,7 +26,8 @@ MissingInitialTasksError, InvalidDependenciesError, CycleDetectedError, - ConditionalExpressionEvalError + ConditionalExpressionEvalError, + TaskInputStagingError ) from core.middleware.archivers import S3Archiver, IRODSArchiver from conf.constants import BASE_WORK_DIR @@ -328,7 +329,17 @@ def _start_task(self, task): task_input_file_staging_service = self.container.load( "TaskInputFileStagingService" ) - task_input_file_staging_service.stage(task) + try: + task_input_file_staging_service.stage(task) + except TaskInputStagingError as e: + self.state.ctx.logger.info(self.t_str(task, "FAILED")) + self.publish(Event(TASK_FAILED, self.state.ctx, task=task)) + # Get the next queued tasks if any + unstarted_threads = self._on_task_terminal_state(task, task_result) + + # NOTE Triggers hook _on_change_ready_task + self.state.ready_tasks += unstarted_threads + return # Log the task status self.state.ctx.logger.info(self.t_str(task, "ACTIVE"))