Skip to content

Commit

Permalink
Namespace socket and tab completion (#388)
Browse files Browse the repository at this point in the history
AiiDA has a namespace port and supports nested ports, e.g., the "base.pw.metadata" in the `PwRelaxWorkChain.` Previously, WorkGraph flat the nested port, and make every port on the top-level. There are many disadvantages:
- one needs to use the dict-style code to access the socket, e.g., `outputs["relax.output_structure"]`.
- tab-completion is not possible, because "." is in the key.

This PR uses the `NodeSocketNamespace` from the latest `node-graph` to support the nested sockets, i.e., the namespace in AiiDA.  The top-level `inputs` and `outputs` are also namespace sockets now. The auto-completion is also supported.
  • Loading branch information
superstar54 committed Dec 11, 2024
1 parent df7374b commit 6bf5c50
Show file tree
Hide file tree
Showing 62 changed files with 767 additions and 1,031 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,19 @@ def multiply(x, y):
wg = WorkGraph("test_add_multiply")
wg.add_task(add, name="add1")
wg.add_task(multiply, name="multiply1")
wg.add_link(wg.tasks["add1"].outputs["result"], wg.tasks["multiply1"].inputs["x"])
wg.add_link(wg.tasks.add1.outputs.result, wg.tasks.multiply1.inputs.x)

```

Prepare inputs and submit the workflow:
Prepare inputs and run the workflow:

```python
from aiida import load_profile

load_profile()

wg.submit(inputs = {"add1": {"x": 2, "y": 3}, "multiply1": {"y": 4}}, wait=True)
print("Result of multiply1 is", wg.tasks["multiply1"].outputs[0].value)
wg.run(inputs = {"add1": {"x": 2, "y": 3}, "multiply1": {"y": 4}})
print("Result of multiply1 is", wg.tasks.multiply1.outputs.result.value)
```
## Web ui
To use the web ui, first install the web ui package:
Expand Down
184 changes: 8 additions & 176 deletions docs/gallery/autogen/quick_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def multiply(x, y):
wg = WorkGraph("add_multiply_workflow")
add_task = wg.add_task(add, name="add1")
# link the output of the `add` task to one of the `x` input of the `multiply` task.
wg.add_task(multiply, name="multiply1", x=add_task.outputs["result"])
wg.add_task(multiply, name="multiply1", x=add_task.outputs.result)

# export the workgraph to html file so that it can be visualized in a browser
wg.to_html()
Expand All @@ -128,8 +128,8 @@ def multiply(x, y):
)

print("State of WorkGraph: {}".format(wg.state))
print("Result of add : {}".format(wg.tasks["add1"].outputs[0].value))
print("Result of multiply : {}".format(wg.tasks["multiply1"].outputs[0].value))
print("Result of add : {}".format(wg.tasks.add1.outputs.result.value))
print("Result of multiply : {}".format(wg.tasks.multiply1.outputs.result.value))


######################################################################
Expand All @@ -141,172 +141,6 @@ def multiply(x, y):
generate_node_graph(wg.pk)


######################################################################
# Remote job
# ----------
#
# The ``PythonJob`` is a built-in task that allows users to run Python
# functions on a remote computer.
#
# In this case, we use define the task using normal function instead of
# ``calcfunction``. Thus, user does not need to install AiiDA on the
# remote computer.
#

from aiida_workgraph import WorkGraph, task

# define add task
@task()
def add(x, y):
return x + y


# define multiply task
@task()
def multiply(x, y):
return x * y


wg = WorkGraph("second_workflow")
# You might need to adapt the label to python3 if you use this as your default python
wg.add_task("PythonJob", function=add, name="add", command_info={"label": "python"})
wg.add_task(
"PythonJob",
function=multiply,
name="multiply",
x=wg.tasks["add"].outputs[0],
command_info={"label": "python"},
)

# export the workgraph to html file so that it can be visualized in a browser
wg.to_html()
# visualize the workgraph in jupyter-notebook
# wg


######################################################################
# Submit the workgraph
# ~~~~~~~~~~~~~~~~~~~~
#
# **Code**: We can set the ``computer`` to the remote computer where we
# want to run the job. This will create a code ``python@computer`` if not
# exists. Of course, you can also set the ``code`` directly if you have
# already created the code.
#
# **Data**: Users can (and is recoomaneded) use normal Python data as
# input. The workgraph will transfer the data to AiiDA data
# (``PickledData``) using pickle.
#
# **Python Version**: since pickle is used to store and load data, the
# Python version on the remote computer should match the one used in the
# localhost. One can use conda to create a virtual environment with the
# same Python version. Then activate the environment before running the
# script.
#
# .. code:: python
#
# # For real applications, one can pass metadata to the scheduler to activate the conda environment
# metadata = {
# "options": {
# 'custom_scheduler_commands' : 'module load anaconda\nconda activate py3.11\n',
# }
# }
#

from aiida_workgraph.utils import generate_node_graph

# ------------------------- Submit the calculation -------------------
# For real applications, one can pass metadata to the scheduler to activate the conda environment
metadata = {
"options": {
# 'custom_scheduler_commands' : 'module load anaconda\nconda activate py3.11\n',
"custom_scheduler_commands": "",
}
}

wg.submit(
inputs={
"add": {"x": 2, "y": 3, "computer": "localhost", "metadata": metadata},
"multiply": {"y": 4, "computer": "localhost", "metadata": metadata},
},
wait=True,
)
# ------------------------- Print the output -------------------------
print(
"\nResult of multiply is {} \n\n".format(
wg.tasks["multiply"].outputs["result"].value
)
)
# ------------------------- Generate node graph -------------------
generate_node_graph(wg.pk)


######################################################################
# Use parent folder
# ~~~~~~~~~~~~~~~~~
#
# The parent_folder parameter allows a task to access the output files of
# a parent task. This feature is particularly useful when you want to
# reuse data generated by a previous computation in subsequent
# computations. In the following example, the multiply task uses the
# ``result.txt`` file created by the add task.
#
# By default, the content of the parent folder is symlinked to the working
# directory. In the function, you can access the parent folder using the
# relative path. For example, ``./parent_folder/result.txt``.
#

from aiida_workgraph import WorkGraph, task

# define add task
@task()
def add(x, y):
z = x + y
with open("result.txt", "w") as f:
f.write(str(z))


# define multiply task
@task()
def multiply(x, y):
with open("parent_folder/result.txt", "r") as f:
z = int(f.read())
return x * y + z


wg = WorkGraph("third_workflow")
# You might need to adapt the label to python3 if you use this as your default python
wg.add_task("PythonJob", function=add, name="add", command_info={"label": "python"})
wg.add_task(
"PythonJob",
function=multiply,
name="multiply",
parent_folder=wg.tasks["add"].outputs["remote_folder"],
command_info={"label": "python"},
)

wg.to_html()


######################################################################
# Submit the calculation
#

# ------------------------- Submit the calculation -------------------
wg.submit(
inputs={
"add": {"x": 2, "y": 3, "computer": "localhost"},
"multiply": {"x": 3, "y": 4, "computer": "localhost"},
},
wait=True,
)
print(
"\nResult of multiply is {} \n\n".format(
wg.tasks["multiply"].outputs["result"].value
)
)


######################################################################
# CalcJob and WorkChain
# ---------------------
Expand Down Expand Up @@ -341,7 +175,7 @@ def multiply(x, y):
wg = WorkGraph("test_add_multiply")
add1 = wg.add_task(ArithmeticAddCalculation, name="add1", x=Int(2), y=Int(3), code=code)
add2 = wg.add_task(ArithmeticAddCalculation, name="add2", y=Int(3), code=code)
wg.add_link(wg.tasks["add1"].outputs["sum"], wg.tasks["add2"].inputs["x"])
wg.add_link(wg.tasks.add1.outputs.sum, wg.tasks.add2.inputs.x)
wg.to_html()


Expand All @@ -350,7 +184,7 @@ def multiply(x, y):
#

wg.submit(wait=True)
print("Result of task add1: {}".format(wg.tasks["add2"].outputs["sum"].value))
print("Result of task add1: {}".format(wg.tasks.add2.outputs.sum.value))

from aiida_workgraph.utils import generate_node_graph

Expand Down Expand Up @@ -413,7 +247,7 @@ def add_multiply(x, y, z):
wg = WorkGraph()
wg.add_task(add, name="add", x=x, y=y)
wg.add_task(multiply, name="multiply", x=z)
wg.add_link(wg.tasks["add"].outputs["result"], wg.tasks["multiply"].inputs["y"])
wg.add_link(wg.tasks.add.outputs.result, wg.tasks.multiply.inputs.y)
# don't forget to return the `wg`
return wg

Expand All @@ -431,7 +265,7 @@ def add_multiply(x, y, z):
add_multiply1 = wg.add_task(add_multiply, x=Int(2), y=Int(3), z=Int(4))
add_multiply2 = wg.add_task(add_multiply, x=Int(2), y=Int(3))
# link the output of a task to the input of another task
wg.add_link(add_multiply1.outputs["multiply"], add_multiply2.inputs["z"])
wg.add_link(add_multiply1.outputs.multiply, add_multiply2.inputs.z)
wg.submit(wait=True)
print("WorkGraph state: ", wg.state)

Expand All @@ -440,9 +274,7 @@ def add_multiply(x, y, z):
# Get the result of the tasks:
#

print(
"Result of task add_multiply1: {}".format(add_multiply1.outputs["multiply"].value)
)
print("Result of task add_multiply1: {}".format(add_multiply1.outputs.multiply.value))

generate_node_graph(wg.pk)

Expand Down
8 changes: 4 additions & 4 deletions docs/gallery/built-in/autogen/shelljob.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
wg.submit(wait=True)

# Print out the result:
print("\nResult: ", date_task.outputs["stdout"].value.get_content())
print("\nResult: ", date_task.outputs.stdout.value.get_content())

# %%
# Under the hood, an AiiDA ``Code`` instance ``date`` will be created on the ``localhost`` computer. In addition, it is also
Expand Down Expand Up @@ -93,7 +93,7 @@
wg.submit(wait=True)

# Print out the result:
print("\nResult: ", date_task.outputs["stdout"].value.get_content())
print("\nResult: ", date_task.outputs.stdout.value.get_content())

# %%
# Running a shell command with files as arguments
Expand All @@ -117,7 +117,7 @@
wg.submit(wait=True)

# Print out the result:
print("\nResult: ", cat_task.outputs["stdout"].value.get_content())
print("\nResult: ", cat_task.outputs.stdout.value.get_content())

# %%
# Create a workflow
Expand Down Expand Up @@ -157,7 +157,7 @@ def parser(dirpath):
name="expr_2",
command="expr",
arguments=["{result}", "*", "{z}"],
nodes={"z": Int(4), "result": expr_1.outputs["result"]},
nodes={"z": Int(4), "result": expr_1.outputs.result},
parser=PickledData(parser),
parser_outputs=[{"name": "result"}],
)
Expand Down
13 changes: 6 additions & 7 deletions docs/gallery/concept/autogen/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def multiply(x, y):
return x * y


print("Input ports: ", multiply.task().inputs.keys())
print("Output ports: ", multiply.task().outputs.keys())
print("Input ports: ", multiply.task().get_input_names())
print("Output ports: ", multiply.task().get_output_names())

multiply.task().to_html()

Expand All @@ -49,8 +49,8 @@ def add_minus(x, y):
return {"sum": x + y, "difference": x - y}


print("Input ports: ", add_minus.task().inputs.keys())
print("Ouput ports: ", add_minus.task().outputs.keys())
print("Input ports: ", add_minus.task().get_input_names())
print("Ouput ports: ", add_minus.task().get_output_names())
add_minus.task().to_html()


Expand Down Expand Up @@ -85,8 +85,7 @@ def add(x: int, y: float) -> float:
return x + y


for input in add.task().inputs:
print("{:30s}: {:20s}".format(input.name, input.identifier))
print("inputs: ", add.task().inputs)


######################################################################
Expand Down Expand Up @@ -140,7 +139,7 @@ def add(x, y):

def create_sockets(self):
# create a General port.
inp = self.inputs.new("workgraph.Any", "symbols")
inp = self.add_input("workgraph.Any", "symbols")
# add a string property to the port with default value "H".
inp.add_property("String", "default", default="H")

Expand Down
Loading

0 comments on commit 6bf5c50

Please sign in to comment.