diff --git a/src/engine/src/core/workflows/executors/WorkflowExecutor.py b/src/engine/src/core/workflows/executors/WorkflowExecutor.py index baf2a6e7..e2d3604d 100644 --- a/src/engine/src/core/workflows/executors/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/executors/WorkflowExecutor.py @@ -213,6 +213,7 @@ def _staging(self, ctx): except InvalidDependenciesError as e: self._on_pipeline_terminal_state(PIPELINE_FAILED, message=str(e)) except Exception as e: + print("ERROR HERE", e) self._on_pipeline_terminal_state(PIPELINE_FAILED, message=str(e)) @interceptable() @@ -406,6 +407,7 @@ def _get_initial_tasks(self, tasks): @interceptable() def _set_tasks(self, tasks): + print("START SET TASKS") # Create a list of the ids of the tasks task_ids = [task.id for task in tasks] @@ -437,6 +439,7 @@ def _set_tasks(self, tasks): # The first is a mapping between each task and the tasks that depend on them, # and the second is a mapping between a task and tasks it depends on. # Suboptimal? Yes, Space complexity is ~O(n^2), but makes for easy lookups + print("START CREATE DEPENDENCY MAP") graph_scaffold = {task.id: [] for task in self.state.tasks} self.state.dependency_graph = graph_scaffold self.state.reverse_dependency_graph = graph_scaffold @@ -444,11 +447,13 @@ def _set_tasks(self, tasks): for parent_task in child_task.depends_on: self.state.dependency_graph[parent_task.id].append(child_task.id) self.state.reverse_dependency_graph[child_task.id].append(parent_task.id) + print("END CREATE DEPENDENCY MAP") # Determine if a task can fail and set the tasks' can_fail flags. # A parent task is permitted to fail iff all of the following criteria are met: # - It has children # - All can_fail flags for a given parent task's children's task_dependency object == True + print("BEGIN SET CAN FAIL") for task in self.state.tasks: can_fail_flags = [] for child_task_id in self.state.reverse_dependency_graph[task.id]: @@ -460,7 +465,9 @@ def _set_tasks(self, tasks): # If the length of can_fail_flags == 0, then this task has no child tasks task.can_fail = False if len(can_fail_flags) == 0 else all(can_fail_flags) + print("END SET CAN_FAIL") # Detect loops in the graph + print("START CYCLE DETECTION") try: initial_tasks = self._get_initial_tasks(self.state.tasks) graph_validator = GraphValidator() @@ -470,6 +477,7 @@ def _set_tasks(self, tasks): InvalidDependenciesError, CycleDetectedError, MissingInitialTasksError ) as e: raise e + print("END CYCLE DETECTION") # Add all tasks to the queue self.state.queue = [ task for task in self.state.tasks ]