Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Nov 16, 2023
1 parent 3264e56 commit 4600c37
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/engine/src/core/workflows/executors/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def _staging(self, ctx):
except InvalidDependenciesError as e:
self._on_pipeline_terminal_state(PIPELINE_FAILED, message=str(e))
except Exception as e:
print("ERROR HERE", e)
self._on_pipeline_terminal_state(PIPELINE_FAILED, message=str(e))

@interceptable()
Expand Down Expand Up @@ -406,6 +407,7 @@ def _get_initial_tasks(self, tasks):

@interceptable()
def _set_tasks(self, tasks):
print("START SET TASKS")
# Create a list of the ids of the tasks
task_ids = [task.id for task in tasks]

Expand Down Expand Up @@ -437,18 +439,21 @@ def _set_tasks(self, tasks):
# The first is a mapping between each task and the tasks that depend on them,
# and the second is a mapping between a task and tasks it depends on.
# Suboptimal? Yes, Space complexity is ~O(n^2), but makes for easy lookups
print("START CREATE DEPENDENCY MAP")
graph_scaffold = {task.id: [] for task in self.state.tasks}
self.state.dependency_graph = graph_scaffold
self.state.reverse_dependency_graph = graph_scaffold
for child_task in self.state.tasks:
for parent_task in child_task.depends_on:
self.state.dependency_graph[parent_task.id].append(child_task.id)
self.state.reverse_dependency_graph[child_task.id].append(parent_task.id)
print("END CREATE DEPENDENCY MAP")

# Determine if a task can fail and set the tasks' can_fail flags.
# A parent task is permitted to fail iff all of the following criteria are met:
# - It has children
# - All can_fail flags for a given parent task's children's task_dependency object == True
print("BEGIN SET CAN FAIL")
for task in self.state.tasks:
can_fail_flags = []
for child_task_id in self.state.reverse_dependency_graph[task.id]:
Expand All @@ -460,7 +465,9 @@ def _set_tasks(self, tasks):
# If the length of can_fail_flags == 0, then this task has no child tasks
task.can_fail = False if len(can_fail_flags) == 0 else all(can_fail_flags)

print("END SET CAN_FAIL")
# Detect loops in the graph
print("START CYCLE DETECTION")
try:
initial_tasks = self._get_initial_tasks(self.state.tasks)
graph_validator = GraphValidator()
Expand All @@ -470,6 +477,7 @@ def _set_tasks(self, tasks):
InvalidDependenciesError, CycleDetectedError, MissingInitialTasksError
) as e:
raise e
print("END CYCLE DETECTION")

# Add all tasks to the queue
self.state.queue = [ task for task in self.state.tasks ]
Expand Down

0 comments on commit 4600c37

Please sign in to comment.