Skip to content

Commit

Permalink
Add execption logging to workflow executor
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Oct 10, 2024
1 parent 5676957 commit 8cc6465
Showing 1 changed file with 12 additions and 0 deletions.
12 changes: 12 additions & 0 deletions src/engine/src/workflows/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def start(self, ctx, threads):
# NOTE Triggers the hook _on_change_ready_task
self.state.ready_tasks += unstarted_threads
except Exception as e:
server_logger.exception(e.__cause__)
# Trigger the terminal state callback.
self._handle_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e))

Expand Down Expand Up @@ -216,8 +217,10 @@ def _staging(self, ctx):
try:
self._set_tasks(self.state.ctx.pipeline.tasks)
except InvalidDependenciesError as e:
server_logger.exception(e.__cause__)
self._handle_pipeline_terminal_state(PIPELINE_FAILED, message=str(e))
except Exception as e:
server_logger.exception(e.__cause__)
self._handle_pipeline_terminal_state(PIPELINE_FAILED, message=str(e))

@interruptable()
Expand Down Expand Up @@ -268,6 +271,7 @@ def _prepare_tasks(self):
try:
task = template_mapper.map(task, task.uses)
except Exception as e:
server_logger.exception(e.__cause__)
# Trigger the terminal state callback.
self._handle_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e))

Expand Down Expand Up @@ -320,6 +324,7 @@ def _start_task(self, task):
evaluator = self.container.load("ConditionalExpressionEvaluator")
skip = not evaluator.evaluate_all(task.conditions)
except ConditionalExpressionEvalError as e:
server_logger.exception(e.__cause__)
expression_error = True
task_result = TaskResult(1, errors=[str(e)])

Expand All @@ -337,6 +342,7 @@ def _start_task(self, task):
TaskResult(1, errors=[str(e)])
)
self.state.ctx.logger.info(self.t_log(task, "FAILED"))
server_logger.exception(e.__cause__)
self.publish(Event(TASK_FAILED, self.state.ctx, task=task))

# NOTE Triggers hook _on_change_ready_task
Expand Down Expand Up @@ -366,9 +372,11 @@ def _start_task(self, task):
}
except InvalidTaskTypeError as e:
self.state.ctx.logger.error(str(e))
server_logger.exception(e.__cause__)
task_result = TaskResult(1, errors=[str(e)])
except Exception as e:
self.state.ctx.logger.error(str(e))
server_logger.exception(e.__cause__)
task_result = TaskResult(1, errors=[str(e)])

# Get the next queued tasks if any
Expand Down Expand Up @@ -548,6 +556,7 @@ def _set_tasks(self, tasks):
else all(parent_can_fail_flags)
)
except Exception as e:
server_logger.exception(e.__cause__)
raise Exception(f"Error resolving can_fail flag for parent task '{parent_task_id}': {e}")

# Detect loops in the graph
Expand All @@ -559,6 +568,7 @@ def _set_tasks(self, tasks):
except (
InvalidDependenciesError, CycleDetectedError, MissingInitialTasksError
) as e:
server_logger.exception(e.__cause__)
raise e

# Add all tasks to the queue
Expand Down Expand Up @@ -752,6 +762,7 @@ def _initialize_notification_handlers(self):
try:
handler = middleware.handler(self.state.ctx)
except Exception as e:
server_logger.exception(e.__cause__)
self.state.ctx.logger.error(f"Could not intialize notification middleware. Updates about the pipeline and its task will not be persisited. Error: {str(e)}")
return

Expand Down Expand Up @@ -802,6 +813,7 @@ def _initialize_archivers(self):
try:
handler = middleware.handler()
except Exception as e:
server_logger.exception(e.__cause__)
self.state.ctx.logger.error(f"Could not intialize archive middleware. Pipeline results will not be persisted. Error: {str(e)}")
return

Expand Down

0 comments on commit 8cc6465

Please sign in to comment.