Rework parallel doc example using CalcJobs (#288)
""" | ||||||||||
======================= | ||||||||||
===================== | ||||||||||
Run tasks in parallel | ||||||||||
======================= | ||||||||||
===================== | ||||||||||
""" | ||||||||||
# %% | ||||||||||
# Introduction | ||||||||||
# ============ | ||||||||||
# In this tutorial, you will learn how to run task in parallel. | ||||||||||
# | ||||||||||
# Load the AiiDA profile. | ||||||||||
# | ||||||||||
|
||||||||||
# In this tutorial, you will learn how to run tasks and WorkGraphs in parallel.
# When you define the dependencies of a WorkGraph by linking tasks, the
# WorkGraph engine automatically takes care of parallelizing the independent
# tasks. One caveat is that we cannot use calcfunctions for this purpose, as
# they all run in the same runner environment and therefore block each other.
# For that reason we need to use `CalcJob`s, which can run in different runner
# environments and can therefore run in parallel.

# Load the AiiDA profile.
from aiida import load_profile

load_profile()
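As a plain-Python analogy (not AiiDA code), the following sketch shows why independent tasks finish sooner when each gets its own runner: with one worker the two calls serialize, with two workers they overlap. The `slow_add` helper is a hypothetical stand-in for a long-running task.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_add(x, y, delay=0.2):
    """A hypothetical stand-in for a task that takes some time to finish."""
    time.sleep(delay)
    return x + y


def run_with(n_workers):
    """Run the two independent additions with a given number of workers."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        futures = [pool.submit(slow_add, 1, 2), pool.submit(slow_add, 3, 4)]
        results = [f.result() for f in futures]
    return results, time.perf_counter() - start


results_seq, t_seq = run_with(1)  # the tasks block each other, like calcfunctions
results_par, t_par = run_with(2)  # the tasks overlap, like CalcJobs

print(results_seq, results_par)  # [3, 7] [3, 7]
print(t_par < t_seq)             # True: two workers finish sooner
```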
# %%
# Parallel addition workflow
# ==========================
# Suppose we want to calculate ``x + y + u + v`` in parallel. Instead of
# computing it sequentially as ``(((x + y) + u) + v)``, we compute it as
# ``((x + y) + (u + v))``, so that ``x + y`` and ``u + v`` can be computed in
# parallel. aiida-core already provides an ``ArithmeticAddCalculation``
# CalcJob for performing additions, which we will use for this example.
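The pairwise grouping can be sketched in plain Python (illustrative only, not AiiDA API): splitting the list into halves yields independent sub-sums that a workflow engine is free to schedule in parallel.

```python
def tree_sum(values):
    """Sum a list by pairwise reduction, e.g. ((x + y) + (u + v)) for four values."""
    if len(values) == 1:
        return values[0]
    mid = len(values) // 2
    # the two halves do not depend on each other, so a workflow engine
    # could compute them in parallel before the final addition
    return tree_sum(values[:mid]) + tree_sum(values[mid:])


print(tree_sum([1, 2, 3, 4]))  # 10
```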
from aiida_workgraph import WorkGraph, task
from aiida.calculations.arithmetic.add import ArithmeticAddCalculation
from aiida.orm import InstalledCode, load_computer, load_code, load_node
from aiida.common.exceptions import NotExistent

# The ArithmeticAddCalculation needs to know where bash is stored
try:
    code = load_code("add@localhost")  # the computer label can also be omitted here
except NotExistent:
    code = InstalledCode(
        computer=load_computer("localhost"),
        filepath_executable="/bin/bash",
        label="add",
        default_calc_job_plugin="core.arithmetic.add",
    ).store()
wg = WorkGraph("parallel")
x, y, u, v = (1, 2, 3, 4)
add_xy = wg.add_task(ArithmeticAddCalculation, name="add_xy", x=x, y=y, code=code)
add_xy.set({"metadata.options.sleep": 3})  # the CalcJob will sleep 3 seconds
add_uv = wg.add_task(ArithmeticAddCalculation, name="add_uv", x=u, y=v, code=code)
add_uv.set({"metadata.options.sleep": 3})  # the CalcJob will sleep 3 seconds
add_xyuv = wg.add_task(
    ArithmeticAddCalculation,
    name="add_xyuv",
    x=add_xy.outputs["sum"],
    y=add_uv.outputs["sum"],
    code=code,
)

# %%
# We can verify that the tasks add_xy and add_uv are independent of each other
# and will therefore automatically run in parallel.

wg.to_html()
# %%
# Running the workgraph

wg.submit(wait=True)
# %%
# We look at the ctime (the creation time at submission) and the mtime (the
# time the task was last modified, i.e. when its state changed to finished).
print("add_xy created at:", add_xy.ctime.time(), "finished at:", add_xy.mtime.time())
print("add_uv created at:", add_uv.ctime.time(), "finished at:", add_uv.mtime.time())

# %%
# We can see that both CalcJobs were created at almost the same time.
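One way to make the "ran at the same time" observation concrete is to check whether the two (ctime, mtime) intervals overlap. The `overlapped` helper below is a sketch with made-up timestamps, not part of aiida-workgraph.

```python
from datetime import datetime, timedelta


def overlapped(span_a, span_b):
    """Return True if two (start, end) time intervals intersect."""
    return span_a[0] < span_b[1] and span_b[0] < span_a[1]


t0 = datetime(2024, 1, 1, 12, 0, 0)
# hypothetical (ctime, mtime) pairs for two CalcJobs that each slept 3 seconds
add_xy_span = (t0, t0 + timedelta(seconds=3))
add_uv_span = (t0 + timedelta(milliseconds=50), t0 + timedelta(seconds=3, milliseconds=50))

print(overlapped(add_xy_span, add_uv_span))  # True: the CalcJobs ran concurrently
```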
# %%
# Comparison with a calcfunction
# ------------------------------
@task.calcfunction()
def add(x, y, sleep):
    import time

    time.sleep(sleep.value)
    return x + y


wg = WorkGraph("parallel")
x, y, u, v = (1, 2, 3, 4)
add_xy = wg.add_task(add, x=x, y=y, sleep=3)
add_uv = wg.add_task(add, x=u, y=v, sleep=3)
add_xyuv = wg.add_task(
    add, x=add_xy.outputs["result"], y=add_uv.outputs["result"], sleep=0
)

wg.to_html()
# %%

wg.submit(wait=True)

# %%
# Printing timings

print("add_xy created at", add_xy.ctime.time(), "finished at", add_xy.mtime.time())
print("add_uv created at", add_uv.ctime.time(), "finished at", add_uv.mtime.time())

# %%
# We can see that the calcfunctions ran with a 3 second delay between them,
# since they block each other in the same runner environment.
# %%
# Parallelizing WorkGraphs
# ========================
# We will parallelize a workgraph in two ways: first we submit all workgraphs
# separately, then we use the graph builder to submit the whole workflow at
# once.


# This is the initial WorkGraph we want to parallelize
@task.graph_builder(
    inputs=[{"name": "integer"}], outputs=[{"name": "sum", "from": "sum_task.result"}]
)
def add10(integer):
    wg = WorkGraph()
    code = load_code("add@localhost")  # the code needs to be loaded in the graph builder
    add = wg.add_task(
        ArithmeticAddCalculation, name="sum_task", x=10, y=integer, code=code
    )
    add.set({"metadata.options.sleep": 3})
    return wg
print("State of WorkGraph: {}".format(wg.state)) | ||||||||||
|
||||||||||
# %% | ||||||||||
# Generate node graph from the AiiDA process: | ||||||||||
# | ||||||||||
|
||||||||||
from aiida_workgraph.utils import generate_node_graph | ||||||||||
wgs = [] | ||||||||||
tasks = [] | ||||||||||
for i in range(2): | ||||||||||
wg = WorkGraph(f"parallel_wg{i}") | ||||||||||
tasks.append(wg.add_task(add10, name="add10", integer=i)) | ||||||||||
wgs.append(wg) | ||||||||||
|
||||||||||
generate_node_graph(wg.pk) | ||||||||||
# We use wait=False so we can continue submitting | ||||||||||
wgs[0].submit() # do not wait (by default), so that we can continue to submit next WG. | ||||||||||
wgs[1].submit(wait=True) | ||||||||||
# we wait for all the WorkGraphs to finish | ||||||||||
wgs[0].wait() | ||||||||||
|
||||||||||
# %%
# We print the difference between the mtime (the time the WorkGraph was last
# changed) and the ctime (the time of creation). Since the WorkGraph's status
# changes when it finishes, this gives us a good estimate of the running time.
print(
    "add10 task of WG0 created:",
    load_node(tasks[0].pk).ctime.time(),
    "finished:",
    load_node(tasks[0].pk).mtime.time(),
)
print(
    "add10 task of WG1 created:",
    load_node(tasks[1].pk).ctime.time(),
    "finished:",
    load_node(tasks[1].pk).mtime.time(),
)
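The mtime - ctime estimate above is plain `datetime` subtraction; a minimal sketch with hypothetical timestamps:

```python
from datetime import datetime

ctime = datetime(2024, 1, 1, 12, 0, 0)  # hypothetical creation time
mtime = datetime(2024, 1, 1, 12, 0, 4)  # hypothetical last state change (finished)

runtime = mtime - ctime  # a timedelta, our running-time estimate
print(runtime.total_seconds())  # 4.0
```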
# %%
# Using the graph builder
# -----------------------


# This graph builder runs the add10 task over a loop
@task.graph_builder()
def parallel_add(nb_iterations):
    wg = WorkGraph()
    for i in range(nb_iterations):
        wg.add_task(add10, name=f"add10_{i}", integer=i)
    return wg


# Submit a workgraph that adds 10 to two different numbers in parallel
wg = WorkGraph("parallel_graph_builder")
parallel_add_task = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2)
wg.to_html()

# %%
wg.submit(wait=True)
# %%
parallel_add_wg = WorkGraph.load(parallel_add_task.pk)
add10_0_task = parallel_add_wg.tasks["add10_0"]
add10_1_task = parallel_add_wg.tasks["add10_1"]
print(
    "add10_0 task created:",
    add10_0_task.ctime.time(),
    "finished:",
    add10_0_task.mtime.time(),
)
print(
    "add10_1 task created:",
    add10_1_task.ctime.time(),
    "finished:",
    add10_1_task.mtime.time(),
)

# %%
# We can see that the total time is less than 6 seconds, which means that the
# two additions were performed in parallel.
# %%
# We can also look at the total time and see the overhead costs.
print(
    "Time for running parallelized graph builder:",
    parallel_add_task.mtime - parallel_add_task.ctime,
)

# %%
# Increasing the number of daemon workers
# ---------------------------------------
# Since each daemon worker can only manage one WorkGraph (handling its
# results) at a time, one can experience slowdowns when running many jobs
# that could run in parallel. The optimal number of workers depends highly
# on the jobs being run.
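A rough back-of-the-envelope model (an assumption for intuition, not an AiiDA API): if each worker manages one job at a time, `n_tasks` equally long jobs take roughly `ceil(n_tasks / workers)` rounds.

```python
import math


def estimated_makespan(n_tasks, workers, task_seconds):
    """Estimate total wall time when each worker handles one task at a time."""
    return math.ceil(n_tasks / workers) * task_seconds


print(estimated_makespan(4, 1, 3))  # 12: one worker runs all four jobs back to back
print(estimated_makespan(4, 2, 3))  # 6: two workers halve the wall time
```

Real wall times also include scheduling and result-handling overhead, which is why the measured timings above exceed the bare sleep durations.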
from aiida.engine.daemon.client import get_daemon_client

client = get_daemon_client()
print(f"Number of current daemon workers: {client.get_numprocesses()['numprocesses']}")

# %%
# We rerun the last graph builder with 2 daemon workers

client.increase_workers(1)
print(f"Number of current daemon workers: {client.get_numprocesses()['numprocesses']}")
wg = WorkGraph("wg_daemon_worker_2")
parallel_add_task_2 = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2)
wg.to_html()
# %%
wg.submit(wait=True)
print(
    "Time for running parallelized graph builder with 2 daemon workers:",
    parallel_add_task_2.mtime - parallel_add_task_2.ctime,
)

# %%
# The overhead time may shorten a bit, as the handling of the CalcJobs and
# WorkGraphs can be parallelized. One can increase the number of iterations
# to see a more significant difference.

# %%
# Generate the node graph from the AiiDA process:

from aiida_workgraph.utils import generate_node_graph

generate_node_graph(wg.pk)

# %%
# Reset back to one worker
client.decrease_workers(1)
# %%
# Maximum number of active WorkGraphs
# -----------------------------------
# Be aware that, for the moment, AiiDA can only run 200 WorkGraphs at the
# same time. To increase that limit, one can set this configuration option
# to a higher value:
#
# .. code-block:: bash
#
#    verdi config set daemon.worker_process_slots 200
#    verdi daemon restart
# %%
# Further reading
# ---------------
# Now that you have learned how to run tasks in parallel, you might want to
# know how to aggregate the results of all these parallel tasks (e.g. taking
# the mean of all computed values). For this, read further in
# `how to aggregate outputs <aggregate.html>`_.