Skip to content

Commit

Permalink
polishing
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Mar 3, 2024
1 parent e41f5a5 commit 98ab96f
Showing 1 changed file with 44 additions and 41 deletions.
85 changes: 44 additions & 41 deletions examples/wdlviz.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,55 +88,63 @@ def wdlviz(
Project the workflow's built-in dependency graph onto a graphviz representation
"""
# References:
# 1. WDL object model -- https://miniwdl.readthedocs.io/en/latest/WDL.html#module-WDL.Tree
# 1. WDL AST object model -- https://miniwdl.readthedocs.io/en/latest/WDL.html#module-WDL.Tree
# 2. graphviz API -- https://graphviz.readthedocs.io/en/stable/manual.html

# initialiaze Digraph
fontname = "Roboto"
top = graphviz.Digraph()
top.attr(
label=workflow.name,
label=f"<<B>{workflow.name}</B>>",
labelloc="t",
fontname=fontname,
compound="true",
rankdir=rankdir,
concentrate="true",
splines=splines,
)
top.attr("node", fontname=fontname)
top.attr("edge", color="#00000080")

# recursively add graphviz nodes for each workflow node.
# Recursively process the workflow AST into the Digraph.
# We'll identify nodes by their python id() instead of the workflow_node_id strings exposed
# in the miniwdl AST. The latter are only unique within a given workflow, while for graphviz
# we need globally unique id's (even when we have nested subworkflows).
nodes_visited = set()
subworkflows_visited = set()

def add_node(graph: graphviz.Digraph, node: WDL.WorkflowNode):
nonlocal nodes_visited, subworkflows_visited
def add_node(graph, workflow, node):
nonlocal nodes_visited
if isinstance(node, WDL.WorkflowSection):
# scatter/conditional section: add a cluster subgraph to contain its body
with graph.subgraph(name=f"cluster-{id(node)}") as sg:
label = "scatter" if isinstance(node, WDL.Scatter) else "if"
sg.attr(label=f"{label}({node.expr})", fontname=fontname, rank="same")
for child in node.body:
add_node(sg, child)
add_node(sg, workflow, child)
# Add an invisible node inside the subgraph, which provides a sink for dependencies
# of the scatter/conditional expression itself
sg.node(str(id(node)), "", style="invis", height="0", width="0", margin="0")
nodes_visited.add(node.workflow_node_id)
nodes_visited |= set(g.workflow_node_id for g in node.gathers.values())
graph.edge(str(id(workflow)), str(id(node)), style="invis") # helps layout
nodes_visited.add(id(node))
nodes_visited |= set(id(g) for g in node.gathers.values())
elif isinstance(node, WDL.Call) or (
isinstance(node, WDL.Decl)
and (inputs or nodes_visited.intersection(node.workflow_node_dependencies))
and (
inputs
or nodes_visited.intersection(
id(workflow.get_node(it)) for it in node.workflow_node_dependencies
)
)
):
name = node.name
if isinstance(node, WDL.Call) and isinstance(node.callee, WDL.Workflow):
# subworkflow call: add a cluster subgraph for the called workflow; only once, if
# the subworkflow is called in multiple places.
if id(node.callee) not in subworkflows_visited:
subworkflows_visited.add(id(node.callee))
if id(node.callee) not in nodes_visited:
nodes_visited.add(id(node.callee))
with top.subgraph(name=f"cluster-{id(node.callee)}") as sg:
sg.attr(label=node.callee.name, fontname=fontname, rank="max")
sg.attr(label=f"<<B>{node.callee.name}</B>>", fontname=fontname, rank="max")
add_workflow(sg, node.callee)
graph.edge(str(id(workflow)), str(id(node.callee)), style="invis") # helps layout
# dotted edge from call to subworkflow
graph.edge(
f"{id(node)}:s",
Expand All @@ -146,33 +154,25 @@ def add_node(graph: graphviz.Digraph, node: WDL.WorkflowNode):
arrowhead="none",
constraint="false",
)
# invisible edge for subworkflow hierarchy
top.edge(
f"{id(workflow)}",
f"{id(node.callee)}",
style="invis",
height="0",
width="0",
margin="0",
)
name = f"{node.callee.name} as {name}"
name = f"<<B>{node.callee.name}</B> <I>as</I> {name}>"
# node for call or decl
graph.node(
str(id(node)),
name,
shape=("cds" if isinstance(node, WDL.Call) else "plaintext"),
)
nodes_visited.add(node.workflow_node_id)
graph.edge(str(id(workflow)), str(id(node)), style="invis") # helps layout
nodes_visited.add(id(node))

# add edge for each dependency between workflow nodes
# add edge for each dependency between (visited) workflow nodes
def add_edges(graph, workflow, node):
for dep_id in node.workflow_node_dependencies:
dep = workflow.get_node(dep_id)
# leave Gather nodes invisible by replacing any dependencies on them with their
# final_referee
if isinstance(dep, WDL.Tree.Gather):
dep = dep.final_referee
if dep.workflow_node_id in nodes_visited and node.workflow_node_id in nodes_visited:
if id(dep) in nodes_visited and id(node) in nodes_visited:
lhead = None
if isinstance(node, WDL.WorkflowSection):
lhead = f"cluster-{id(node)}"
Expand All @@ -182,36 +182,39 @@ def add_edges(graph, workflow, node):
add_edges(graph, workflow, child)

def add_workflow(graph, workflow):
# invisible source/sink node for the workflow subgraph
graph.node(
str(id(workflow)),
"",
style="invis",
height="0",
width="0",
margin="0",
)

# workflow body nodes
for node in workflow.body:
add_node(graph, node)
add_node(graph, workflow, node)

# cluster of the input decls
if inputs:
with graph.subgraph(name=f"cluster-inputs-{id(workflow)}") as sg:
for inp in workflow.inputs or []:
assert inp.workflow_node_id.startswith("decl-")
sg.node(str(id(inp)), inp.workflow_node_id[5:], shape="plaintext")
nodes_visited.add(inp.workflow_node_id)
sg.attr(label="inputs", fontname=fontname)
nodes_visited.add(id(inp))
sg.attr(label="<<I>inputs</I>>", fontname=fontname)

# cluster of the output decls
if outputs:
with graph.subgraph(name=f"cluster-outputs-{id(workflow)}") as sg:
for outp in workflow.outputs or []:
assert outp.workflow_node_id.startswith("output-")
sg.node(str(id(outp)), outp.workflow_node_id[7:], shape="plaintext")
nodes_visited.add(outp.workflow_node_id)
sg.attr(label="outputs", fontname=fontname)

graph.node( # sink
str(id(workflow)),
"",
style="invis",
height="0",
width="0",
margin="0",
)
nodes_visited.add(id(outp))
sg.attr(label="<<I>outputs</I>>", fontname=fontname)

# edges
for node in (workflow.inputs or []) + workflow.body + (workflow.outputs or []):
add_edges(graph, workflow, node)

Expand Down

0 comments on commit 98ab96f

Please sign in to comment.