From 6792713d81f4c8531fca11aa56a2e988eccbf537 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Mon, 2 Sep 2024 09:40:20 +0200 Subject: [PATCH 01/20] Rework parallel doc example using CalcJob's The previous example did rely on calcfunctions that are always run sequentially. This example now uses CalcJobs to actually achieve parallel executions. --- docs/gallery/howto/autogen/parallel.py | 273 +++++++++++++++++-------- 1 file changed, 185 insertions(+), 88 deletions(-) diff --git a/docs/gallery/howto/autogen/parallel.py b/docs/gallery/howto/autogen/parallel.py index 97ce88a3..57305232 100644 --- a/docs/gallery/howto/autogen/parallel.py +++ b/docs/gallery/howto/autogen/parallel.py @@ -1,148 +1,245 @@ """ -======================= +===================== Run tasks in parallel -======================= +===================== """ # %% # Introduction # ============ -# In this tutorial, you will learn how to run task in parallel. -# -# Load the AiiDA profile. -# - +# In this tutorial, you will learn how to run tasks and WorkGraphs in parallel. +# When defining the dependencies WorkGraph by linking tasks the WorkGraph +# engine will automatically take care of parallelizing the independent tasks. One +# caveat is that we cannot use calcfunctions for this purpose as they all run +# in the same runner environment and therefore are blocking each other. For +# that reason we need to use `CalcJob`s that can be run in different runner +# environments and therefore can be run in parallel. +# Load the AiiDA profile. from aiida import load_profile load_profile() # %% -# First workflow -# ============== -# Suppose we want to calculate ```(x + y) * z ``` in two steps. First, add `x` and `y`, then multiply the result with `z`. And `X` is a list of values. We want to calculate these in parallel. -# -# Create task -# ------------ -# First, one should know that we can not launch a subprocess inside a `task` or a `calcfunction`. We need a create a `WorkGraph` to run tasksin parallel. And then treat this `WorkGraph` as a task. -# +# Parallel addition workflow +# ========================== +# Suppose we want to calculate ```x + y + u + v``` in a parallel, instead of +# computing sequentially ```(((x + y) + u) + v)``` we compute it like +# ```((x + y) + (u + v))``` to compute ```x + y``` and ```u + v``` in parallel. +# aiida-core already provides a ArithmeticAddCalculation CalcJob for performing +# addition which we will use it for this example + +from aiida_workgraph import WorkGraph, task +from aiida.calculations.arithmetic.add import ArithmeticAddCalculation +from aiida.orm import Int, InstalledCode, load_computer, load_code, load_node +from aiida.common.exceptions import NotExistent + +# The ArithmeticAddCalculation needs to know where bash is stored +try: + code = load_code("add@localhost") # The computer label can also be omitted here +except NotExistent: + code = InstalledCode( + computer=load_computer("localhost"), + filepath_executable="/bin/bash", + label="add", + default_calc_job_plugin="core.arithmetic.add", + ).store() + +wg = WorkGraph("parallel") +x, y, u, v = (1, 2, 3, 4) +add_xy = wg.add_task(ArithmeticAddCalculation, x=x, y=y, code=code) +add_xy.set({"metadata.options.sleep": 5}) # the CalcJob will sleep 5 seconds +add_uv = wg.add_task(ArithmeticAddCalculation, x=u, y=v, code=code) +add_uv.set({"metadata.options.sleep": 5}) # the CalcJob will sleep 5 seconds +add_xyuv = wg.add_task( + ArithmeticAddCalculation, + x=add_xy.outputs["sum"], + y=add_uv.outputs["sum"], + code=code, +) +wg.submit(wait=True) + +# %% +# We look at the ctime (the time of creation when submitted/run) and the mtime (the time the task has been last modified which is when its state changes to finish). +print("add_xy created at:", add_xy.ctime.time(), "finished at:", add_xy.mtime.time()) +print("add_uv created at:", add_uv.ctime.time(), "finished at:", add_uv.mtime.time()) +# %% +# We can see that both CalcJob's have been created almost at the same time + +# %% +# Comparison with a calcfunction +# ------------------------------ +# -from aiida_workgraph import task, WorkGraph -# define multiply task @task.calcfunction() -def multiply(x, y): - return x * y +def add(x, y, sleep): + import time + time.sleep(sleep.value) + return x + y -# Create a WorkGraph as a task -@task.graph_builder() -def multiply_parallel(X, y): - wg = WorkGraph() - # here the task `multiply` is created and will run in parallel - for key, value in X.items(): - wg.add_task(multiply, name=f"multiply_{key}", x=value, y=y) - return wg +wg = WorkGraph("parallel") +x, y, u, v = (1, 2, 3, 4) +add_xy = wg.add_task(add, x=x, y=y, sleep=5) +add_uv = wg.add_task(add, x=x, y=y, sleep=5) +add_xyuv = wg.add_task( + add, x=add_xy.outputs["result"], y=add_uv.outputs["result"], sleep=0 +) -# %% -# Create the workflow -# --------------------- -# +wg.submit(wait=True) -from aiida_workgraph import WorkGraph -from aiida.orm import Int, List +# %% +# Printing timings -X = {"a": Int(1), "b": Int(2), "c": Int(3)} -y = Int(2) -z = Int(3) -wg = WorkGraph("parallel_tasks") -multiply_parallel1 = wg.add_task(multiply_parallel, name="multiply_parallel1", X=X, y=y) +print("add_xy created at", add_xy.ctime.time(), "finished at", add_xy.mtime.time()) +print("add_uv created at", add_uv.ctime.time(), "finished at", add_uv.mtime.time()) -wg.submit(wait=True) +# %% +# We can see that the calcfunctions have been run with a 5 seconds delay # %% -# Check the status and results -# ----------------------------- -# +# Parallelizing WorkGraphs +# ======================== +# We will parallelize a workgraph by two ways, one time we submit all workgraphs, +# the other time we use the graph builder to submit once the whole workflow. -print("State of WorkGraph: {}".format(wg.state)) +# This is our initial WorkGraph we want to parallelize +@task.graph_builder( + inputs=[{"name": "integer"}], outputs=[{"name": "sum", "from": "sum_task.result"}] +) +def add10_wg(integer): + wg = WorkGraph() + code = load_code("add@localhost") # code needs to loaded in the graph builder + add = wg.add_task( + ArithmeticAddCalculation, name="sum_task", x=10, y=integer, code=code + ) + add.set({"metadata.options.sleep": 5}) + return wg + # %% -# Generate node graph from the AiiDA process: -# -from aiida_workgraph.utils import generate_node_graph +wgs = [] +for i in range(2): + wg = WorkGraph(f"parallel_wg{i}") + wg.add_task(add10_wg, name=f"add10_{i}", integer=i) + wgs.append(wg) -generate_node_graph(wg.pk) +# We use wait=False so we can continue submitting +wgs[0].submit(wait=False) +# we wait for the last WorkGraph to finish +wgs[1].submit(wait=True) # %% -# Second workflow: gather results -# ================================ -# Now I want to gather the results from the previous `multiply_parallel` tasks and calculate the sum of all their results. -# Let's update the `multiply_parallel` function to `multiply_parallel_gather`. -# +# We print the difference between the mtime (the time the WorkGraph has been last time changed) and the ctime (the time of creation). Since the WorkGraph's status is changed when finished, this give us a good estimate of the running time. +print( + "WG0 created:", + load_node(wgs[0].pk).ctime.time(), + "finished:", + load_node(wgs[0].pk).mtime.time(), +) +print("Time WG0", load_node(wgs[0].pk).mtime - load_node(wgs[0].pk).ctime) +print( + "WG1 created:", + load_node(wgs[1].pk).ctime.time(), + "finished:", + load_node(wgs[1].pk).mtime.time(), +) +print("Time WG1", load_node(wgs[1].pk).mtime - load_node(wgs[1].pk).ctime) + + +# %% +# Using graph builder +# ------------------- -@task.graph_builder(outputs=[{"name": "result", "from": "context.mul"}]) -def multiply_parallel_gather(X, y): +# This graph_builder runs the add10_wg over a loop and its +@task.graph_builder() +def parallel_add(nb_it): wg = WorkGraph() - for key, value in X.items(): - multiply1 = wg.add_task(multiply, x=value, y=y) - # add result of multiply1 to `self.context.mul` - # self.context.mul is a dict {"a": value1, "b": value2, "c": value3} - multiply1.set_context({"result": f"mul.{key}"}) + for i in range(nb_it): + wg.add_task(add10_wg, name=f"add10_{i}", integer=i) return wg -@task.calcfunction() -# the input is dynamic, we must use a variable kewword argument. **datas -def sum(**datas): - from aiida.orm import Float +# Submitting a parallel that adds 10 two times to different numbers +wg = WorkGraph(f"parallel_graph_builder") +add_task = wg.add_task(parallel_add, name="parallel_add", nb_it=2) +wg.submit(wait=True) - total = 0 - for key, data in datas.items(): - total += data - return Float(total) +# %% +# We look at the times of creation and last change +print("Time for running with graph builder", add_task.mtime - add_task.ctime) # %% -# Now, let's create a `WorkGraph` to use the new task: -# +# We can see that the time is more than 5 seconds which means that the two additions +# were performed in parallel -from aiida_workgraph import WorkGraph -from aiida.orm import Int, List +# %% +# Increasing number of daemon workers +# ----------------------------------- +# Since each daemon worker can only manage one WorkGraph (handling the results) +# at a time, one can experience slow downs when running many jobs that can be +# run in parallel. The optimal number of workers depends highly on the jobs +# that are run. + +from aiida.engine.daemon.client import get_daemon_client + +client = get_daemon_client() -X = {"a": Int(1), "b": Int(2), "c": Int(3)} -y = Int(2) -z = Int(3) -wg = WorkGraph("parallel_tasks") -multiply_parallel_gather1 = wg.add_task(multiply_parallel_gather, X=X, y=y) -sum1 = wg.add_task(sum, name="sum1") -# wg.add_link(add1.outputs[0], multiply_parallel_gather1.inputs["uuids"]) -wg.add_link(multiply_parallel_gather1.outputs[0], sum1.inputs[0]) +# %% +# We rerun the last graph builder for 5 iterations +wg = WorkGraph("wg_daemon_worker_1") +wg.add_task(parallel_add, name="parallel_add", nb_it=5) wg.submit(wait=True) +print( + f"Time for running with {client.get_numprocesses()['numprocesses']} worker", + load_node(wg.pk).mtime - load_node(wg.pk).ctime, +) # %% -# Get the result of the tasks: -# +# We increase the number of workers by one. One can also do this in the workgraph GUI. + +client = get_daemon_client() +client.increase_workers(1) -print("State of WorkGraph: {}".format(wg.state)) -print("Result of task add1: {}".format(wg.tasks["sum1"].outputs["result"].value)) +# %% +# Now we submit again and the time have shortens a bit. +wg = WorkGraph("wg_daemon_worker_2") +wg.add_task(parallel_add, name="parallel_add", nb_it=5) +wg.submit(wait=True) +print( + f"Time for running with {client.get_numprocesses()['numprocesses']} worker", + load_node(wg.pk).mtime - load_node(wg.pk).ctime, +) # %% -# Generate node graph from the AiiDA process: -# +# Note that on readthedocs you will not see a big difference due to the hardware. +# With a limited number of CPU the workers cannot be parallelized -from aiida_workgraph.utils import generate_node_graph +import multiprocessing -generate_node_graph(wg.pk) +print("Number of CPUs", multiprocessing.cpu_count()) # %% -# You can see that the outputs of `multiply_parallel_gather` workgraph is linked to the input of the `sum` task. +# Reset back to one worker +client.decrease_workers(1) + +# %% +# Maximum number of active WorkGraphs +# ----------------------------------- +# Be aware that for the moment AiiDA can only run 200 WorkGraphs at the same time. +# To increase that limit one can set this variable to a higher value. +# +# .. code-block:: bash # +# verdi config set daemon.worker_process_slots 200 +# verdi daemon restart From 172122657c18fc3d926b55293862f346f58f5bc3 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Tue, 10 Sep 2024 10:46:18 +0200 Subject: [PATCH 02/20] enforce rebuilding of examples in RTD --- docs/source/conf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/conf.py b/docs/source/conf.py index 9c923434..462179fe 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -71,6 +71,7 @@ "filename_pattern": "/*", "examples_dirs": gallery_src_dirs, # in sphinx-gallery doc referred as gallery source "gallery_dirs": sphinx_src_autogen_dirs, # path to where to gallery puts generated files + "run_stale_examples": True } exclude_patterns = [] From ef5af103d23d83f98e47e752ed7792b0d8e262d6 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Tue, 10 Sep 2024 10:55:27 +0200 Subject: [PATCH 03/20] format --- docs/source/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index 462179fe..a1d20e81 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -71,7 +71,7 @@ "filename_pattern": "/*", "examples_dirs": gallery_src_dirs, # in sphinx-gallery doc referred as gallery source "gallery_dirs": sphinx_src_autogen_dirs, # path to where to gallery puts generated files - "run_stale_examples": True + "run_stale_examples": True, } exclude_patterns = [] From 7ebea269c7cbe169b7cc1a556ec962dcadb5e497 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Tue, 10 Sep 2024 11:13:18 +0200 Subject: [PATCH 04/20] rename to invalidated some cache --- docs/gallery/howto/autogen/{parallel.py => parallel_wf.py} | 0 docs/source/howto/index.rst | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename docs/gallery/howto/autogen/{parallel.py => parallel_wf.py} (100%) diff --git a/docs/gallery/howto/autogen/parallel.py b/docs/gallery/howto/autogen/parallel_wf.py similarity index 100% rename from docs/gallery/howto/autogen/parallel.py rename to docs/gallery/howto/autogen/parallel_wf.py diff --git a/docs/source/howto/index.rst b/docs/source/howto/index.rst index 274f74be..6d33bfb2 100644 --- a/docs/source/howto/index.rst +++ b/docs/source/howto/index.rst @@ -9,7 +9,7 @@ This section contains a collection of HowTos for various topics. :caption: Contents: autogen/graph_builder - autogen/parallel + autogen/parallel_wf if while context From a067e68191d37bdcd00fd96c0d2ada914e31c8d2 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 11 Sep 2024 13:57:02 +0200 Subject: [PATCH 05/20] Apply suggestions from code review Co-authored-by: Xing Wang --- docs/gallery/howto/autogen/parallel_wf.py | 17 +++++++++-------- docs/source/conf.py | 1 - 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel_wf.py index 57305232..19fb79ec 100644 --- a/docs/gallery/howto/autogen/parallel_wf.py +++ b/docs/gallery/howto/autogen/parallel_wf.py @@ -131,9 +131,10 @@ def add10_wg(integer): wgs.append(wg) # We use wait=False so we can continue submitting -wgs[0].submit(wait=False) -# we wait for the last WorkGraph to finish +wgs[0].submit() # do not wait (by default), so that we can continue to submit next WG. wgs[1].submit(wait=True) +# we wait for all the WorkGraphs to finish +wgs[0].wait() # %% # We print the difference between the mtime (the time the WorkGraph has been last time changed) and the ctime (the time of creation). Since the WorkGraph's status is changed when finished, this give us a good estimate of the running time. @@ -160,16 +161,16 @@ def add10_wg(integer): # This graph_builder runs the add10_wg over a loop and its @task.graph_builder() -def parallel_add(nb_it): +def parallel_add(nb_iterations): wg = WorkGraph() - for i in range(nb_it): + for i in range(nb_iterations): wg.add_task(add10_wg, name=f"add10_{i}", integer=i) return wg # Submitting a parallel that adds 10 two times to different numbers wg = WorkGraph(f"parallel_graph_builder") -add_task = wg.add_task(parallel_add, name="parallel_add", nb_it=2) +add_task = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2) wg.submit(wait=True) @@ -178,7 +179,7 @@ def parallel_add(nb_it): print("Time for running with graph builder", add_task.mtime - add_task.ctime) # %% -# We can see that the time is more than 5 seconds which means that the two additions +# We can see that the time is less than 10 seconds which means that the two additions # were performed in parallel # %% @@ -197,7 +198,7 @@ def parallel_add(nb_it): # We rerun the last graph builder for 5 iterations wg = WorkGraph("wg_daemon_worker_1") -wg.add_task(parallel_add, name="parallel_add", nb_it=5) +wg.add_task(parallel_add, name="parallel_add", nb_iterations=5) wg.submit(wait=True) print( f"Time for running with {client.get_numprocesses()['numprocesses']} worker", @@ -214,7 +215,7 @@ def parallel_add(nb_it): # Now we submit again and the time have shortens a bit. wg = WorkGraph("wg_daemon_worker_2") -wg.add_task(parallel_add, name="parallel_add", nb_it=5) +wg.add_task(parallel_add, name="parallel_add", nb_iterations=5) wg.submit(wait=True) print( f"Time for running with {client.get_numprocesses()['numprocesses']} worker", diff --git a/docs/source/conf.py b/docs/source/conf.py index a1d20e81..9c923434 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -71,7 +71,6 @@ "filename_pattern": "/*", "examples_dirs": gallery_src_dirs, # in sphinx-gallery doc referred as gallery source "gallery_dirs": sphinx_src_autogen_dirs, # path to where to gallery puts generated files - "run_stale_examples": True, } exclude_patterns = [] From fb86a6cf9941f52557d1283a08546ca77b22415f Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 11 Sep 2024 14:16:52 +0200 Subject: [PATCH 06/20] add GUI representation to emphasize the fact that independent tasks are run in parallel --- docs/gallery/howto/autogen/parallel_wf.py | 31 ++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel_wf.py index 19fb79ec..92bd24fb 100644 --- a/docs/gallery/howto/autogen/parallel_wf.py +++ b/docs/gallery/howto/autogen/parallel_wf.py @@ -31,7 +31,7 @@ from aiida_workgraph import WorkGraph, task from aiida.calculations.arithmetic.add import ArithmeticAddCalculation -from aiida.orm import Int, InstalledCode, load_computer, load_code, load_node +from aiida.orm import InstalledCode, load_computer, load_code, load_node from aiida.common.exceptions import NotExistent # The ArithmeticAddCalculation needs to know where bash is stored @@ -47,16 +47,27 @@ wg = WorkGraph("parallel") x, y, u, v = (1, 2, 3, 4) -add_xy = wg.add_task(ArithmeticAddCalculation, x=x, y=y, code=code) +add_xy = wg.add_task(ArithmeticAddCalculation, name="add_xy", x=x, y=y, code=code) add_xy.set({"metadata.options.sleep": 5}) # the CalcJob will sleep 5 seconds -add_uv = wg.add_task(ArithmeticAddCalculation, x=u, y=v, code=code) +add_uv = wg.add_task(ArithmeticAddCalculation, name="add_uv", x=u, y=v, code=code) add_uv.set({"metadata.options.sleep": 5}) # the CalcJob will sleep 5 seconds add_xyuv = wg.add_task( ArithmeticAddCalculation, + name="add_xyuv", x=add_xy.outputs["sum"], y=add_uv.outputs["sum"], code=code, ) +# %% +# We can verify that the tasks add_xy and add_uv are independent from each other +# and therefore will be run automatically in parallel. + +wg.to_html() + + +# %% +# Running workgraph + wg.submit(wait=True) # %% @@ -89,6 +100,10 @@ def add(x, y, sleep): add, x=add_xy.outputs["result"], y=add_uv.outputs["result"], sleep=0 ) +wg.to_html() + +# %% + wg.submit(wait=True) # %% @@ -171,6 +186,9 @@ def parallel_add(nb_iterations): # Submitting a parallel that adds 10 two times to different numbers wg = WorkGraph(f"parallel_graph_builder") add_task = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2) +wg.to_html() + +# %% wg.submit(wait=True) @@ -199,6 +217,9 @@ def parallel_add(nb_iterations): wg = WorkGraph("wg_daemon_worker_1") wg.add_task(parallel_add, name="parallel_add", nb_iterations=5) +wg.to_html() + +# %% wg.submit(wait=True) print( f"Time for running with {client.get_numprocesses()['numprocesses']} worker", @@ -216,6 +237,10 @@ def parallel_add(nb_iterations): wg = WorkGraph("wg_daemon_worker_2") wg.add_task(parallel_add, name="parallel_add", nb_iterations=5) +wg.to_html() + +# %% + wg.submit(wait=True) print( f"Time for running with {client.get_numprocesses()['numprocesses']} worker", From 9257e25977b00d6cee8115920b7e9bb551f254fe Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 11 Sep 2024 14:23:17 +0200 Subject: [PATCH 07/20] reduce sleep time and number of iterations to reduce running time --- docs/gallery/howto/autogen/parallel_wf.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel_wf.py index 92bd24fb..184b804e 100644 --- a/docs/gallery/howto/autogen/parallel_wf.py +++ b/docs/gallery/howto/autogen/parallel_wf.py @@ -48,9 +48,9 @@ wg = WorkGraph("parallel") x, y, u, v = (1, 2, 3, 4) add_xy = wg.add_task(ArithmeticAddCalculation, name="add_xy", x=x, y=y, code=code) -add_xy.set({"metadata.options.sleep": 5}) # the CalcJob will sleep 5 seconds +add_xy.set({"metadata.options.sleep": 3}) # the CalcJob will sleep 3 seconds add_uv = wg.add_task(ArithmeticAddCalculation, name="add_uv", x=u, y=v, code=code) -add_uv.set({"metadata.options.sleep": 5}) # the CalcJob will sleep 5 seconds +add_uv.set({"metadata.options.sleep": 3}) # the CalcJob will sleep 3 seconds add_xyuv = wg.add_task( ArithmeticAddCalculation, name="add_xyuv", @@ -94,8 +94,8 @@ def add(x, y, sleep): wg = WorkGraph("parallel") x, y, u, v = (1, 2, 3, 4) -add_xy = wg.add_task(add, x=x, y=y, sleep=5) -add_uv = wg.add_task(add, x=x, y=y, sleep=5) +add_xy = wg.add_task(add, x=x, y=y, sleep=3) +add_uv = wg.add_task(add, x=x, y=y, sleep=3) add_xyuv = wg.add_task( add, x=add_xy.outputs["result"], y=add_uv.outputs["result"], sleep=0 ) @@ -113,7 +113,7 @@ def add(x, y, sleep): print("add_uv created at", add_uv.ctime.time(), "finished at", add_uv.mtime.time()) # %% -# We can see that the calcfunctions have been run with a 5 seconds delay +# We can see that the calcfunctions have been run with a 3 seconds delay # %% @@ -133,7 +133,7 @@ def add10_wg(integer): add = wg.add_task( ArithmeticAddCalculation, name="sum_task", x=10, y=integer, code=code ) - add.set({"metadata.options.sleep": 5}) + add.set({"metadata.options.sleep": 3}) return wg @@ -197,7 +197,7 @@ def parallel_add(nb_iterations): print("Time for running with graph builder", add_task.mtime - add_task.ctime) # %% -# We can see that the time is less than 10 seconds which means that the two additions +# We can see that the time is less than 6 seconds which means that the two additions # were performed in parallel # %% @@ -213,10 +213,10 @@ def parallel_add(nb_iterations): client = get_daemon_client() # %% -# We rerun the last graph builder for 5 iterations +# We rerun the last graph builder for 3 iterations wg = WorkGraph("wg_daemon_worker_1") -wg.add_task(parallel_add, name="parallel_add", nb_iterations=5) +wg.add_task(parallel_add, name="parallel_add", nb_iterations=3) wg.to_html() # %% @@ -236,7 +236,7 @@ def parallel_add(nb_iterations): # Now we submit again and the time have shortens a bit. wg = WorkGraph("wg_daemon_worker_2") -wg.add_task(parallel_add, name="parallel_add", nb_iterations=5) +wg.add_task(parallel_add, name="parallel_add", nb_iterations=3) wg.to_html() # %% From 7d567def829c151babb132ca2973fe116f1e8335 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 11 Sep 2024 14:24:03 +0200 Subject: [PATCH 08/20] rename add10_wg to add10 --- docs/gallery/howto/autogen/parallel_wf.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel_wf.py index 184b804e..0989ddeb 100644 --- a/docs/gallery/howto/autogen/parallel_wf.py +++ b/docs/gallery/howto/autogen/parallel_wf.py @@ -127,7 +127,7 @@ def add(x, y, sleep): @task.graph_builder( inputs=[{"name": "integer"}], outputs=[{"name": "sum", "from": "sum_task.result"}] ) -def add10_wg(integer): +def add10(integer): wg = WorkGraph() code = load_code("add@localhost") # code needs to loaded in the graph builder add = wg.add_task( @@ -142,7 +142,7 @@ def add10_wg(integer): wgs = [] for i in range(2): wg = WorkGraph(f"parallel_wg{i}") - wg.add_task(add10_wg, name=f"add10_{i}", integer=i) + wg.add_task(add10, name=f"add10_{i}", integer=i) wgs.append(wg) # We use wait=False so we can continue submitting @@ -174,12 +174,12 @@ def add10_wg(integer): # ------------------- -# This graph_builder runs the add10_wg over a loop and its +# This graph_builder runs the add10 over a loop and its @task.graph_builder() def parallel_add(nb_iterations): wg = WorkGraph() for i in range(nb_iterations): - wg.add_task(add10_wg, name=f"add10_{i}", integer=i) + wg.add_task(add10, name=f"add10_{i}", integer=i) return wg From 0f20dbc1a79707c6f7972b92d850789ce81c696e Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 11 Sep 2024 14:30:42 +0200 Subject: [PATCH 09/20] add further reading to aggregate notebook --- docs/gallery/howto/autogen/parallel_wf.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel_wf.py index 0989ddeb..e70789dd 100644 --- a/docs/gallery/howto/autogen/parallel_wf.py +++ b/docs/gallery/howto/autogen/parallel_wf.py @@ -269,3 +269,13 @@ def parallel_add(nb_iterations): # # verdi config set daemon.worker_process_slots 200 # verdi daemon restart + + + +# %% +# Further reading +# --------------- +# Now you learned how to run tasks in parallel you might want to know how to +# aggregate the results of all these parallel tasks (e.g. taking the mean of +# all computed values). For this you can further read +# :ref:`sphx_glr_howto_autogen_aggregate.py`. From 38e595118c7d2fd33603a142c476ffa6aaa6b16e Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 11 Sep 2024 14:31:50 +0200 Subject: [PATCH 10/20] pre-commit run --- docs/gallery/howto/autogen/parallel_wf.py | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel_wf.py index e70789dd..f7f3127a 100644 --- a/docs/gallery/howto/autogen/parallel_wf.py +++ b/docs/gallery/howto/autogen/parallel_wf.py @@ -271,7 +271,6 @@ def parallel_add(nb_iterations): # verdi daemon restart - # %% # Further reading # --------------- From 46faf073dc5e16cc15188671ab510e233c1d086b Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 11 Sep 2024 18:09:21 +0200 Subject: [PATCH 11/20] reduce time by reusing resuts from graphp builder example --- docs/gallery/howto/autogen/parallel_wf.py | 82 +++++++++++------------ 1 file changed, 39 insertions(+), 43 deletions(-) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel_wf.py index f7f3127a..8af3f0b0 100644 --- a/docs/gallery/howto/autogen/parallel_wf.py +++ b/docs/gallery/howto/autogen/parallel_wf.py @@ -140,9 +140,10 @@ def add10(integer): # %% wgs = [] +tasks = [] for i in range(2): wg = WorkGraph(f"parallel_wg{i}") - wg.add_task(add10, name=f"add10_{i}", integer=i) + tasks.append(wg.add_task(add10, name="add10", integer=i)) wgs.append(wg) # We use wait=False so we can continue submitting @@ -152,21 +153,22 @@ def add10(integer): wgs[0].wait() # %% -# We print the difference between the mtime (the time the WorkGraph has been last time changed) and the ctime (the time of creation). Since the WorkGraph's status is changed when finished, this give us a good estimate of the running time. +# We print the difference between the mtime (the time the WorkGraph has been +# last time changed) and the ctime (the time of creation). Since the +# WorkGraph's status is changed when finished, this give us a good estimate of +# the running time. print( - "WG0 created:", - load_node(wgs[0].pk).ctime.time(), + "add10 task of WG0 created:", + load_node(tasks[0].pk).ctime.time(), "finished:", - load_node(wgs[0].pk).mtime.time(), + load_node(tasks[0].pk).mtime.time(), ) -print("Time WG0", load_node(wgs[0].pk).mtime - load_node(wgs[0].pk).ctime) print( - "WG1 created:", - load_node(wgs[1].pk).ctime.time(), + "add10 task of WG1 created:", + load_node(tasks[1].pk).ctime.time(), "finished:", - load_node(wgs[1].pk).mtime.time(), + load_node(tasks[1].pk).mtime.time(), ) -print("Time WG1", load_node(wgs[1].pk).mtime - load_node(wgs[1].pk).ctime) # %% @@ -185,21 +187,37 @@ def parallel_add(nb_iterations): # Submitting a parallel that adds 10 two times to different numbers wg = WorkGraph(f"parallel_graph_builder") -add_task = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2) +parallel_add_task = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2) wg.to_html() # %% wg.submit(wait=True) - # %% -# We look at the times of creation and last change -print("Time for running with graph builder", add_task.mtime - add_task.ctime) +parallel_add_wg = WorkGraph.load(parallel_add_task.pk) +add10_0_task = parallel_add_wg.tasks["add10_0"] +add10_1_task = parallel_add_wg.tasks["add10_1"] +print( + "add10_0 task created:", + add10_0_task.ctime.time(), + "finished:", + add10_0_task.mtime.time(), +) +print( + "add10_1 task created:", + add10_1_task.ctime.time(), + "finished:", + add10_1_task.mtime.time(), +) # %% # We can see that the time is less than 6 seconds which means that the two additions # were performed in parallel +# %% +# We can also look at the total time and see the overhead costs. +print("Time for running parallelized graph builder", add_task.mtime - add_task.ctime) + # %% # Increasing number of daemon workers # ----------------------------------- @@ -211,49 +229,27 @@ def parallel_add(nb_iterations): from aiida.engine.daemon.client import get_daemon_client client = get_daemon_client() +print(f"Number of current daemon workers {client.get_numprocesses()}") # %% -# We rerun the last graph builder for 3 iterations - -wg = WorkGraph("wg_daemon_worker_1") -wg.add_task(parallel_add, name="parallel_add", nb_iterations=3) -wg.to_html() - -# %% -wg.submit(wait=True) -print( - f"Time for running with {client.get_numprocesses()['numprocesses']} worker", - load_node(wg.pk).mtime - load_node(wg.pk).ctime, -) - -# %% -# We increase the number of workers by one. One can also do this in the workgraph GUI. +# We rerun the last graph builder with 2 damon workers -client = get_daemon_client() client.increase_workers(1) - -# %% -# Now we submit again and the time have shortens a bit. - wg = WorkGraph("wg_daemon_worker_2") -wg.add_task(parallel_add, name="parallel_add", nb_iterations=3) +parallel_add_task_2 = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2) wg.to_html() # %% - wg.submit(wait=True) print( - f"Time for running with {client.get_numprocesses()['numprocesses']} worker", - load_node(wg.pk).mtime - load_node(wg.pk).ctime, + "Time for running parallelized graph builder with 2 daemons", + parallel_add_task_2.mtime - parallel_add_task_2.ctime, ) # %% -# Note that on readthedocs you will not see a big difference due to the hardware. -# With a limited number of CPU the workers cannot be parallelized - -import multiprocessing +# The overhead time has shortens a bit as the handling of the CalcJobs and +# WorkGraphs could be parallelized. -print("Number of CPUs", multiprocessing.cpu_count()) # %% # Reset back to one worker From e7fe03ab20bda925f2c9ec3028d16b5b2fb1d2ec Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 11 Sep 2024 20:28:29 +0200 Subject: [PATCH 12/20] fix typo --- docs/gallery/howto/autogen/parallel_wf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel_wf.py index 8af3f0b0..cac86c23 100644 --- a/docs/gallery/howto/autogen/parallel_wf.py +++ b/docs/gallery/howto/autogen/parallel_wf.py @@ -216,7 +216,7 @@ def parallel_add(nb_iterations): # %% # We can also look at the total time and see the overhead costs. -print("Time for running parallelized graph builder", add_task.mtime - add_task.ctime) +print("Time for running parallelized graph builder", parallel_add_task.mtime - parallel_add_task.ctime) # %% # Increasing number of daemon workers From 019ee508f8654f6b47c8e65a44f048e87d61118a Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 11 Sep 2024 20:52:21 +0200 Subject: [PATCH 13/20] Apply suggestions from code review --- docs/gallery/howto/autogen/parallel_wf.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel_wf.py index cac86c23..6886d7b3 100644 --- a/docs/gallery/howto/autogen/parallel_wf.py +++ b/docs/gallery/howto/autogen/parallel_wf.py @@ -235,6 +235,7 @@ def parallel_add(nb_iterations): # We rerun the last graph builder with 2 damon workers client.increase_workers(1) +print(f"Number of current daemon workers {client.get_numprocesses()["numprocesses"]}") wg = WorkGraph("wg_daemon_worker_2") parallel_add_task_2 = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2) wg.to_html() @@ -248,7 +249,8 @@ def parallel_add(nb_iterations): # %% # The overhead time has shortens a bit as the handling of the CalcJobs and -# WorkGraphs could be parallelized. +# WorkGraphs could be parallelized. One can increase the number of iterations +# to see a more significant difference. # %% From d350b39ad1c4ef9bc751fda50768257d316edf83 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 11 Sep 2024 20:52:44 +0200 Subject: [PATCH 14/20] Apply suggestions from code review --- docs/gallery/howto/autogen/parallel_wf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel_wf.py index 6886d7b3..13790e62 100644 --- a/docs/gallery/howto/autogen/parallel_wf.py +++ b/docs/gallery/howto/autogen/parallel_wf.py @@ -229,7 +229,7 @@ def parallel_add(nb_iterations): from aiida.engine.daemon.client import get_daemon_client client = get_daemon_client() -print(f"Number of current daemon workers {client.get_numprocesses()}") +print(f"Number of current daemon workers {client.get_numprocesses()["numprocesses"]}") # %% # We rerun the last graph builder with 2 damon workers From 77a35a57ead545ba62185f59edbb531858f77f22 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Thu, 19 Sep 2024 10:55:45 +0200 Subject: [PATCH 15/20] fix pre-commit and f string --- docs/gallery/howto/autogen/parallel_wf.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel_wf.py index 13790e62..87b942f5 100644 --- a/docs/gallery/howto/autogen/parallel_wf.py +++ b/docs/gallery/howto/autogen/parallel_wf.py @@ -229,13 +229,13 @@ def parallel_add(nb_iterations): from aiida.engine.daemon.client import get_daemon_client client = get_daemon_client() -print(f"Number of current daemon workers {client.get_numprocesses()["numprocesses"]}") +print(f"Number of current daemon workers {client.get_numprocesses()['numprocesses']}") # %% # We rerun the last graph builder with 2 damon workers client.increase_workers(1) -print(f"Number of current daemon workers {client.get_numprocesses()["numprocesses"]}") +print(f"Number of current daemon workers {client.get_numprocesses()['numprocesses']}") wg = WorkGraph("wg_daemon_worker_2") parallel_add_task_2 = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2) wg.to_html() From 174d9ea7a3a915965a8c37a33fd7a86d9ec7c6c6 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Thu, 19 Sep 2024 11:17:47 +0200 Subject: [PATCH 16/20] pre-commit run --- docs/gallery/howto/autogen/parallel_wf.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel_wf.py index 87b942f5..6946f5a2 100644 --- a/docs/gallery/howto/autogen/parallel_wf.py +++ b/docs/gallery/howto/autogen/parallel_wf.py @@ -216,7 +216,10 @@ def parallel_add(nb_iterations): # %% # We can also look at the total time and see the overhead costs. -print("Time for running parallelized graph builder", parallel_add_task.mtime - parallel_add_task.ctime) +print( + "Time for running parallelized graph builder", + parallel_add_task.mtime - parallel_add_task.ctime, +) # %% # Increasing number of daemon workers From d7089cce99ee01add825b9db1eed1e1a817d502d Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Thu, 19 Sep 2024 11:27:36 +0200 Subject: [PATCH 17/20] update reference --- docs/gallery/howto/autogen/parallel_wf.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel_wf.py index 6946f5a2..62544118 100644 --- a/docs/gallery/howto/autogen/parallel_wf.py +++ b/docs/gallery/howto/autogen/parallel_wf.py @@ -277,5 +277,4 @@ def parallel_add(nb_iterations): # --------------- # Now you learned how to run tasks in parallel you might want to know how to # aggregate the results of all these parallel tasks (e.g. taking the mean of -# all computed values). For this you can further read -# :ref:`sphx_glr_howto_autogen_aggregate.py`. +# all computed values). For this you can further read `how to aggregate outputs `_` From f18e3379fce59ca047b6b1ea217f01b18fe8ee3c Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Thu, 19 Sep 2024 11:47:22 +0200 Subject: [PATCH 18/20] move parallel_wf parallel_wf --- docs/gallery/howto/autogen/{parallel_wf.py => parallel.py} | 0 docs/source/howto/index.rst | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename docs/gallery/howto/autogen/{parallel_wf.py => parallel.py} (100%) diff --git a/docs/gallery/howto/autogen/parallel_wf.py b/docs/gallery/howto/autogen/parallel.py similarity index 100% rename from docs/gallery/howto/autogen/parallel_wf.py rename to docs/gallery/howto/autogen/parallel.py diff --git a/docs/source/howto/index.rst b/docs/source/howto/index.rst index 6d33bfb2..274f74be 100644 --- a/docs/source/howto/index.rst +++ b/docs/source/howto/index.rst @@ -9,7 +9,7 @@ This section contains a collection of HowTos for various topics. :caption: Contents: autogen/graph_builder - autogen/parallel_wf + autogen/parallel if while context From 286966cca4092a36e17265c5747be08c753b74cf Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Thu, 19 Sep 2024 17:16:24 +0200 Subject: [PATCH 19/20] Update docs/gallery/howto/autogen/parallel.py --- docs/gallery/howto/autogen/parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/gallery/howto/autogen/parallel.py b/docs/gallery/howto/autogen/parallel.py index 62544118..3ae68b3c 100644 --- a/docs/gallery/howto/autogen/parallel.py +++ b/docs/gallery/howto/autogen/parallel.py @@ -277,4 +277,4 @@ def parallel_add(nb_iterations): # --------------- # Now you learned how to run tasks in parallel you might want to know how to # aggregate the results of all these parallel tasks (e.g. taking the mean of -# all computed values). For this you can further read `how to aggregate outputs `_` +# all computed values). For this you can further read `how to aggregate outputs `_ From d28bb991c9303660b31d8e6440ed8552e6d99677 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Fri, 20 Sep 2024 16:08:10 +0200 Subject: [PATCH 20/20] run 10 times --- docs/gallery/howto/autogen/parallel.py | 41 ++++++++++++++++++-------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/docs/gallery/howto/autogen/parallel.py b/docs/gallery/howto/autogen/parallel.py index 3ae68b3c..61f46c20 100644 --- a/docs/gallery/howto/autogen/parallel.py +++ b/docs/gallery/howto/autogen/parallel.py @@ -214,13 +214,6 @@ def parallel_add(nb_iterations): # We can see that the time is less than 6 seconds which means that the two additions # were performed in parallel -# %% -# We can also look at the total time and see the overhead costs. -print( - "Time for running parallelized graph builder", - parallel_add_task.mtime - parallel_add_task.ctime, -) - # %% # Increasing number of daemon workers # ----------------------------------- @@ -231,29 +224,49 @@ def parallel_add(nb_iterations): from aiida.engine.daemon.client import get_daemon_client +# %% +# We run the 10 iterations with one daemon + client = get_daemon_client() print(f"Number of current daemon workers {client.get_numprocesses()['numprocesses']}") +wg = WorkGraph("wg_daemon_worker_2") +parallel_add_task = wg.add_task(parallel_add, name="parallel_add", nb_iterations=10) +wg.to_html() + +# %% + +wg.submit(wait=True) # %% -# We rerun the last graph builder with 2 damon workers +# And look at the total time and see the overhead costs. + +print( + "Time for running parallelized graph builder", + parallel_add_task.mtime - parallel_add_task.ctime, +) + + +# %% +# We rerun it now with 2 damon workers client.increase_workers(1) print(f"Number of current daemon workers {client.get_numprocesses()['numprocesses']}") wg = WorkGraph("wg_daemon_worker_2") -parallel_add_task_2 = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2) +parallel_add_task_2 = wg.add_task(parallel_add, name="parallel_add", nb_iterations=10) wg.to_html() # %% wg.submit(wait=True) + +# %% print( "Time for running parallelized graph builder with 2 daemons", parallel_add_task_2.mtime - parallel_add_task_2.ctime, ) # %% -# The overhead time has shortens a bit as the handling of the CalcJobs and -# WorkGraphs could be parallelized. One can increase the number of iterations -# to see a more significant difference. +# The time has not change as the handling of the CalcJobs. If one can increase +# the number of iterations to see a more significant difference. # %% @@ -270,7 +283,9 @@ def parallel_add(nb_iterations): # # verdi config set daemon.worker_process_slots 200 # verdi daemon restart - +# +# For more information about improving the performance please refer to the +# `"Tuning performance" section in the official AiiDA documentation `_ # %% # Further reading