From e99a01bf2525b8fb7cdce7b10e17ab6700c06d16 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 30 May 2024 09:25:30 +0200 Subject: [PATCH 1/9] mpi4py as optional dependency --- .ci_support/environment-mini.yml | 8 ++++++ .github/workflows/minimal.yml | 32 +++++++++++++++++++++ pympipool/__init__.py | 4 +-- pympipool/scheduler/__init__.py | 4 +-- pympipool/shared/executorbase.py | 10 ++++++- pympipool/shared/inputcheck.py | 6 ++-- pyproject.toml | 6 ++-- tests/benchmark/llh.py | 4 +-- tests/test_dependencies_executor.py | 2 +- tests/test_executor_backend_mpi.py | 17 +++++++---- tests/test_executor_backend_mpi_noblock.py | 20 ++++--------- tests/test_mpi_executor.py | 11 ++++++++ tests/test_shared_communication.py | 33 +++++++++++++++++++++- 13 files changed, 124 insertions(+), 33 deletions(-) create mode 100644 .ci_support/environment-mini.yml create mode 100644 .github/workflows/minimal.yml diff --git a/.ci_support/environment-mini.yml b/.ci_support/environment-mini.yml new file mode 100644 index 00000000..6a377c83 --- /dev/null +++ b/.ci_support/environment-mini.yml @@ -0,0 +1,8 @@ +channels: +- conda-forge +dependencies: +- python +- numpy +- cloudpickle =3.0.0 +- tqdm =4.66.4 +- pyzmq =26.0.3 diff --git a/.github/workflows/minimal.yml b/.github/workflows/minimal.yml new file mode 100644 index 00000000..11de29d6 --- /dev/null +++ b/.github/workflows/minimal.yml @@ -0,0 +1,32 @@ +# This workflow is used to run the unittest of pyiron + +name: Unittests-minimal + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: conda-incubator/setup-miniconda@v2.2.0 + with: + python-version: "3.12" + mamba-version: "*" + channels: conda-forge + miniforge-variant: Mambaforge + channel-priority: strict + auto-update-conda: true + environment-file: .ci_support/environment-mini.yml + - name: Test + shell: bash -l {0} + timeout-minutes: 5 + run: | + pip install versioneer[toml]==0.29 + pip install . --no-deps --no-build-isolation + cd tests + python -m unittest discover . diff --git a/pympipool/__init__.py b/pympipool/__init__.py index 5903a06f..a9618430 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -40,7 +40,7 @@ class Executor: points to the same address as localhost. Still MacOS >= 12 seems to disable this look up for security reasons. So on MacOS it is required to set this option to true - backend (str): Switch between the different backends "flux", "mpi" or "slurm". Alternatively, when "auto" + backend (str): Switch between the different backends "flux", "local" or "slurm". Alternatively, when "auto" is selected (the default) the available backend is determined automatically. block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource requirements, pympipool supports block allocation. In this case all resources have @@ -138,7 +138,7 @@ def __new__( points to the same address as localhost. Still MacOS >= 12 seems to disable this look up for security reasons. So on MacOS it is required to set this option to true - backend (str): Switch between the different backends "flux", "mpi" or "slurm". Alternatively, when "auto" + backend (str): Switch between the different backends "flux", "local" or "slurm". Alternatively, when "auto" is selected (the default) the available backend is determined automatically. block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource requirements, pympipool supports block allocation. In this case all diff --git a/pympipool/scheduler/__init__.py b/pympipool/scheduler/__init__.py index 793c603a..e4f3d574 100644 --- a/pympipool/scheduler/__init__.py +++ b/pympipool/scheduler/__init__.py @@ -77,7 +77,7 @@ def create_executor( any computer should be able to resolve that their own hostname points to the same address as localhost. Still MacOS >= 12 seems to disable this look up for security reasons. So on MacOS it is required to set this option to true - backend (str): Switch between the different backends "flux", "mpi" or "slurm". Alternatively, when "auto" + backend (str): Switch between the different backends "flux", "local" or "slurm". Alternatively, when "auto" is selected (the default) the available backend is determined automatically. block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource requirements, pympipool supports block allocation. In this case all @@ -145,7 +145,7 @@ def create_executor( cwd=cwd, hostname_localhost=hostname_localhost, ) - else: # backend="mpi" + else: # backend="local" check_threads_per_core(threads_per_core=threads_per_core) check_gpus_per_worker(gpus_per_worker=gpus_per_worker) check_command_line_argument_lst( diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index d95916c5..022c0162 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -19,6 +19,12 @@ check_resource_dict_is_empty, ) +try: + import mpi4py + mpi4py_available = True +except ImportError: + mpi4py_available = False + class ExecutorBase(FutureExecutor): def __init__(self): @@ -422,8 +428,10 @@ def _get_backend_path(cores: int): list[str]: List of strings containing the python executable path and the backend script to execute """ command_lst = [sys.executable] - if cores > 1: + if cores > 1 and mpi4py_available: command_lst += [_get_command_path(executable="mpiexec.py")] + elif cores > 1 and not mpi4py_available: + raise ImportError("mpi4py is required for parallel calculations. Please install mpi4py.") else: command_lst += [_get_command_path(executable="serial.py")] return command_lst diff --git a/pympipool/shared/inputcheck.py b/pympipool/shared/inputcheck.py index 82ee5480..c19c076f 100644 --- a/pympipool/shared/inputcheck.py +++ b/pympipool/shared/inputcheck.py @@ -70,9 +70,9 @@ def check_refresh_rate(refresh_rate: float): def validate_backend( backend: str, flux_installed: bool = False, slurm_installed: bool = False ) -> str: - if backend not in ["auto", "mpi", "slurm", "flux"]: + if backend not in ["auto", "flux", "local", "slurm"]: raise ValueError( - 'The currently implemented backends are ["flux", "mpi", "slurm"]. ' + 'The currently implemented backends are ["auto", "flux", "local", "slurm"]. ' 'Alternatively, you can select "auto", the default option, to automatically determine the backend. But ' + backend + " is not a valid choice." @@ -90,7 +90,7 @@ def validate_backend( elif backend == "slurm" or (backend == "auto" and slurm_installed): return "slurm" else: - return "mpi" + return "local" def check_pmi(backend: str, pmi: Optional[str]): diff --git a/pyproject.toml b/pyproject.toml index f0251a9c..6a3a3d57 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,5 @@ [build-system] -requires = ["cloudpickle", "mpi4py", "pyzmq", "setuptools", "tqdm", "versioneer[toml]==0.29"] +requires = ["cloudpickle", "pyzmq", "setuptools", "tqdm", "versioneer[toml]==0.29"] build-backend = "setuptools.build_meta" [project] @@ -25,7 +25,6 @@ classifiers = [ ] dependencies = [ "cloudpickle==3.0.0", - "mpi4py==3.1.6", "pyzmq==26.0.3", "tqdm==4.66.4", ] @@ -36,6 +35,9 @@ Homepage = "https://github.com/pyiron/pympipool" Documentation = "https://pympipool.readthedocs.io" Repository = "https://github.com/pyiron/pympipool" +[project.optional-dependencies] +mpi = ["mpi4py==3.1.6"] + [tool.setuptools.packages.find] include = ["pympipool*"] diff --git a/tests/benchmark/llh.py b/tests/benchmark/llh.py index d421f948..51cc46dc 100644 --- a/tests/benchmark/llh.py +++ b/tests/benchmark/llh.py @@ -50,7 +50,7 @@ def run_static(mean=0.1, sigma=1.1, runs=32): sigma=1.1, runs=32, max_cores=4, - backend="mpi", + backend="local", block_allocation=True, ) elif run_mode == "pympipool": @@ -62,7 +62,7 @@ def run_static(mean=0.1, sigma=1.1, runs=32): sigma=1.1, runs=32, max_cores=4, - backend="mpi", + backend="local", block_allocation=False, ) elif run_mode == "flux": diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index de35d333..0d277fb6 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -12,7 +12,7 @@ def add_function(parameter_1, parameter_2): class TestExecutorWithDependencies(unittest.TestCase): def test_executor(self): - with Executor(max_cores=1, backend="mpi", hostname_localhost=True) as exe: + with Executor(max_cores=1, backend="local", hostname_localhost=True) as exe: cloudpickle_register(ind=1) future_1 = exe.submit(add_function, 1, parameter_2=2) future_2 = exe.submit(add_function, 1, parameter_2=future_1) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index 2cb3b876..d262b658 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -3,6 +3,12 @@ from pympipool import Executor from pympipool.shared.executorbase import cloudpickle_register +try: + import mpi4py + mpi4py_installed = True +except ImportError: + mpi4py_installed = False + def calc(i): return i @@ -19,7 +25,7 @@ def mpi_funct(i): class TestExecutorBackend(unittest.TestCase): def test_meta_executor_serial(self): with Executor( - max_cores=2, hostname_localhost=True, backend="mpi", block_allocation=True + max_cores=2, hostname_localhost=True, backend="local", block_allocation=True ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) @@ -31,7 +37,7 @@ def test_meta_executor_serial(self): def test_meta_executor_single(self): with Executor( - max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=True + max_cores=1, hostname_localhost=True, backend="local", block_allocation=True ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) @@ -41,12 +47,13 @@ def test_meta_executor_single(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) + @unittest.skipIf(mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped.") def test_meta_executor_parallel(self): with Executor( max_workers=2, cores_per_worker=2, hostname_localhost=True, - backend="mpi", + backend="local", block_allocation=True, ) as exe: cloudpickle_register(ind=1) @@ -61,7 +68,7 @@ def test_errors(self): cores_per_worker=1, threads_per_core=2, hostname_localhost=True, - backend="mpi", + backend="local", ) with self.assertRaises(TypeError): Executor( @@ -69,5 +76,5 @@ def test_errors(self): cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True, - backend="mpi", + backend="local", ) diff --git a/tests/test_executor_backend_mpi_noblock.py b/tests/test_executor_backend_mpi_noblock.py index afc7fabe..c1b0130f 100644 --- a/tests/test_executor_backend_mpi_noblock.py +++ b/tests/test_executor_backend_mpi_noblock.py @@ -8,14 +8,6 @@ def calc(i): return i -def mpi_funct(i): - from mpi4py import MPI - - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - return i, size, rank - - def resource_dict(resource_dict): return resource_dict @@ -23,7 +15,7 @@ def resource_dict(resource_dict): class TestExecutorBackend(unittest.TestCase): def test_meta_executor_serial(self): with Executor( - max_cores=2, hostname_localhost=True, backend="mpi", block_allocation=False + max_cores=2, hostname_localhost=True, backend="local", block_allocation=False ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) @@ -35,7 +27,7 @@ def test_meta_executor_serial(self): def test_meta_executor_single(self): with Executor( - max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=False + max_cores=1, hostname_localhost=True, backend="local", block_allocation=False ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) @@ -52,7 +44,7 @@ def test_errors(self): cores_per_worker=1, threads_per_core=2, hostname_localhost=True, - backend="mpi", + backend="local", ) with self.assertRaises(TypeError): Executor( @@ -60,13 +52,13 @@ def test_errors(self): cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True, - backend="mpi", + backend="local", ) with self.assertRaises(ValueError): with Executor( max_cores=1, hostname_localhost=True, - backend="mpi", + backend="local", block_allocation=False, ) as exe: exe.submit(resource_dict, resource_dict={}) @@ -74,7 +66,7 @@ def test_errors(self): with Executor( max_cores=1, hostname_localhost=True, - backend="mpi", + backend="local", block_allocation=True, ) as exe: exe.submit(resource_dict, resource_dict={}) diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index 35b26431..aa1e99f5 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -13,6 +13,12 @@ ExecutorBase, ) +try: + import mpi4py + mpi4py_installed = True +except ImportError: + mpi4py_installed = False + def calc(i): return i @@ -127,6 +133,7 @@ def test_pympiexecutor_errors(self): ) +@unittest.skipIf(mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped.") class TestPyMpiExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): with PyMPIExecutor( @@ -164,6 +171,7 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): self.assertEqual(output, [2, 2]) +@unittest.skipIf(mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped.") class TestPyMpiStepExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): with PyMPIStepExecutor( @@ -201,6 +209,7 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): self.assertEqual(output, [2, 2]) + class TestPyMpiExecutorInitFunction(unittest.TestCase): def test_internal_memory(self): with PyMPIExecutor( @@ -339,6 +348,7 @@ def test_meta_step(self): else: self.assertEqual(str(exe.info[k]), v) + @unittest.skipIf(mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped.") def test_pool_multi_core(self): with PyMPIExecutor( max_workers=1, cores_per_worker=2, hostname_localhost=True @@ -352,6 +362,7 @@ def test_pool_multi_core(self): self.assertEqual(len(p), 0) self.assertEqual(output.result(), [(2, 2, 0), (2, 2, 1)]) + @unittest.skipIf(mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped.") def test_pool_multi_core_map(self): with PyMPIExecutor( max_workers=1, cores_per_worker=2, hostname_localhost=True diff --git a/tests/test_shared_communication.py b/tests/test_shared_communication.py index 940b4ee9..cdecedee 100644 --- a/tests/test_shared_communication.py +++ b/tests/test_shared_communication.py @@ -16,12 +16,20 @@ from pympipool.shared.interface import MpiExecInterface +try: + import mpi4py + mpi4py_installed = True +except ImportError: + mpi4py_installed = False + + def calc(i): return np.array(i**2) class TestInterface(unittest.TestCase): - def test_interface(self): + @unittest.skipIf(mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped.") + def test_interface_mpi(self): cloudpickle_register(ind=1) task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}} interface = SocketInterface( @@ -44,6 +52,29 @@ def test_interface(self): ) interface.shutdown(wait=True) + def test_interface_serial(self): + cloudpickle_register(ind=1) + task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}} + interface = SocketInterface( + interface=MpiExecInterface(cwd=None, cores=1, oversubscribe=False) + ) + interface.bootup( + command_lst=[ + sys.executable, + os.path.abspath( + os.path.join( + __file__, "..", "..", "pympipool", "backend", "serial.py" + ) + ), + "--zmqport", + str(interface.bind_to_random_port()), + ] + ) + self.assertEqual( + interface.send_and_receive_dict(input_dict=task_dict), np.array(4) + ) + interface.shutdown(wait=True) + class TestZMQ(unittest.TestCase): def test_initialize_zmq(self): From 4749fa601af6883b26096f0842863cbb103b2df4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 30 May 2024 07:25:51 +0000 Subject: [PATCH 2/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pympipool/shared/executorbase.py | 5 ++++- tests/test_executor_backend_mpi.py | 5 ++++- tests/test_executor_backend_mpi_noblock.py | 10 ++++++++-- tests/test_mpi_executor.py | 18 +++++++++++++----- tests/test_shared_communication.py | 5 ++++- 5 files changed, 33 insertions(+), 10 deletions(-) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 022c0162..81058729 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -21,6 +21,7 @@ try: import mpi4py + mpi4py_available = True except ImportError: mpi4py_available = False @@ -431,7 +432,9 @@ def _get_backend_path(cores: int): if cores > 1 and mpi4py_available: command_lst += [_get_command_path(executable="mpiexec.py")] elif cores > 1 and not mpi4py_available: - raise ImportError("mpi4py is required for parallel calculations. Please install mpi4py.") + raise ImportError( + "mpi4py is required for parallel calculations. Please install mpi4py." + ) else: command_lst += [_get_command_path(executable="serial.py")] return command_lst diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index d262b658..c9825201 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -5,6 +5,7 @@ try: import mpi4py + mpi4py_installed = True except ImportError: mpi4py_installed = False @@ -47,7 +48,9 @@ def test_meta_executor_single(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) - @unittest.skipIf(mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped.") + @unittest.skipIf( + mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped." + ) def test_meta_executor_parallel(self): with Executor( max_workers=2, diff --git a/tests/test_executor_backend_mpi_noblock.py b/tests/test_executor_backend_mpi_noblock.py index c1b0130f..7fcd6af2 100644 --- a/tests/test_executor_backend_mpi_noblock.py +++ b/tests/test_executor_backend_mpi_noblock.py @@ -15,7 +15,10 @@ def resource_dict(resource_dict): class TestExecutorBackend(unittest.TestCase): def test_meta_executor_serial(self): with Executor( - max_cores=2, hostname_localhost=True, backend="local", block_allocation=False + max_cores=2, + hostname_localhost=True, + backend="local", + block_allocation=False, ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) @@ -27,7 +30,10 @@ def test_meta_executor_serial(self): def test_meta_executor_single(self): with Executor( - max_cores=1, hostname_localhost=True, backend="local", block_allocation=False + max_cores=1, + hostname_localhost=True, + backend="local", + block_allocation=False, ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index aa1e99f5..36c6c220 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -15,6 +15,7 @@ try: import mpi4py + mpi4py_installed = True except ImportError: mpi4py_installed = False @@ -133,7 +134,9 @@ def test_pympiexecutor_errors(self): ) -@unittest.skipIf(mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped.") +@unittest.skipIf( + mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped." +) class TestPyMpiExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): with PyMPIExecutor( @@ -171,7 +174,9 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): self.assertEqual(output, [2, 2]) -@unittest.skipIf(mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped.") +@unittest.skipIf( + mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped." +) class TestPyMpiStepExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): with PyMPIStepExecutor( @@ -209,7 +214,6 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): self.assertEqual(output, [2, 2]) - class TestPyMpiExecutorInitFunction(unittest.TestCase): def test_internal_memory(self): with PyMPIExecutor( @@ -348,7 +352,9 @@ def test_meta_step(self): else: self.assertEqual(str(exe.info[k]), v) - @unittest.skipIf(mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped.") + @unittest.skipIf( + mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped." + ) def test_pool_multi_core(self): with PyMPIExecutor( max_workers=1, cores_per_worker=2, hostname_localhost=True @@ -362,7 +368,9 @@ def test_pool_multi_core(self): self.assertEqual(len(p), 0) self.assertEqual(output.result(), [(2, 2, 0), (2, 2, 1)]) - @unittest.skipIf(mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped.") + @unittest.skipIf( + mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped." + ) def test_pool_multi_core_map(self): with PyMPIExecutor( max_workers=1, cores_per_worker=2, hostname_localhost=True diff --git a/tests/test_shared_communication.py b/tests/test_shared_communication.py index cdecedee..3ef20830 100644 --- a/tests/test_shared_communication.py +++ b/tests/test_shared_communication.py @@ -18,6 +18,7 @@ try: import mpi4py + mpi4py_installed = True except ImportError: mpi4py_installed = False @@ -28,7 +29,9 @@ def calc(i): class TestInterface(unittest.TestCase): - @unittest.skipIf(mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped.") + @unittest.skipIf( + mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped." + ) def test_interface_mpi(self): cloudpickle_register(ind=1) task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}} From f92bff281e02f3dd0614921f9e0db0e146c32e43 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 30 May 2024 09:29:26 +0200 Subject: [PATCH 3/9] fix import check --- pympipool/shared/executorbase.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 022c0162..592c46ed 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -2,6 +2,7 @@ Executor as FutureExecutor, Future, ) +import importlib.util import inspect import os import queue @@ -19,12 +20,6 @@ check_resource_dict_is_empty, ) -try: - import mpi4py - mpi4py_available = True -except ImportError: - mpi4py_available = False - class ExecutorBase(FutureExecutor): def __init__(self): @@ -428,9 +423,9 @@ def _get_backend_path(cores: int): list[str]: List of strings containing the python executable path and the backend script to execute """ command_lst = [sys.executable] - if cores > 1 and mpi4py_available: + if cores > 1 and importlib.util.find_spec("mpi4py") is not None: command_lst += [_get_command_path(executable="mpiexec.py")] - elif cores > 1 and not mpi4py_available: + elif cores > 1: raise ImportError("mpi4py is required for parallel calculations. Please install mpi4py.") else: command_lst += [_get_command_path(executable="serial.py")] From 2675cedd62fb3db85b2548904cba4b9952e525b7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 30 May 2024 07:31:22 +0000 Subject: [PATCH 4/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pympipool/shared/executorbase.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 592c46ed..f45dea88 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -426,7 +426,9 @@ def _get_backend_path(cores: int): if cores > 1 and importlib.util.find_spec("mpi4py") is not None: command_lst += [_get_command_path(executable="mpiexec.py")] elif cores > 1: - raise ImportError("mpi4py is required for parallel calculations. Please install mpi4py.") + raise ImportError( + "mpi4py is required for parallel calculations. Please install mpi4py." + ) else: command_lst += [_get_command_path(executable="serial.py")] return command_lst From 430f4f64a3bf904f0fa72163969589acc4659f7e Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 30 May 2024 09:34:54 +0200 Subject: [PATCH 5/9] update tests --- tests/test_executor_backend_mpi.py | 7 ++----- tests/test_mpi_executor.py | 7 ++----- tests/test_shared_communication.py | 8 ++------ 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index c9825201..3e05ed32 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -1,14 +1,11 @@ +import importlib.util import unittest from pympipool import Executor from pympipool.shared.executorbase import cloudpickle_register -try: - import mpi4py - mpi4py_installed = True -except ImportError: - mpi4py_installed = False +mpi4py_installed = importlib.util.find_spec('mpi4py') is not None def calc(i): diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index 36c6c220..545a658f 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -1,4 +1,5 @@ from concurrent.futures import CancelledError, Future +import importlib.util from queue import Queue from time import sleep import unittest @@ -13,12 +14,8 @@ ExecutorBase, ) -try: - import mpi4py - mpi4py_installed = True -except ImportError: - mpi4py_installed = False +mpi4py_installed = importlib.util.find_spec('mpi4py') is not None def calc(i): diff --git a/tests/test_shared_communication.py b/tests/test_shared_communication.py index 3ef20830..2b9614b5 100644 --- a/tests/test_shared_communication.py +++ b/tests/test_shared_communication.py @@ -1,3 +1,4 @@ +import importlib.util import os import sys import unittest @@ -16,12 +17,7 @@ from pympipool.shared.interface import MpiExecInterface -try: - import mpi4py - - mpi4py_installed = True -except ImportError: - mpi4py_installed = False +mpi4py_installed = importlib.util.find_spec('mpi4py') is not None def calc(i): From 4e9478ab1edd68c02b214ca36d358886a832e024 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 30 May 2024 07:35:08 +0000 Subject: [PATCH 6/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_executor_backend_mpi.py | 2 +- tests/test_mpi_executor.py | 2 +- tests/test_shared_communication.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index 3e05ed32..4c8a126c 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -5,7 +5,7 @@ from pympipool.shared.executorbase import cloudpickle_register -mpi4py_installed = importlib.util.find_spec('mpi4py') is not None +mpi4py_installed = importlib.util.find_spec("mpi4py") is not None def calc(i): diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index 545a658f..796d5182 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -15,7 +15,7 @@ ) -mpi4py_installed = importlib.util.find_spec('mpi4py') is not None +mpi4py_installed = importlib.util.find_spec("mpi4py") is not None def calc(i): diff --git a/tests/test_shared_communication.py b/tests/test_shared_communication.py index 2b9614b5..1fa0db63 100644 --- a/tests/test_shared_communication.py +++ b/tests/test_shared_communication.py @@ -17,7 +17,7 @@ from pympipool.shared.interface import MpiExecInterface -mpi4py_installed = importlib.util.find_spec('mpi4py') is not None +mpi4py_installed = importlib.util.find_spec("mpi4py") is not None def calc(i): From 7e0f84d1499488879753f3037b00667f9faf2cb4 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 30 May 2024 09:40:27 +0200 Subject: [PATCH 7/9] update notebook --- notebooks/examples.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/notebooks/examples.ipynb b/notebooks/examples.ipynb index 1e9f4c7c..5c46f655 100644 --- a/notebooks/examples.ipynb +++ b/notebooks/examples.ipynb @@ -1 +1 @@ -{"metadata":{"kernelspec":{"name":"flux","display_name":"Flux","language":"python"},"language_info":{"name":"python","version":"3.12.3","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"cell_type":"markdown","source":"# Examples\nThe `pympipool.Executor` extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nto simplify the up-scaling of individual functions in a given workflow.","metadata":{},"id":"c31c95fe-9af4-42fd-be2c-713afa380e09"},{"cell_type":"markdown","source":"## Compatibility\nStarting with the basic example of `1+1=2`. With the `ThreadPoolExecutor` from the [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nstandard library this can be written as: ","metadata":{},"id":"a1c6370e-7c8a-4da2-ac7d-42a36e12b27c"},{"cell_type":"code","source":"from concurrent.futures import ThreadPoolExecutor\n\nwith ThreadPoolExecutor(max_workers=1) as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"execution_count":1,"outputs":[{"name":"stdout","text":"2\n","output_type":"stream"}],"id":"8b663009-60af-4d71-8ef3-2e9c6cd79cce"},{"cell_type":"markdown","source":"In this case `max_workers=1` limits the number of threads used by the `ThreadPoolExecutor` to one. Then the `sum()`\nfunction is submitted to the executor with a list with two ones `[1, 1]` as input. A [`concurrent.futures.Future`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nobject is returned. The `Future` object allows to check the status of the execution with the `done()` method which \nreturns `True` or `False` depending on the state of the execution. Or the main process can wait until the execution is \ncompleted by calling `result()`. \n\nThe result of the calculation is `1+1=2`. ","metadata":{},"id":"56192fa7-bbd6-43fe-8598-ff764addfbac"},{"cell_type":"markdown","source":"The `pympipool.Executor` class extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) \nclass by providing more parameters to specify the level of parallelism. In addition, to specifying the maximum number \nof workers `max_workers` the user can also specify the number of cores per worker `cores_per_worker` for MPI based \nparallelism, the number of threads per core `threads_per_core` for thread based parallelism and the number of GPUs per\nworker `gpus_per_worker`. Finally, for those backends which support over-subscribing this can also be enabled using the \n`oversubscribe` parameter. All these parameters are optional, so the `pympipool.Executor` can be used as a drop-in \nreplacement for the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures).\n\nThe previous example is rewritten for the `pympipool.Executor` in:","metadata":{},"id":"99aba5f3-5667-450c-b31f-2b53918b1896"},{"cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=1, executor=flux_exe) as exe:\n future = exe.submit(sum, [1,1])\n print(future.result())","metadata":{"trusted":true},"execution_count":2,"outputs":[{"name":"stdout","text":"2\n","output_type":"stream"}],"id":"559f59cf-f074-4399-846d-a5706797ff64"},{"cell_type":"markdown","source":"The result of the calculation is again `1+1=2`.","metadata":{},"id":"cbe445ae-9f52-4449-a936-a4ca1acc4500"},{"cell_type":"markdown","source":"Beyond pre-defined functions like the `sum()` function, the same functionality can be used to submit user-defined \nfunctions. In the next example a custom summation function is defined:","metadata":{},"id":"eb838571-24c6-4516-ab13-66f5943325b9"},{"cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(*args):\n return sum(*args)\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, executor=flux_exe) as exe:\n fs_1 = exe.submit(calc, [2, 1])\n fs_2 = exe.submit(calc, [2, 2])\n fs_3 = exe.submit(calc, [2, 3])\n fs_4 = exe.submit(calc, [2, 4])\n print([\n fs_1.result(), \n fs_2.result(), \n fs_3.result(), \n fs_4.result(),\n ])\n","metadata":{"trusted":true},"execution_count":3,"outputs":[{"name":"stdout","text":"[3, 4, 5, 6]\n","output_type":"stream"}],"id":"e80ca2d6-4308-4e39-bec7-b55cfb024e79"},{"cell_type":"markdown","source":"In contrast to the previous example where just a single function was submitted to a single worker, in this case a total\nof four functions is submitted to a group of two workers `max_cores=2`. Consequently, the functions are executed as a\nset of two pairs.\n\nIt returns the corresponding sums as expected. The same can be achieved with the built-in [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nclasses. Still one advantage of using the `pympipool.Executor` rather than the built-in ones, is the ability to execute \nthe same commands in interactive environments like [Jupyter notebooks](https://jupyter.org). This is achieved by using \n[cloudpickle](https://github.com/cloudpipe/cloudpickle) to serialize the python function and its parameters rather than\nthe regular pickle package. ","metadata":{},"id":"4d97551b-f7c0-416b-bcc3-55392e938ee8"},{"cell_type":"markdown","source":"For backwards compatibility with the [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html) \nclass the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nalso implements the `map()` function to map a series of inputs to a function. The same `map()` function is also \navailable in the `pympipool.Executor`:","metadata":{},"id":"a97edc41-1396-48a0-8fb5-98d691a69e90"},{"cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(*args):\n return sum(*args)\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, executor=flux_exe) as exe:\n print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))","metadata":{"trusted":true},"execution_count":4,"outputs":[{"name":"stdout","text":"[3, 4, 5, 6]\n","output_type":"stream"}],"id":"3362afef-265f-4432-88ad-e051e6318c77"},{"cell_type":"markdown","source":"The results remain the same. ","metadata":{},"id":"27af5cc1-8514-4735-8bba-b4b32444901f"},{"cell_type":"markdown","source":"## Resource Assignment\nBy default, every submission of a python function results in a flux job (or SLURM job step) depending on the backend. \nThis is sufficient for function calls which take several minutes or longer to execute. For python functions with shorter \nrun-time `pympipool` provides block allocation (enabled by the `block_allocation=True` parameter) to execute multiple \npython functions with similar resource requirements in the same flux job (or SLURM job step). \n\nThe following example illustrates the resource definition on both level. This is redundant. For block allocations the \nresources have to be configured on the **Executor level**, otherwise it can either be defined on the **Executor level**\nor on the **Submission level**. The resource defined on the **Submission level** overwrite the resources defined on the \n**Executor level**.","metadata":{},"id":"59747b38-64f8-4342-82ad-a771aaf7c4eb"},{"cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\n\ndef calc_function(parameter_a, parameter_b):\n return parameter_a + parameter_b\n\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor( \n # Resource definition on the executor level\n max_cores=2, # total number of cores available to the Executor\n # Optional resource definition \n cores_per_worker=1,\n threads_per_core=1,\n gpus_per_worker=0,\n oversubscribe=False, # not available with flux\n cwd=\"/home/jovyan/notebooks\",\n executor=flux_exe,\n hostname_localhost=False, # only required on MacOS\n backend=\"flux\", # optional in case the backend is not recognized\n block_allocation=False, \n init_function=None, # only available with block_allocation=True\n command_line_argument_lst=[], # additional command line arguments for SLURM\n ) as exe:\n future_obj = exe.submit(\n calc_function, \n 1, # parameter_a\n parameter_b=2, \n # Resource definition on the submission level\n resource_dict={\n \"cores\": 1,\n \"threads_per_core\": 1,\n \"gpus_per_core\": 0, # here it is gpus_per_core rather than gpus_per_worker\n \"oversubscribe\": False, # not available with flux\n \"cwd\": \"/home/jovyan/notebooks\",\n \"executor\": flux_exe,\n \"hostname_localhost\": False, # only required on MacOS\n # \"command_line_argument_lst\": [], # additional command line arguments for SLURM\n },\n )\n print(future_obj.result())","metadata":{"trusted":true},"execution_count":5,"outputs":[{"name":"stdout","text":"3\n","output_type":"stream"}],"id":"d29280d4-c085-47b1-b7fa-602732d60832"},{"cell_type":"markdown","source":"The `max_cores` which defines the total number of cores of the allocation, is the only mandatory parameter. All other\nresource parameters are optional. If none of the submitted Python function uses [mpi4py](https://mpi4py.readthedocs.io)\nor any GPU, then the resources can be defined on the **Executor level** as: `cores_per_worker=1`, `threads_per_core=1` \nand `gpus_per_worker=0`. These are defaults, so they do even have to be specified. In this case it also makes sense to \nenable `block_allocation=True` to continuously use a fixed number of python processes rather than creating a new python\nprocess for each submission. In this case the above example can be reduced to: ","metadata":{},"id":"5c7055ad-d84d-4afc-9023-b53643c4138a"},{"cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\n\ndef calc_function(parameter_a, parameter_b):\n return parameter_a + parameter_b\n\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor( \n # Resource definition on the executor level\n max_cores=2, # total number of cores available to the Executor\n block_allocation=True, # reuse python processes\n executor=flux_exe,\n ) as exe:\n future_obj = exe.submit(\n calc_function, \n 1, # parameter_a\n parameter_b=2, \n )\n print(future_obj.result())","metadata":{"trusted":true},"execution_count":6,"outputs":[{"name":"stdout","text":"3\n","output_type":"stream"}],"id":"cd8f883f-5faf-43bc-b971-354aa9dcbecb"},{"cell_type":"markdown","source":"The working directory parameter `cwd` can be helpful for tasks which interact with the file system to define which task\nis executed in which folder, but for most python functions it is not required.","metadata":{},"id":"ea6a2ef1-c5bc-49c2-adb1-60f9f6cc71f3"},{"cell_type":"markdown","source":"## Data Handling\nA limitation of many parallel approaches is the overhead in communication when working with large datasets. Instead of\nreading the same dataset repetitively, the `pympipool.Executor` in block allocation mode (`block_allocation=True`) loads the dataset only once per worker and afterwards \neach function submitted to this worker has access to the dataset, as it is already loaded in memory. To achieve this\nthe user defines an initialization function `init_function` which returns a dictionary with one key per dataset. The \nkeys of the dictionary can then be used as additional input parameters in each function submitted to the `pympipool.Executor`. When block allocation is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded. \n\nThis functionality is illustrated below: ","metadata":{},"id":"d6be1cc6-f47b-4b85-a0bc-00f9ccd8e2fd"},{"cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(i, j, k):\n return i + j + k\n\ndef init_function():\n return {\"j\": 4, \"k\": 3, \"l\": 2}\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=1, init_function=init_function, executor=flux_exe, block_allocation=True) as exe:\n fs = exe.submit(calc, 2, j=5)\n print(fs.result())","metadata":{"trusted":true},"execution_count":7,"outputs":[{"name":"stdout","text":"10\n","output_type":"stream"}],"id":"050c2781-0c8c-436b-949c-580cabf5c63c"},{"cell_type":"markdown","source":"The function `calc()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor only \ntwo inputs are provided `fs = exe.submit(calc, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the\nsecond input parameter is specified explicitly `j=5` but the third input parameter `k` is not provided. So the \n`pympipool.Executor` automatically checks the keys set in the `init_function()` function. In this case the returned \ndictionary `{\"j\": 4, \"k\": 3, \"l\": 2}` defines `j=4`, `k=3` and `l=2`. For this specific call of the `calc()` function,\n`i` and `j` are already provided so `j` is not required, but `k=3` is used from the `init_function()` and as the `calc()`\nfunction does not define the `l` parameter this one is also ignored. \n\nThe result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()`\nfunction.","metadata":{},"id":"8386b4e6-290f-4733-8c50-4312f9ba07e4"},{"cell_type":"markdown","source":"## Up-Scaling \n[flux](https://flux-framework.org) provides fine-grained resource assigment via `libhwloc` and `pmi`.","metadata":{},"id":"0d623365-1b84-4c69-97ee-f6718be8ab39"},{"cell_type":"markdown","source":"### Thread-based Parallelism\nThe number of threads per core can be controlled with the `threads_per_core` parameter during the initialization of the \n`pympipool.Executor`. Unfortunately, there is no uniform way to control the number of cores a given underlying library \nuses for thread based parallelism, so it might be necessary to set certain environment variables manually: \n\n* `OMP_NUM_THREADS`: for openmp\n* `OPENBLAS_NUM_THREADS`: for openblas\n* `MKL_NUM_THREADS`: for mkl\n* `VECLIB_MAXIMUM_THREADS`: for accelerate on Mac Os X\n* `NUMEXPR_NUM_THREADS`: for numexpr\n\nAt the current stage `pympipool.Executor` does not set these parameters itself, so you have to add them in the function\nyou submit before importing the corresponding library: \n","metadata":{},"id":"33f9eee3-e327-43e4-8f15-3cf709f3975c"},{"cell_type":"code","source":"def calc(i):\n import os\n os.environ[\"OMP_NUM_THREADS\"] = \"2\"\n os.environ[\"OPENBLAS_NUM_THREADS\"] = \"2\"\n os.environ[\"MKL_NUM_THREADS\"] = \"2\"\n os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"2\"\n os.environ[\"NUMEXPR_NUM_THREADS\"] = \"2\"\n import numpy as np\n return i","metadata":{"trusted":true},"execution_count":8,"outputs":[],"id":"a9799f38-b9b8-411e-945d-dae951151d26"},{"cell_type":"markdown","source":"Most modern CPUs use hyper-threading to present the operating system with double the number of virtual cores compared to\nthe number of physical cores available. So unless this functionality is disabled `threads_per_core=2` is a reasonable \ndefault. Just be careful if the number of threads is not specified it is possible that all workers try to access all \ncores at the same time which can lead to poor performance. So it is typically a good idea to monitor the CPU utilization\nwith increasing number of workers. \n\nSpecific manycore CPU models like the Intel Xeon Phi processors provide a much higher hyper-threading ration and require\na higher number of threads per core for optimal performance. \n","metadata":{},"id":"4d2af8e0-8b49-40cc-a9ed-298d6c68870c"},{"cell_type":"markdown","source":"### MPI Parallel Python Functions\nBeyond thread based parallelism, the message passing interface (MPI) is the de facto standard parallel execution in \nscientific computing and the [`mpi4py`](https://mpi4py.readthedocs.io) bindings to the MPI libraries are commonly used\nto parallelize existing workflows. The limitation of this approach is that it requires the whole code to adopt the MPI\ncommunication standards to coordinate the way how information is distributed. Just like the `pympipool.Executor` the \n[`mpi4py.futures.MPIPoolExecutor`](https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html#mpipoolexecutor) \nimplements the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\ninterface. Still in this case eah python function submitted to the executor is still limited to serial execution. The\nnovel approach of the `pympipool.Executor` is mixing these two types of parallelism. Individual functions can use\nthe [`mpi4py`](https://mpi4py.readthedocs.io) library to handle the parallel execution within the context of this \nfunction while these functions can still me submitted to the `pympipool.Executor` just like any other function. The\nadvantage of this approach is that the users can parallelize their workflows one function at the time. \n\nThe example in `test_mpi.py` illustrates the submission of a simple MPI parallel python function: ","metadata":{},"id":"2faf6399-0230-4cdd-b4d2-2508dee66d47"},{"cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(i):\n from mpi4py import MPI\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, cores_per_worker=2, executor=flux_exe, pmi=\"pmix\") as exe:\n fs = exe.submit(calc, 3)\n print(fs.result())","metadata":{"trusted":true},"execution_count":9,"outputs":[{"name":"stdout","text":"[(3, 2, 0), (3, 2, 1)]\n","output_type":"stream"}],"id":"44e510fc-8897-46a8-bef7-f1a5c47e4fbf"},{"cell_type":"markdown","source":"In the example environment OpenMPI version 5 is used, so the `pmi` parameter has to be set to `pmix` rather than `pmi1` or `pmi2` which is the default. For `mpich` it is not necessary to specify the `pmi` interface manually.\nThe `calc()` function initializes the [`mpi4py`](https://mpi4py.readthedocs.io) library and gathers the size of the \nallocation and the rank of the current process within the MPI allocation. This function is then submitted to an \n`pympipool.Executor` which is initialized with a single worker with two cores `cores_per_worker=2`. So each function\ncall is going to have access to two cores. \n\nJust like before the script can be called with any python interpreter even though it is using the [`mpi4py`](https://mpi4py.readthedocs.io)\nlibrary in the background it is not necessary to execute the script with `mpiexec` or `mpirun`.\n\nThe response consists of a list of two tuples, one for each MPI parallel process, with the first entry of the tuple \nbeing the parameter `i=3`, followed by the number of MPI parallel processes assigned to the function call `cores_per_worker=2`\nand finally the index of the specific process `0` or `1`. ","metadata":{},"id":"4fa03544-1dfc-465a-b352-0458b710cbcd"},{"cell_type":"markdown","source":"### GPU Assignment\nWith the rise of machine learning applications, the use of GPUs for scientific application becomes more and more popular.\nConsequently, it is essential to have full control over the assignment of GPUs to specific python functions. In the \n`test_gpu.py` example the `tensorflow` library is used to identify the GPUs and return their configuration: ","metadata":{},"id":"581e948b-8c66-42fb-b4b2-279cc9e1c1f3"},{"cell_type":"markdown","source":"```\nimport socket\nimport flux.job\nfrom pympipool import Executor\nfrom tensorflow.python.client import device_lib\n\ndef get_available_gpus():\n local_device_protos = device_lib.list_local_devices()\n return [\n (x.name, x.physical_device_desc, socket.gethostname()) \n for x in local_device_protos if x.device_type == 'GPU'\n ]\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(\n max_workers=2, \n gpus_per_worker=1,\n executor=flux_exe,\n ) as exe:\n fs_1 = exe.submit(get_available_gpus)\n fs_2 = exe.submit(get_available_gpus)\n print(fs_1.result(), fs_2.result())\n```","metadata":{},"id":"7d1bca64-14fb-4d40-997d-b58a011508bf"},{"cell_type":"markdown","source":"The additional parameter `gpus_per_worker=1` specifies that one GPU is assigned to each worker. This functionality \nrequires `pympipool` to be connected to a resource manager like the [SLURM workload manager](https://www.schedmd.com)\nor preferably the [flux framework](https://flux-framework.org). The rest of the script follows the previous examples, \nas two functions are submitted and the results are printed. \n\nTo clarify the execution of such an example on a high performance computing (HPC) cluster using the [SLURM workload manager](https://www.schedmd.com)\nthe submission script is given below: ","metadata":{},"id":"23794ff4-916f-4b03-a18a-c232bab68dfa"},{"cell_type":"markdown","source":"```\n#!/bin/bash\n#SBATCH --nodes=2\n#SBATCH --gpus-per-node=1\n#SBATCH --get-user-env=L\n\npython test_gpu.py\n```","metadata":{},"id":"6dea0b84-65fd-4785-b78d-0ad3ff5aaa95"},{"cell_type":"markdown","source":"The important part is that for using the `pympipool.slurm.PySlurmExecutor` backend the script `test_gpu.py` does not \nneed to be executed with `srun` but rather it is sufficient to just execute it with the python interpreter. `pympipool`\ninternally calls `srun` to assign the individual resources to a given worker. \n\nFor the more complex setup of running the [flux framework](https://flux-framework.org) as a secondary resource scheduler\nwithin the [SLURM workload manager](https://www.schedmd.com) it is essential that the resources are passed from the \n[SLURM workload manager](https://www.schedmd.com) to the [flux framework](https://flux-framework.org). This is achieved\nby calling `srun flux start` in the submission script: ","metadata":{},"id":"5f77c45c-7077-4edf-ace7-1922faecd380"},{"cell_type":"markdown","source":"```\n#!/bin/bash\n#SBATCH --nodes=2\n#SBATCH --gpus-per-node=1\n#SBATCH --get-user-env=L\n\nsrun flux start python test_gpu.py\n````","metadata":{},"id":"e2cd51d8-8991-42bc-943a-050b6d7c74c3"},{"cell_type":"markdown","source":"As a result the GPUs available on the two compute nodes are reported: \n```\n>>> [('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn138'),\n>>> ('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn139')]\n```\nIn this case each compute node `cn138` and `cn139` is equipped with one `Tesla V100S-PCIE-32GB`.\n","metadata":{},"id":"87529bb6-1fbb-4416-8edb-0b3124f4dec2"},{"cell_type":"markdown","source":"## Coupled Functions \nFor submitting two functions with rather different computing resource requirements it is essential to represent this \ndependence during the submission process. In `pympipool` this can be achieved by leveraging the separate submission of \nindividual python functions and including the `concurrent.futures.Future` object of the first submitted function as \ninput for the second function during the submission. Consequently, this functionality can be used for directed acyclic \ngraphs, still it does not enable cyclic graphs. As a simple example we can add one to the result of the addition of one\nand two:","metadata":{},"id":"0e4a6e73-38b1-4a6c-b567-d6b079a58886"},{"cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc_function(parameter_a, parameter_b):\n return parameter_a + parameter_b\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, executor=flux_exe) as exe:\n future_1 = exe.submit(\n calc_function, \n 1,\n parameter_b=2,\n resource_dict={\"cores\": 1},\n )\n future_2 = exe.submit(\n calc_function, \n 1,\n parameter_b=future_1,\n resource_dict={\"cores\": 1},\n )\n print(future_2.result())","metadata":{"trusted":true},"execution_count":10,"outputs":[{"name":"stdout","text":"4\n","output_type":"stream"}],"id":"c84442ee-68e4-4065-97e7-bfad7582acfc"},{"cell_type":"markdown","source":"Here the first addition `1+2` is computed and the output `3` is returned as the result of `future_1.result()`. Still \nbefore the computation of this addition is completed already the next addition is submitted which uses the future object\nas an input `future_1` and adds `1`. The result of both additions is `4` as `1+2+1=4`. \n\nTo disable this functionality the parameter `disable_dependencies=True` can be set on the executor level. Still at the\ncurrent stage the performance improvement of disabling this functionality seem to be minimal. Furthermore, this \nfunctionality introduces the `refresh_rate=0.01` parameter, it defines the refresh rate in seconds how frequently the \nqueue of submitted functions is queried. Typically, there is no need to change these default parameters. ","metadata":{},"id":"bd3e6eea-3a77-49ec-8fec-d88274aeeda5"},{"cell_type":"markdown","source":"## SLURM Job Scheduler\nUsing `pympipool` without the [flux framework](https://flux-framework.org) results in one `srun` call per worker in \n`block_allocation=True` mode and one `srun` call per submitted function in `block_allocation=False` mode. As each `srun`\ncall represents a request to the central database of SLURM this can drastically reduce the performance, especially for\nlarge numbers of small python functions. That is why the hierarchical job scheduler [flux framework](https://flux-framework.org)\nis recommended as secondary job scheduler even within the context of the SLURM job manager. \n\nStill the general usage of `pympipool` remains similar even with SLURM as backend:","metadata":{},"id":"d1086337-5291-4e06-96d1-a6e162d28c58"},{"cell_type":"markdown","source":"```\nfrom pympipool import Executor\n\nwith Executor(max_cores=1, backend=\"slurm\") as exe:\n future = exe.submit(sum, [1,1])\n print(future.result())\n```","metadata":{},"id":"27569937-7d99-4697-b3ee-f68c43b95a10"},{"cell_type":"markdown","source":"The `backend=\"slurm\"` parameter is optional as `pympipool` automatically recognizes if [flux framework](https://flux-framework.org) \nor SLURM are available. \n\nIn addition, the SLURM backend introduces the `command_line_argument_lst=[]` parameter, which allows the user to provide\na list of command line arguments for the `srun` command. ","metadata":{},"id":"ae8dd860-f90f-47b4-b3e5-664f5c949350"},{"cell_type":"markdown","source":"## Workstation Support\nWhile the high performance computing (HPC) setup is limited to the Linux operating system, `pympipool` can also be used\nin combination with MacOS and Windows. These setups are limited to a single compute node. \n\nStill the general usage of `pympipool` remains similar:","metadata":{},"id":"449d2c7a-67ba-449e-8e0b-98a228707e1c"},{"cell_type":"code","source":"from pympipool import Executor\n\nwith Executor(max_cores=1, backend=\"mpi\") as exe:\n future = exe.submit(sum, [1,1], resource_dict={\"cores\": 1})\n print(future.result())","metadata":{"trusted":true},"execution_count":11,"outputs":[{"name":"stdout","text":"2\n","output_type":"stream"}],"id":"fa147b3b-61df-4884-b90c-544362bc95d9"},{"cell_type":"markdown","source":"The `backend=\"mpi\"` parameter is optional as `pympipool` automatically recognizes if [flux framework](https://flux-framework.org) \nor SLURM are available. \n\nWorkstations, especially workstations with MacOs can have rather strict firewall settings. This includes limiting the\nlook up of hostnames and communicating with itself via their own hostname. To directly connect to `localhost` rather\nthan using the hostname which is the default for distributed systems, the `hostname_localhost=True` parameter is \nintroduced. ","metadata":{},"id":"0370b42d-237b-4169-862a-b0bac4bb858b"}]} \ No newline at end of file +{"metadata":{"kernelspec":{"name":"flux","display_name":"Flux","language":"python"},"language_info":{"name":"python","version":"3.12.3","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"c31c95fe-9af4-42fd-be2c-713afa380e09","cell_type":"markdown","source":"# Examples\nThe `pympipool.Executor` extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nto simplify the up-scaling of individual functions in a given workflow.","metadata":{}},{"id":"a1c6370e-7c8a-4da2-ac7d-42a36e12b27c","cell_type":"markdown","source":"## Compatibility\nStarting with the basic example of `1+1=2`. With the `ThreadPoolExecutor` from the [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nstandard library this can be written as: ","metadata":{}},{"id":"8b663009-60af-4d71-8ef3-2e9c6cd79cce","cell_type":"code","source":"from concurrent.futures import ThreadPoolExecutor\n\nwith ThreadPoolExecutor(max_workers=1) as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"2\n","output_type":"stream"}],"execution_count":1},{"id":"56192fa7-bbd6-43fe-8598-ff764addfbac","cell_type":"markdown","source":"In this case `max_workers=1` limits the number of threads used by the `ThreadPoolExecutor` to one. Then the `sum()`\nfunction is submitted to the executor with a list with two ones `[1, 1]` as input. A [`concurrent.futures.Future`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nobject is returned. The `Future` object allows to check the status of the execution with the `done()` method which \nreturns `True` or `False` depending on the state of the execution. Or the main process can wait until the execution is \ncompleted by calling `result()`. \n\nThe result of the calculation is `1+1=2`. ","metadata":{}},{"id":"99aba5f3-5667-450c-b31f-2b53918b1896","cell_type":"markdown","source":"The `pympipool.Executor` class extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) \nclass by providing more parameters to specify the level of parallelism. In addition, to specifying the maximum number \nof workers `max_workers` the user can also specify the number of cores per worker `cores_per_worker` for MPI based \nparallelism, the number of threads per core `threads_per_core` for thread based parallelism and the number of GPUs per\nworker `gpus_per_worker`. Finally, for those backends which support over-subscribing this can also be enabled using the \n`oversubscribe` parameter. All these parameters are optional, so the `pympipool.Executor` can be used as a drop-in \nreplacement for the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures).\n\nThe previous example is rewritten for the `pympipool.Executor` in:","metadata":{}},{"id":"559f59cf-f074-4399-846d-a5706797ff64","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=1, executor=flux_exe) as exe:\n future = exe.submit(sum, [1,1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"2\n","output_type":"stream"}],"execution_count":2},{"id":"cbe445ae-9f52-4449-a936-a4ca1acc4500","cell_type":"markdown","source":"The result of the calculation is again `1+1=2`.","metadata":{}},{"id":"eb838571-24c6-4516-ab13-66f5943325b9","cell_type":"markdown","source":"Beyond pre-defined functions like the `sum()` function, the same functionality can be used to submit user-defined \nfunctions. In the next example a custom summation function is defined:","metadata":{}},{"id":"e80ca2d6-4308-4e39-bec7-b55cfb024e79","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(*args):\n return sum(*args)\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, executor=flux_exe) as exe:\n fs_1 = exe.submit(calc, [2, 1])\n fs_2 = exe.submit(calc, [2, 2])\n fs_3 = exe.submit(calc, [2, 3])\n fs_4 = exe.submit(calc, [2, 4])\n print([\n fs_1.result(), \n fs_2.result(), \n fs_3.result(), \n fs_4.result(),\n ])\n","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"[3, 4, 5, 6]\n","output_type":"stream"}],"execution_count":3},{"id":"4d97551b-f7c0-416b-bcc3-55392e938ee8","cell_type":"markdown","source":"In contrast to the previous example where just a single function was submitted to a single worker, in this case a total\nof four functions is submitted to a group of two workers `max_cores=2`. Consequently, the functions are executed as a\nset of two pairs.\n\nIt returns the corresponding sums as expected. The same can be achieved with the built-in [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nclasses. Still one advantage of using the `pympipool.Executor` rather than the built-in ones, is the ability to execute \nthe same commands in interactive environments like [Jupyter notebooks](https://jupyter.org). This is achieved by using \n[cloudpickle](https://github.com/cloudpipe/cloudpickle) to serialize the python function and its parameters rather than\nthe regular pickle package. ","metadata":{}},{"id":"a97edc41-1396-48a0-8fb5-98d691a69e90","cell_type":"markdown","source":"For backwards compatibility with the [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html) \nclass the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nalso implements the `map()` function to map a series of inputs to a function. The same `map()` function is also \navailable in the `pympipool.Executor`:","metadata":{}},{"id":"3362afef-265f-4432-88ad-e051e6318c77","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(*args):\n return sum(*args)\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, executor=flux_exe) as exe:\n print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"[3, 4, 5, 6]\n","output_type":"stream"}],"execution_count":4},{"id":"27af5cc1-8514-4735-8bba-b4b32444901f","cell_type":"markdown","source":"The results remain the same. ","metadata":{}},{"id":"59747b38-64f8-4342-82ad-a771aaf7c4eb","cell_type":"markdown","source":"## Resource Assignment\nBy default, every submission of a python function results in a flux job (or SLURM job step) depending on the backend. \nThis is sufficient for function calls which take several minutes or longer to execute. For python functions with shorter \nrun-time `pympipool` provides block allocation (enabled by the `block_allocation=True` parameter) to execute multiple \npython functions with similar resource requirements in the same flux job (or SLURM job step). \n\nThe following example illustrates the resource definition on both level. This is redundant. For block allocations the \nresources have to be configured on the **Executor level**, otherwise it can either be defined on the **Executor level**\nor on the **Submission level**. The resource defined on the **Submission level** overwrite the resources defined on the \n**Executor level**.","metadata":{}},{"id":"d29280d4-c085-47b1-b7fa-602732d60832","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\n\ndef calc_function(parameter_a, parameter_b):\n return parameter_a + parameter_b\n\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor( \n # Resource definition on the executor level\n max_cores=2, # total number of cores available to the Executor\n # Optional resource definition \n cores_per_worker=1,\n threads_per_core=1,\n gpus_per_worker=0,\n oversubscribe=False, # not available with flux\n cwd=\"/home/jovyan/notebooks\",\n executor=flux_exe,\n hostname_localhost=False, # only required on MacOS\n backend=\"flux\", # optional in case the backend is not recognized\n block_allocation=False, \n init_function=None, # only available with block_allocation=True\n command_line_argument_lst=[], # additional command line arguments for SLURM\n ) as exe:\n future_obj = exe.submit(\n calc_function, \n 1, # parameter_a\n parameter_b=2, \n # Resource definition on the submission level\n resource_dict={\n \"cores\": 1,\n \"threads_per_core\": 1,\n \"gpus_per_core\": 0, # here it is gpus_per_core rather than gpus_per_worker\n \"oversubscribe\": False, # not available with flux\n \"cwd\": \"/home/jovyan/notebooks\",\n \"executor\": flux_exe,\n \"hostname_localhost\": False, # only required on MacOS\n # \"command_line_argument_lst\": [], # additional command line arguments for SLURM\n },\n )\n print(future_obj.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"3\n","output_type":"stream"}],"execution_count":5},{"id":"5c7055ad-d84d-4afc-9023-b53643c4138a","cell_type":"markdown","source":"The `max_cores` which defines the total number of cores of the allocation, is the only mandatory parameter. All other\nresource parameters are optional. If none of the submitted Python function uses [mpi4py](https://mpi4py.readthedocs.io)\nor any GPU, then the resources can be defined on the **Executor level** as: `cores_per_worker=1`, `threads_per_core=1` \nand `gpus_per_worker=0`. These are defaults, so they do even have to be specified. In this case it also makes sense to \nenable `block_allocation=True` to continuously use a fixed number of python processes rather than creating a new python\nprocess for each submission. In this case the above example can be reduced to: ","metadata":{}},{"id":"cd8f883f-5faf-43bc-b971-354aa9dcbecb","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\n\ndef calc_function(parameter_a, parameter_b):\n return parameter_a + parameter_b\n\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor( \n # Resource definition on the executor level\n max_cores=2, # total number of cores available to the Executor\n block_allocation=True, # reuse python processes\n executor=flux_exe,\n ) as exe:\n future_obj = exe.submit(\n calc_function, \n 1, # parameter_a\n parameter_b=2, \n )\n print(future_obj.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"3\n","output_type":"stream"}],"execution_count":6},{"id":"ea6a2ef1-c5bc-49c2-adb1-60f9f6cc71f3","cell_type":"markdown","source":"The working directory parameter `cwd` can be helpful for tasks which interact with the file system to define which task\nis executed in which folder, but for most python functions it is not required.","metadata":{}},{"id":"d6be1cc6-f47b-4b85-a0bc-00f9ccd8e2fd","cell_type":"markdown","source":"## Data Handling\nA limitation of many parallel approaches is the overhead in communication when working with large datasets. Instead of\nreading the same dataset repetitively, the `pympipool.Executor` in block allocation mode (`block_allocation=True`) loads the dataset only once per worker and afterwards \neach function submitted to this worker has access to the dataset, as it is already loaded in memory. To achieve this\nthe user defines an initialization function `init_function` which returns a dictionary with one key per dataset. The \nkeys of the dictionary can then be used as additional input parameters in each function submitted to the `pympipool.Executor`. When block allocation is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded. \n\nThis functionality is illustrated below: ","metadata":{}},{"id":"050c2781-0c8c-436b-949c-580cabf5c63c","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(i, j, k):\n return i + j + k\n\ndef init_function():\n return {\"j\": 4, \"k\": 3, \"l\": 2}\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=1, init_function=init_function, executor=flux_exe, block_allocation=True) as exe:\n fs = exe.submit(calc, 2, j=5)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"10\n","output_type":"stream"}],"execution_count":7},{"id":"8386b4e6-290f-4733-8c50-4312f9ba07e4","cell_type":"markdown","source":"The function `calc()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor only \ntwo inputs are provided `fs = exe.submit(calc, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the\nsecond input parameter is specified explicitly `j=5` but the third input parameter `k` is not provided. So the \n`pympipool.Executor` automatically checks the keys set in the `init_function()` function. In this case the returned \ndictionary `{\"j\": 4, \"k\": 3, \"l\": 2}` defines `j=4`, `k=3` and `l=2`. For this specific call of the `calc()` function,\n`i` and `j` are already provided so `j` is not required, but `k=3` is used from the `init_function()` and as the `calc()`\nfunction does not define the `l` parameter this one is also ignored. \n\nThe result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()`\nfunction.","metadata":{}},{"id":"0d623365-1b84-4c69-97ee-f6718be8ab39","cell_type":"markdown","source":"## Up-Scaling \n[flux](https://flux-framework.org) provides fine-grained resource assigment via `libhwloc` and `pmi`.","metadata":{}},{"id":"33f9eee3-e327-43e4-8f15-3cf709f3975c","cell_type":"markdown","source":"### Thread-based Parallelism\nThe number of threads per core can be controlled with the `threads_per_core` parameter during the initialization of the \n`pympipool.Executor`. Unfortunately, there is no uniform way to control the number of cores a given underlying library \nuses for thread based parallelism, so it might be necessary to set certain environment variables manually: \n\n* `OMP_NUM_THREADS`: for openmp\n* `OPENBLAS_NUM_THREADS`: for openblas\n* `MKL_NUM_THREADS`: for mkl\n* `VECLIB_MAXIMUM_THREADS`: for accelerate on Mac Os X\n* `NUMEXPR_NUM_THREADS`: for numexpr\n\nAt the current stage `pympipool.Executor` does not set these parameters itself, so you have to add them in the function\nyou submit before importing the corresponding library: \n","metadata":{}},{"id":"a9799f38-b9b8-411e-945d-dae951151d26","cell_type":"code","source":"def calc(i):\n import os\n os.environ[\"OMP_NUM_THREADS\"] = \"2\"\n os.environ[\"OPENBLAS_NUM_THREADS\"] = \"2\"\n os.environ[\"MKL_NUM_THREADS\"] = \"2\"\n os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"2\"\n os.environ[\"NUMEXPR_NUM_THREADS\"] = \"2\"\n import numpy as np\n return i","metadata":{"trusted":true},"outputs":[],"execution_count":8},{"id":"4d2af8e0-8b49-40cc-a9ed-298d6c68870c","cell_type":"markdown","source":"Most modern CPUs use hyper-threading to present the operating system with double the number of virtual cores compared to\nthe number of physical cores available. So unless this functionality is disabled `threads_per_core=2` is a reasonable \ndefault. Just be careful if the number of threads is not specified it is possible that all workers try to access all \ncores at the same time which can lead to poor performance. So it is typically a good idea to monitor the CPU utilization\nwith increasing number of workers. \n\nSpecific manycore CPU models like the Intel Xeon Phi processors provide a much higher hyper-threading ration and require\na higher number of threads per core for optimal performance. \n","metadata":{}},{"id":"2faf6399-0230-4cdd-b4d2-2508dee66d47","cell_type":"markdown","source":"### MPI Parallel Python Functions\nBeyond thread based parallelism, the message passing interface (MPI) is the de facto standard parallel execution in \nscientific computing and the [`mpi4py`](https://mpi4py.readthedocs.io) bindings to the MPI libraries are commonly used\nto parallelize existing workflows. The limitation of this approach is that it requires the whole code to adopt the MPI\ncommunication standards to coordinate the way how information is distributed. Just like the `pympipool.Executor` the \n[`mpi4py.futures.MPIPoolExecutor`](https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html#mpipoolexecutor) \nimplements the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\ninterface. Still in this case eah python function submitted to the executor is still limited to serial execution. The\nnovel approach of the `pympipool.Executor` is mixing these two types of parallelism. Individual functions can use\nthe [`mpi4py`](https://mpi4py.readthedocs.io) library to handle the parallel execution within the context of this \nfunction while these functions can still me submitted to the `pympipool.Executor` just like any other function. The\nadvantage of this approach is that the users can parallelize their workflows one function at the time. \n\nThe example in `test_mpi.py` illustrates the submission of a simple MPI parallel python function: ","metadata":{}},{"id":"44e510fc-8897-46a8-bef7-f1a5c47e4fbf","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc(i):\n from mpi4py import MPI\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, cores_per_worker=2, executor=flux_exe, pmi=\"pmix\") as exe:\n fs = exe.submit(calc, 3)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"[(3, 2, 0), (3, 2, 1)]\n","output_type":"stream"}],"execution_count":9},{"id":"4fa03544-1dfc-465a-b352-0458b710cbcd","cell_type":"markdown","source":"In the example environment OpenMPI version 5 is used, so the `pmi` parameter has to be set to `pmix` rather than `pmi1` or `pmi2` which is the default. For `mpich` it is not necessary to specify the `pmi` interface manually.\nThe `calc()` function initializes the [`mpi4py`](https://mpi4py.readthedocs.io) library and gathers the size of the \nallocation and the rank of the current process within the MPI allocation. This function is then submitted to an \n`pympipool.Executor` which is initialized with a single worker with two cores `cores_per_worker=2`. So each function\ncall is going to have access to two cores. \n\nJust like before the script can be called with any python interpreter even though it is using the [`mpi4py`](https://mpi4py.readthedocs.io)\nlibrary in the background it is not necessary to execute the script with `mpiexec` or `mpirun`.\n\nThe response consists of a list of two tuples, one for each MPI parallel process, with the first entry of the tuple \nbeing the parameter `i=3`, followed by the number of MPI parallel processes assigned to the function call `cores_per_worker=2`\nand finally the index of the specific process `0` or `1`. ","metadata":{}},{"id":"581e948b-8c66-42fb-b4b2-279cc9e1c1f3","cell_type":"markdown","source":"### GPU Assignment\nWith the rise of machine learning applications, the use of GPUs for scientific application becomes more and more popular.\nConsequently, it is essential to have full control over the assignment of GPUs to specific python functions. In the \n`test_gpu.py` example the `tensorflow` library is used to identify the GPUs and return their configuration: ","metadata":{}},{"id":"7d1bca64-14fb-4d40-997d-b58a011508bf","cell_type":"markdown","source":"```\nimport socket\nimport flux.job\nfrom pympipool import Executor\nfrom tensorflow.python.client import device_lib\n\ndef get_available_gpus():\n local_device_protos = device_lib.list_local_devices()\n return [\n (x.name, x.physical_device_desc, socket.gethostname()) \n for x in local_device_protos if x.device_type == 'GPU'\n ]\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(\n max_workers=2, \n gpus_per_worker=1,\n executor=flux_exe,\n ) as exe:\n fs_1 = exe.submit(get_available_gpus)\n fs_2 = exe.submit(get_available_gpus)\n print(fs_1.result(), fs_2.result())\n```","metadata":{}},{"id":"23794ff4-916f-4b03-a18a-c232bab68dfa","cell_type":"markdown","source":"The additional parameter `gpus_per_worker=1` specifies that one GPU is assigned to each worker. This functionality \nrequires `pympipool` to be connected to a resource manager like the [SLURM workload manager](https://www.schedmd.com)\nor preferably the [flux framework](https://flux-framework.org). The rest of the script follows the previous examples, \nas two functions are submitted and the results are printed. \n\nTo clarify the execution of such an example on a high performance computing (HPC) cluster using the [SLURM workload manager](https://www.schedmd.com)\nthe submission script is given below: ","metadata":{}},{"id":"6dea0b84-65fd-4785-b78d-0ad3ff5aaa95","cell_type":"markdown","source":"```\n#!/bin/bash\n#SBATCH --nodes=2\n#SBATCH --gpus-per-node=1\n#SBATCH --get-user-env=L\n\npython test_gpu.py\n```","metadata":{}},{"id":"5f77c45c-7077-4edf-ace7-1922faecd380","cell_type":"markdown","source":"The important part is that for using the `pympipool.slurm.PySlurmExecutor` backend the script `test_gpu.py` does not \nneed to be executed with `srun` but rather it is sufficient to just execute it with the python interpreter. `pympipool`\ninternally calls `srun` to assign the individual resources to a given worker. \n\nFor the more complex setup of running the [flux framework](https://flux-framework.org) as a secondary resource scheduler\nwithin the [SLURM workload manager](https://www.schedmd.com) it is essential that the resources are passed from the \n[SLURM workload manager](https://www.schedmd.com) to the [flux framework](https://flux-framework.org). This is achieved\nby calling `srun flux start` in the submission script: ","metadata":{}},{"id":"e2cd51d8-8991-42bc-943a-050b6d7c74c3","cell_type":"markdown","source":"```\n#!/bin/bash\n#SBATCH --nodes=2\n#SBATCH --gpus-per-node=1\n#SBATCH --get-user-env=L\n\nsrun flux start python test_gpu.py\n````","metadata":{}},{"id":"87529bb6-1fbb-4416-8edb-0b3124f4dec2","cell_type":"markdown","source":"As a result the GPUs available on the two compute nodes are reported: \n```\n>>> [('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn138'),\n>>> ('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn139')]\n```\nIn this case each compute node `cn138` and `cn139` is equipped with one `Tesla V100S-PCIE-32GB`.\n","metadata":{}},{"id":"0e4a6e73-38b1-4a6c-b567-d6b079a58886","cell_type":"markdown","source":"## Coupled Functions \nFor submitting two functions with rather different computing resource requirements it is essential to represent this \ndependence during the submission process. In `pympipool` this can be achieved by leveraging the separate submission of \nindividual python functions and including the `concurrent.futures.Future` object of the first submitted function as \ninput for the second function during the submission. Consequently, this functionality can be used for directed acyclic \ngraphs, still it does not enable cyclic graphs. As a simple example we can add one to the result of the addition of one\nand two:","metadata":{}},{"id":"c84442ee-68e4-4065-97e7-bfad7582acfc","cell_type":"code","source":"import flux.job\nfrom pympipool import Executor\n\ndef calc_function(parameter_a, parameter_b):\n return parameter_a + parameter_b\n\nwith flux.job.FluxExecutor() as flux_exe:\n with Executor(max_cores=2, executor=flux_exe) as exe:\n future_1 = exe.submit(\n calc_function, \n 1,\n parameter_b=2,\n resource_dict={\"cores\": 1},\n )\n future_2 = exe.submit(\n calc_function, \n 1,\n parameter_b=future_1,\n resource_dict={\"cores\": 1},\n )\n print(future_2.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"4\n","output_type":"stream"}],"execution_count":10},{"id":"bd3e6eea-3a77-49ec-8fec-d88274aeeda5","cell_type":"markdown","source":"Here the first addition `1+2` is computed and the output `3` is returned as the result of `future_1.result()`. Still \nbefore the computation of this addition is completed already the next addition is submitted which uses the future object\nas an input `future_1` and adds `1`. The result of both additions is `4` as `1+2+1=4`. \n\nTo disable this functionality the parameter `disable_dependencies=True` can be set on the executor level. Still at the\ncurrent stage the performance improvement of disabling this functionality seem to be minimal. Furthermore, this \nfunctionality introduces the `refresh_rate=0.01` parameter, it defines the refresh rate in seconds how frequently the \nqueue of submitted functions is queried. Typically, there is no need to change these default parameters. ","metadata":{}},{"id":"d1086337-5291-4e06-96d1-a6e162d28c58","cell_type":"markdown","source":"## SLURM Job Scheduler\nUsing `pympipool` without the [flux framework](https://flux-framework.org) results in one `srun` call per worker in \n`block_allocation=True` mode and one `srun` call per submitted function in `block_allocation=False` mode. As each `srun`\ncall represents a request to the central database of SLURM this can drastically reduce the performance, especially for\nlarge numbers of small python functions. That is why the hierarchical job scheduler [flux framework](https://flux-framework.org)\nis recommended as secondary job scheduler even within the context of the SLURM job manager. \n\nStill the general usage of `pympipool` remains similar even with SLURM as backend:","metadata":{}},{"id":"27569937-7d99-4697-b3ee-f68c43b95a10","cell_type":"markdown","source":"```\nfrom pympipool import Executor\n\nwith Executor(max_cores=1, backend=\"slurm\") as exe:\n future = exe.submit(sum, [1,1])\n print(future.result())\n```","metadata":{}},{"id":"ae8dd860-f90f-47b4-b3e5-664f5c949350","cell_type":"markdown","source":"The `backend=\"slurm\"` parameter is optional as `pympipool` automatically recognizes if [flux framework](https://flux-framework.org) \nor SLURM are available. \n\nIn addition, the SLURM backend introduces the `command_line_argument_lst=[]` parameter, which allows the user to provide\na list of command line arguments for the `srun` command. ","metadata":{}},{"id":"449d2c7a-67ba-449e-8e0b-98a228707e1c","cell_type":"markdown","source":"## Workstation Support\nWhile the high performance computing (HPC) setup is limited to the Linux operating system, `pympipool` can also be used\nin combination with MacOS and Windows. These setups are limited to a single compute node. \n\nStill the general usage of `pympipool` remains similar:","metadata":{}},{"id":"fa147b3b-61df-4884-b90c-544362bc95d9","cell_type":"code","source":"from pympipool import Executor\n\nwith Executor(max_cores=1, backend=\"local\") as exe:\n future = exe.submit(sum, [1,1], resource_dict={\"cores\": 1})\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"2\n","output_type":"stream"}],"execution_count":11},{"id":"0370b42d-237b-4169-862a-b0bac4bb858b","cell_type":"markdown","source":"The `backend=\"local\"` parameter is optional as `pympipool` automatically recognizes if [flux framework](https://flux-framework.org) \nor SLURM are available. \n\nWorkstations, especially workstations with MacOs can have rather strict firewall settings. This includes limiting the\nlook up of hostnames and communicating with itself via their own hostname. To directly connect to `localhost` rather\nthan using the hostname which is the default for distributed systems, the `hostname_localhost=True` parameter is \nintroduced. ","metadata":{}}]} \ No newline at end of file From db994989679ee8dbdc364022f2f9248a0591f08a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 30 May 2024 09:45:17 +0200 Subject: [PATCH 8/9] fix test skip --- tests/test_executor_backend_mpi.py | 4 ++-- tests/test_mpi_executor.py | 10 +++++----- tests/test_shared_communication.py | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index 4c8a126c..a7c7d335 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -5,7 +5,7 @@ from pympipool.shared.executorbase import cloudpickle_register -mpi4py_installed = importlib.util.find_spec("mpi4py") is not None +skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None def calc(i): @@ -46,7 +46,7 @@ def test_meta_executor_single(self): self.assertTrue(fs_2.done()) @unittest.skipIf( - mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped." + skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." ) def test_meta_executor_parallel(self): with Executor( diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index 796d5182..c57696fc 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -15,7 +15,7 @@ ) -mpi4py_installed = importlib.util.find_spec("mpi4py") is not None +skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None def calc(i): @@ -132,7 +132,7 @@ def test_pympiexecutor_errors(self): @unittest.skipIf( - mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped." + skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." ) class TestPyMpiExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): @@ -172,7 +172,7 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): @unittest.skipIf( - mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped." + skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." ) class TestPyMpiStepExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): @@ -350,7 +350,7 @@ def test_meta_step(self): self.assertEqual(str(exe.info[k]), v) @unittest.skipIf( - mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped." + skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." ) def test_pool_multi_core(self): with PyMPIExecutor( @@ -366,7 +366,7 @@ def test_pool_multi_core(self): self.assertEqual(output.result(), [(2, 2, 0), (2, 2, 1)]) @unittest.skipIf( - mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped." + skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." ) def test_pool_multi_core_map(self): with PyMPIExecutor( diff --git a/tests/test_shared_communication.py b/tests/test_shared_communication.py index 1fa0db63..a096c781 100644 --- a/tests/test_shared_communication.py +++ b/tests/test_shared_communication.py @@ -17,7 +17,7 @@ from pympipool.shared.interface import MpiExecInterface -mpi4py_installed = importlib.util.find_spec("mpi4py") is not None +skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None def calc(i): @@ -26,7 +26,7 @@ def calc(i): class TestInterface(unittest.TestCase): @unittest.skipIf( - mpi4py_installed, "mpi4py is not installed, so the mpi4py tests are skipped." + skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." ) def test_interface_mpi(self): cloudpickle_register(ind=1) From 7792f3d7c9355ec99f763c4b6103effa39d16eee Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 30 May 2024 09:50:12 +0200 Subject: [PATCH 9/9] skip more tests --- tests/test_mpi_executor.py | 6 ++++++ tests/test_mpi_executor_future.py | 7 +++++++ 2 files changed, 13 insertions(+) diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index c57696fc..d8de5696 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -310,6 +310,9 @@ def test_executor_exception_future(self): fs = p.submit(raise_error) fs.result() + @unittest.skipIf( + skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." + ) def test_meta(self): meta_data_exe_dict = { "cores": 2, @@ -424,6 +427,9 @@ def test_execute_task(self): self.assertEqual(f.result(), np.array(4)) q.join() + @unittest.skipIf( + skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." + ) def test_execute_task_parallel(self): f = Future() q = Queue() diff --git a/tests/test_mpi_executor_future.py b/tests/test_mpi_executor_future.py index 51aa0df5..dd96259c 100644 --- a/tests/test_mpi_executor_future.py +++ b/tests/test_mpi_executor_future.py @@ -1,4 +1,5 @@ from concurrent.futures import Future +import importlib.util from time import sleep import unittest @@ -7,6 +8,9 @@ from pympipool.scheduler.mpi import PyMPIExecutor +skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None + + def calc(i): return np.array(i**2) @@ -23,6 +27,9 @@ def test_pool_serial(self): self.assertTrue(output.done()) self.assertEqual(output.result(), np.array(4)) + @unittest.skipIf( + skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." + ) def test_pool_serial_multi_core(self): with PyMPIExecutor( max_workers=1, cores_per_worker=2, hostname_localhost=True