From 8cc64652e10ee69d724f767507dfedeec6364d25 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Thu, 10 Oct 2024 13:13:18 -0500 Subject: [PATCH] Add execption logging to workflow executor --- src/engine/src/workflows/WorkflowExecutor.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/engine/src/workflows/WorkflowExecutor.py b/src/engine/src/workflows/WorkflowExecutor.py index 892b556c..714a1277 100644 --- a/src/engine/src/workflows/WorkflowExecutor.py +++ b/src/engine/src/workflows/WorkflowExecutor.py @@ -176,6 +176,7 @@ def start(self, ctx, threads): # NOTE Triggers the hook _on_change_ready_task self.state.ready_tasks += unstarted_threads except Exception as e: + server_logger.exception(e.__cause__) # Trigger the terminal state callback. self._handle_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e)) @@ -216,8 +217,10 @@ def _staging(self, ctx): try: self._set_tasks(self.state.ctx.pipeline.tasks) except InvalidDependenciesError as e: + server_logger.exception(e.__cause__) self._handle_pipeline_terminal_state(PIPELINE_FAILED, message=str(e)) except Exception as e: + server_logger.exception(e.__cause__) self._handle_pipeline_terminal_state(PIPELINE_FAILED, message=str(e)) @interruptable() @@ -268,6 +271,7 @@ def _prepare_tasks(self): try: task = template_mapper.map(task, task.uses) except Exception as e: + server_logger.exception(e.__cause__) # Trigger the terminal state callback. self._handle_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e)) @@ -320,6 +324,7 @@ def _start_task(self, task): evaluator = self.container.load("ConditionalExpressionEvaluator") skip = not evaluator.evaluate_all(task.conditions) except ConditionalExpressionEvalError as e: + server_logger.exception(e.__cause__) expression_error = True task_result = TaskResult(1, errors=[str(e)]) @@ -337,6 +342,7 @@ def _start_task(self, task): TaskResult(1, errors=[str(e)]) ) self.state.ctx.logger.info(self.t_log(task, "FAILED")) + server_logger.exception(e.__cause__) self.publish(Event(TASK_FAILED, self.state.ctx, task=task)) # NOTE Triggers hook _on_change_ready_task @@ -366,9 +372,11 @@ def _start_task(self, task): } except InvalidTaskTypeError as e: self.state.ctx.logger.error(str(e)) + server_logger.exception(e.__cause__) task_result = TaskResult(1, errors=[str(e)]) except Exception as e: self.state.ctx.logger.error(str(e)) + server_logger.exception(e.__cause__) task_result = TaskResult(1, errors=[str(e)]) # Get the next queued tasks if any @@ -548,6 +556,7 @@ def _set_tasks(self, tasks): else all(parent_can_fail_flags) ) except Exception as e: + server_logger.exception(e.__cause__) raise Exception(f"Error resolving can_fail flag for parent task '{parent_task_id}': {e}") # Detect loops in the graph @@ -559,6 +568,7 @@ def _set_tasks(self, tasks): except ( InvalidDependenciesError, CycleDetectedError, MissingInitialTasksError ) as e: + server_logger.exception(e.__cause__) raise e # Add all tasks to the queue @@ -752,6 +762,7 @@ def _initialize_notification_handlers(self): try: handler = middleware.handler(self.state.ctx) except Exception as e: + server_logger.exception(e.__cause__) self.state.ctx.logger.error(f"Could not intialize notification middleware. Updates about the pipeline and its task will not be persisited. Error: {str(e)}") return @@ -802,6 +813,7 @@ def _initialize_archivers(self): try: handler = middleware.handler() except Exception as e: + server_logger.exception(e.__cause__) self.state.ctx.logger.error(f"Could not intialize archive middleware. Pipeline results will not be persisted. Error: {str(e)}") return