Skip to content

Commit

Permalink
only add global ios for edge nodes of task graph
Browse files Browse the repository at this point in the history
  • Loading branch information
jrudz committed Oct 29, 2024
1 parent c0f2ed8 commit 9dfa91b
Show file tree
Hide file tree
Showing 3 changed files with 9,293 additions and 674 deletions.
89 changes: 60 additions & 29 deletions src/nomad_utility_workflows/utils/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import yaml
from pydantic import BaseModel, Field

# logger = logging.getLogger(__name__) # ! this is not functional I think
logger = get_logger(__name__)
TASK_M_DEF = 'nomad.datamodel.metainfo.workflow.TaskReference'
WORKFLOW_M_DEF = 'nomad.datamodel.metainfo.workflow.TaskReference'
Expand Down Expand Up @@ -167,9 +166,6 @@ class NomadTask(BaseModel):
outputs: list[NomadSection] = Field(default_factory=list)
task_section: Optional[NomadSection] = None

# class Config:
# arbitrary_types_allowed = True

def __init__(self, **data):
super().__init__(**data)
for i, input_ in enumerate(self.inputs):
Expand Down Expand Up @@ -283,11 +279,11 @@ def register_section(
self, node_key: Union[int, str, tuple], node_attrs: dict[str, Any]
) -> None:
section = NomadSection(**node_attrs)
self.task_elements[node_key] = section # ! build the tasks section by section
self.task_elements[node_key] = section

def fill_workflow_graph(self) -> None:
"""_summary_"""
for node_source, node_dest, edge in self.workflow_graph.edges(data=True):
for node_source, node_dest, edge in list(self.workflow_graph.edges(data=True)):
self._resolve_edge_inputs(node_source, node_dest, edge)
self._resolve_edge_outputs(node_source, node_dest, edge)
self._add_defaults(node_source, node_dest, edge)
Expand Down Expand Up @@ -333,9 +329,23 @@ def _add_defaults(self, node_source, node_dest, edge) -> None:
]:
for outputs_ in self._get_defaults('outputs', node_source, node_dest):
edge['inputs'].append(outputs_)
# add the output to the graph
self.workflow_graph.add_node(
len(self.workflow_graph.nodes), type='output', **outputs_
)
self.workflow_graph.add_edge(
node_source, len(self.workflow_graph.nodes) - 1
)
if self.workflow_graph.nodes[node_dest].get('type', '') in ['task', 'workflow']:
for inputs_ in self._get_defaults('inputs', node_source, node_dest):
edge['outputs'].append(inputs_)
# add the input to the graph
self.workflow_graph.add_node(
len(self.workflow_graph.nodes), type='input', **inputs_
)
self.workflow_graph.add_edge(
len(self.workflow_graph.nodes) - 1, node_dest
)

def _get_mainfile_path(self, node):
return (
Expand Down Expand Up @@ -421,35 +431,55 @@ def generate_archive(self) -> NomadWorkflowArchive:
archive.inputs = []
archive.outputs = []

for node_key, node in self.workflow_graph.nodes(data=True):
if node.get('type', '') == 'input':
element = self.task_elements[node_key]
# ! Only add global inputs/outputs (ios) that are attached to the edge nodes of the task subgraph
# get the input task nodes
task_nodes = [
n
for n, attr in self.workflow_graph.nodes(data=True)
if attr.get('type', '') in ['task', 'workflow']
]
# Create a subgraph with only task nodes
task_graph = self.workflow_graph.subgraph(task_nodes)

# select input nodes from task graph that have no incoming edges
for node in [n for n, d in task_graph.in_degree if d == 0]:
# get the inputs from the incoming edges of these nodes within the full graph (should be inputs!)
for edge in self.workflow_graph.in_edges(node, data=True):
if self.workflow_graph.nodes[edge[0]].get('type', '') != 'input':
continue
element = self.task_elements[edge[0]]
archive.inputs.append(element)
elif node.get('type', '') == 'output':
element = self.task_elements[node_key]
# select output nodes from task graph that have no outgoing edges
for node in [n for n, d in task_graph.out_degree if d == 0]:
# get the outputs from the outgoing edges of these nodes within the full graph (should be outputs!)
for edge in self.workflow_graph.out_edges(node, data=True):
if self.workflow_graph.nodes[edge[1]].get('type', '') != 'output':
continue
element = self.task_elements[edge[1]]
archive.outputs.append(element)
elif node.get('type', '') in ['task', 'workflow']:
inputs = []
outputs = []
for _, _, edge in self.workflow_graph.out_edges(node_key, data=True):
if edge.get('inputs'):
outputs.extend(edge.get('inputs'))
for _, _, edge in self.workflow_graph.in_edges(node_key, data=True):
if edge.get('outputs'):
inputs.extend(edge.get('outputs'))

archive.tasks.append(
NomadTask(
name=node.get('name', ''),
inputs=inputs,
outputs=outputs,
task_section=self.task_elements[node_key],
)
# add the tasks
for node_key, node in task_graph.nodes(data=True):
inputs = []
outputs = []
for _, _, edge in self.workflow_graph.out_edges(node_key, data=True):
if edge.get('inputs'):
outputs.extend(edge.get('inputs'))
for _, _, edge in self.workflow_graph.in_edges(node_key, data=True):
if edge.get('outputs'):
inputs.extend(edge.get('outputs'))

archive.tasks.append(
NomadTask(
name=node.get('name', ''),
inputs=inputs,
outputs=outputs,
task_section=self.task_elements[node_key],
)
)

return archive


# TODO Extend the graph to add nodes for the additional default inputs/outputs, etc.
def nodes_to_graph(node_attributes: dict[int, Any]) -> nx.DiGraph:
"""_summary_
Expand Down Expand Up @@ -556,6 +586,7 @@ def build_nomad_workflow(
return workflow.workflow_graph


# TODO need to adjust the workflow graph along with the yaml and then spit out the final version that matches...
# TODO test this code on a number of already existing examples
# TODO create docs with some examples for dict and graph input types
# TODO add to readme/docs that this is not currently using NOMAD, but could be linked
Expand Down
Loading

0 comments on commit 9dfa91b

Please sign in to comment.