Rework parallel doc example using CalcJobs #288

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 219 additions & 87 deletions docs/gallery/howto/autogen/parallel.py
Original file line number Diff line number Diff line change
@@ -1,148 +1,280 @@
"""
=======================
=====================
Run tasks in parallel
=======================
=====================
"""
# %%
# Introduction
# ============
# In this tutorial, you will learn how to run tasks and WorkGraphs in parallel.
# When you define the dependencies of a WorkGraph by linking tasks, the
# WorkGraph engine automatically takes care of parallelizing the independent
# tasks. One caveat is that we cannot use calcfunctions for this purpose,
# since they all run in the same runner environment and therefore block each
# other. For that reason we use ``CalcJob``s, which can run in different
# runner environments and therefore in parallel.

# Load the AiiDA profile.
from aiida import load_profile

load_profile()


# %%
# Parallel addition workflow
# ==========================
# Suppose we want to calculate ``x + y + u + v`` in parallel. Instead of
# computing it sequentially as ``(((x + y) + u) + v)``, we compute it as
# ``((x + y) + (u + v))`` so that ``x + y`` and ``u + v`` can be computed in
# parallel. aiida-core already provides an ``ArithmeticAddCalculation``
# CalcJob for performing addition, which we will use in this example.

from aiida_workgraph import WorkGraph, task
from aiida.calculations.arithmetic.add import ArithmeticAddCalculation
from aiida.orm import InstalledCode, load_computer, load_code, load_node
from aiida.common.exceptions import NotExistent

# The ArithmeticAddCalculation needs to know where bash is stored
try:
    code = load_code("add@localhost")  # The computer label can also be omitted here
except NotExistent:
    code = InstalledCode(
        computer=load_computer("localhost"),
        filepath_executable="/bin/bash",
        label="add",
        default_calc_job_plugin="core.arithmetic.add",
    ).store()
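
# %%
# As an aside, the same code can also be set up from the command line; a
# hedged sketch, assuming the AiiDA 2.x ``verdi`` CLI and an already
# configured ``localhost`` computer:
#
# .. code-block:: bash
#
#     verdi code create core.code.installed \
#         --label add \
#         --computer localhost \
#         --filepath-executable /bin/bash \
#         --default-calc-job-plugin core.arithmetic.add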

wg = WorkGraph("parallel")
x, y, u, v = (1, 2, 3, 4)
add_xy = wg.add_task(ArithmeticAddCalculation, name="add_xy", x=x, y=y, code=code)
add_xy.set({"metadata.options.sleep": 3})  # the CalcJob will sleep 3 seconds
add_uv = wg.add_task(ArithmeticAddCalculation, name="add_uv", x=u, y=v, code=code)
add_uv.set({"metadata.options.sleep": 3})  # the CalcJob will sleep 3 seconds
add_xyuv = wg.add_task(
    ArithmeticAddCalculation,
    name="add_xyuv",
    x=add_xy.outputs["sum"],
    y=add_uv.outputs["sum"],
    code=code,
)
# %%
# We can verify that the tasks add_xy and add_uv are independent of each
# other and will therefore automatically run in parallel.

wg.to_html()

# %%
# Running the WorkGraph

wg.submit(wait=True)

# %%
# We look at the ctime (the creation time when the task was submitted/run) and
# the mtime (the time the task was last modified, which is when its state
# changed to finished).
print("add_xy created at:", add_xy.ctime.time(), "finished at:", add_xy.mtime.time())
print("add_uv created at:", add_uv.ctime.time(), "finished at:", add_uv.mtime.time())

# %%
# We can see that both CalcJobs were created at almost the same time.
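
# %%
# As a quick sanity check (a hedged sketch reusing the ``ctime``/``mtime``
# attributes printed above), we can compute for how long the two CalcJobs
# overlapped; a positive overlap means they really ran concurrently:

overlap = min(add_xy.mtime, add_uv.mtime) - max(add_xy.ctime, add_uv.ctime)
print("The two CalcJobs overlapped for about", overlap.total_seconds(), "seconds")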

# %%
# Comparison with a calcfunction
# ------------------------------
# For comparison, we implement the same workflow with calcfunctions that sleep
# for the same amount of time, to show that they block each other.

@task.calcfunction()
def add(x, y, sleep):
    import time

    time.sleep(sleep.value)
    return x + y


wg = WorkGraph("parallel")
x, y, u, v = (1, 2, 3, 4)
add_xy = wg.add_task(add, x=x, y=y, sleep=3)
add_uv = wg.add_task(add, x=u, y=v, sleep=3)
add_xyuv = wg.add_task(
    add, x=add_xy.outputs["result"], y=add_uv.outputs["result"], sleep=0
)

wg.to_html()

# %%

wg.submit(wait=True)

# %%
# Printing timings

print("add_xy created at", add_xy.ctime.time(), "finished at", add_xy.mtime.time())
print("add_uv created at", add_uv.ctime.time(), "finished at", add_uv.mtime.time())

# %%
# We can see that the calcfunctions ran with a 3-second delay between them:
# the second one only started after the first had finished, so they did not
# run in parallel.
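
# A hedged sketch reusing the same ``ctime`` attribute: the start-to-start gap
# between the two calcfunctions should be roughly the 3-second sleep.
gap = (add_uv.ctime - add_xy.ctime).total_seconds()
print(f"add_uv started {gap:.1f} seconds after add_xy")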


# %%
# Parallelizing WorkGraphs
# ========================
# We will parallelize WorkGraphs in two ways: first we submit each WorkGraph
# separately, then we use the graph builder to submit the whole workflow at
# once.


# This is the initial WorkGraph that we want to parallelize
@task.graph_builder(
    inputs=[{"name": "integer"}], outputs=[{"name": "sum", "from": "sum_task.sum"}]
)
def add10(integer):
    wg = WorkGraph()
    code = load_code("add@localhost")  # the code needs to be loaded in the graph builder
    add = wg.add_task(
        ArithmeticAddCalculation, name="sum_task", x=10, y=integer, code=code
    )
    add.set({"metadata.options.sleep": 3})
    return wg

print("State of WorkGraph: {}".format(wg.state))

# %%
# Generate node graph from the AiiDA process:
#

from aiida_workgraph.utils import generate_node_graph
wgs = []
tasks = []
for i in range(2):
wg = WorkGraph(f"parallel_wg{i}")
tasks.append(wg.add_task(add10, name="add10", integer=i))
wgs.append(wg)

generate_node_graph(wg.pk)
# We use wait=False (the default) so that we can continue submitting
wgs[0].submit()  # do not wait, so that we can continue to submit the next WorkGraph
wgs[1].submit(wait=True)
# wait for all the WorkGraphs to finish
wgs[0].wait()

# %%
# We print the difference between the mtime (the time the WorkGraph was last
# changed) and the ctime (the time of creation). Since the WorkGraph's status
# changes when it finishes, this gives us a good estimate of the running time.
print(
    "add10 task of WG0 created:",
    load_node(tasks[0].pk).ctime.time(),
    "finished:",
    load_node(tasks[0].pk).mtime.time(),
)
print(
    "add10 task of WG1 created:",
    load_node(tasks[1].pk).ctime.time(),
    "finished:",
    load_node(tasks[1].pk).mtime.time(),
)


# %%
# Using the graph builder
# -----------------------


# This graph builder runs ``add10`` in a loop, creating one independent task
# per iteration.
@task.graph_builder()
def parallel_add(nb_iterations):
    wg = WorkGraph()
    for i in range(nb_iterations):
        wg.add_task(add10, name=f"add10_{i}", integer=i)
    return wg


# Submitting a WorkGraph that adds 10 to two different numbers in parallel
wg = WorkGraph("parallel_graph_builder")
parallel_add_task = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2)
wg.to_html()

# %%
wg.submit(wait=True)

# %%
parallel_add_wg = WorkGraph.load(parallel_add_task.pk)
add10_0_task = parallel_add_wg.tasks["add10_0"]
add10_1_task = parallel_add_wg.tasks["add10_1"]
print(
    "add10_0 task created:",
    add10_0_task.ctime.time(),
    "finished:",
    add10_0_task.mtime.time(),
)
print(
    "add10_1 task created:",
    add10_1_task.ctime.time(),
    "finished:",
    add10_1_task.mtime.time(),
)

# %%
# We can see that the total time is less than 6 seconds, which means that the
# two additions were performed in parallel.
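
# A hedged sketch: the wall time spanned by the two tasks should be well below
# the ~6 seconds that a sequential execution of two 3-second sleeps would need.
span = max(add10_0_task.mtime, add10_1_task.mtime) - min(
    add10_0_task.ctime, add10_1_task.ctime
)
print("Both add10 tasks finished within", span.total_seconds(), "seconds")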

# %%
# We can also look at the total time, which reveals the overhead costs.
print(
    "Time for running parallelized graph builder",
    parallel_add_task.mtime - parallel_add_task.ctime,
)

# %%
# Increasing number of daemon workers
# -----------------------------------
# One can experience slowdowns when running many jobs (e.g., 100 jobs) that
# could be run in parallel. The optimal number of workers depends highly on
# the jobs that are run.
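
# %%
# The number of workers can also be changed from the command line; a hedged
# aside, assuming the standard ``verdi`` daemon CLI:
#
# .. code-block:: bash
#
#     verdi daemon incr 1
#     verdi daemon status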

X = {"a": Int(1), "b": Int(2), "c": Int(3)}
y = Int(2)
z = Int(3)
wg = WorkGraph("parallel_tasks")
multiply_parallel_gather1 = wg.add_task(multiply_parallel_gather, X=X, y=y)
sum1 = wg.add_task(sum, name="sum1")
# wg.add_link(add1.outputs[0], multiply_parallel_gather1.inputs["uuids"])
wg.add_link(multiply_parallel_gather1.outputs[0], sum1.inputs[0])
from aiida.engine.daemon.client import get_daemon_client

client = get_daemon_client()
print(f"Number of current daemon workers {client.get_numprocesses()['numprocesses']}")

# %%
# We rerun the last graph builder, this time with 2 daemon workers.

print("State of WorkGraph: {}".format(wg.state))
print("Result of task add1: {}".format(wg.tasks["sum1"].outputs["result"].value))
client.increase_workers(1)
print(f"Number of current daemon workers {client.get_numprocesses()['numprocesses']}")
wg = WorkGraph("wg_daemon_worker_2")
parallel_add_task_2 = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2)
wg.to_html()

# %%
wg.submit(wait=True)
print(
    "Time for running parallelized graph builder with 2 daemons",
    parallel_add_task_2.mtime - parallel_add_task_2.ctime,
)

# %%
# The overhead time may shorten somewhat because the handling of the CalcJobs
# and WorkGraphs can be spread over the two workers, but with only two
# iterations the difference can be negligible. One can increase the number of
# iterations to see a more significant difference.

# %%
# Reset back to one worker
client.decrease_workers(1)

# %%
# Maximum number of active WorkGraphs
# -----------------------------------
# Be aware that, for the moment, AiiDA can only run 200 processes (WorkGraph,
# CalcJob, etc.) at the same time. To increase that limit, one can set this
# configuration variable to a higher value.

#
# .. code-block:: bash
#
#     verdi config set daemon.worker_process_slots 200
#     verdi daemon restart
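#
# To check the current value (a hedged aside, assuming the standard
# ``verdi config get`` command of AiiDA 2.x):
#
# .. code-block:: bash
#
#     verdi config get daemon.worker_process_slots
#
# For more ways to tune throughput, see the performance page of the AiiDA
# documentation.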


# %%
# Further reading
# ---------------
# Now that you have learned how to run tasks in parallel, you might want to
# know how to aggregate the results of all these parallel tasks (e.g., taking
# the mean of all computed values). For this, read
# `how to aggregate outputs <aggregate.html>`_.