Merge pull request #1 from pyiron/main
Merge main
jan-janssen authored Nov 7, 2023
2 parents 1728524 + f9eb44e commit 7f8e9c8
Showing 20 changed files with 263 additions and 163 deletions.
4 changes: 2 additions & 2 deletions .ci_support/environment-mpich.yml
@@ -4,7 +4,7 @@ dependencies:
- python
- numpy
- mpich
- cloudpickle =2.2.1
- mpi4py =3.1.4
- cloudpickle =3.0.0
- mpi4py =3.1.5
- tqdm =4.66.1
- pyzmq =25.1.1
4 changes: 2 additions & 2 deletions .ci_support/environment-openmpi.yml
@@ -4,7 +4,7 @@ dependencies:
- python
- numpy
- openmpi
- cloudpickle =2.2.1
- mpi4py =3.1.4
- cloudpickle =3.0.0
- mpi4py =3.1.5
- tqdm =4.66.1
- pyzmq =25.1.1
4 changes: 2 additions & 2 deletions .ci_support/environment-win.yml
@@ -4,7 +4,7 @@ dependencies:
- python
- numpy
- msmpi
- cloudpickle =2.2.1
- mpi4py =3.1.4
- cloudpickle =3.0.0
- mpi4py =3.1.5
- tqdm =4.66.1
- pyzmq =25.1.1
2 changes: 1 addition & 1 deletion .github/workflows/unittest-mpich.yml
@@ -57,4 +57,4 @@ jobs:
- name: Test
shell: bash -l {0}
timeout-minutes: 5
run: for f in $(ls tests/test_*.py); do echo $f; python -m unittest $f; done
run: cd tests; python -m unittest discover .
8 changes: 4 additions & 4 deletions .github/workflows/unittest-openmpi.yml
@@ -24,17 +24,17 @@ jobs:
python-version: '3.11'
label: linux-64-py-3-11-openmpi
prefix: /usr/share/miniconda3/envs/my-env

- operating-system: ubuntu-latest
python-version: '3.10'
label: linux-64-py-3-10-openmpi
prefix: /usr/share/miniconda3/envs/my-env

- operating-system: ubuntu-latest
python-version: 3.9
label: linux-64-py-3-9-openmpi
prefix: /usr/share/miniconda3/envs/my-env

- operating-system: ubuntu-latest
python-version: 3.8
label: linux-64-py-3-8-openmpi
@@ -57,7 +57,7 @@ jobs:
- name: Test
shell: bash -l {0}
timeout-minutes: 5
run: for f in $(ls tests/test_*.py); do echo $f; python -m unittest $f; done
run: cd tests; python -m unittest discover .
env:
OMPI_MCA_plm: 'isolated'
OMPI_MCA_rmaps_base_oversubscribe: 'yes'
2 changes: 1 addition & 1 deletion .github/workflows/unittest-win.yml
@@ -34,4 +34,4 @@ jobs:
- name: Test
shell: bash -l {0}
timeout-minutes: 5
run: for f in $(ls tests/test_*.py); do echo $f; python -m unittest $f; done
run: cd tests; python -m unittest discover .
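
The three workflow files above all replace the shell loop over individual test files with unittest's built-in test discovery, so a single interpreter process collects and runs every test_*.py module. A rough Python equivalent of `cd tests; python -m unittest discover .` (an illustrative sketch using only the standard library, not part of this commit):

```python
# Programmatic counterpart of `python -m unittest discover .` run from tests/;
# the workflows invoke the CLI form, this sketch only illustrates what it does.
import sys
import unittest


def run_discovered_tests(start_dir="tests"):
    # Collect all tests from files matching the default pattern test_*.py
    suite = unittest.defaultTestLoader.discover(start_dir=start_dir, pattern="test_*.py")
    result = unittest.TextTestRunner(verbosity=2).run(suite)
    # Return a shell-style exit code so CI still fails on broken tests
    return 0 if result.wasSuccessful() else 1


if __name__ == "__main__":
    sys.exit(run_discovered_tests())
```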
11 changes: 11 additions & 0 deletions pympipool/__init__.py
@@ -1,4 +1,15 @@
from ._version import get_versions
from pympipool.mpi.executor import PyMPIExecutor

try: # The PyFluxExecutor requires flux-core to be installed.
from pympipool.flux.executor import PyFluxExecutor
except ImportError:
pass

try: # The PySlurmExecutor requires the srun command to be available.
from pympipool.slurm.executor import PySlurmExecutor
except ImportError:
pass

__version__ = get_versions()["version"]
del get_versions
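
Because the Flux and SLURM executors are now imported optionally, only the backends whose dependencies are installed end up in the pympipool namespace. A minimal sketch of how calling code could probe for them (hypothetical helper, not part of this commit):

```python
# Hypothetical availability check built on the optional imports above:
# executors whose backend (flux-core, srun) is missing never enter the namespace.
import pympipool


def available_executors():
    candidates = ["PyMPIExecutor", "PyFluxExecutor", "PySlurmExecutor"]
    return [name for name in candidates if hasattr(pympipool, name)]


print(available_executors())  # e.g. ['PyMPIExecutor'] when neither flux nor srun is present
```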
60 changes: 31 additions & 29 deletions pympipool/flux/executor.py
@@ -14,6 +14,11 @@

class PyFluxExecutor(ExecutorBase):
"""
The pympipool.flux.PyFluxExecutor leverages the flux framework to distribute python tasks within a queuing system
allocation. In analogy to the pympipool.slurm.PySlurmExecutor, it provides the option to specify the number of
threads per worker as well as the number of GPUs per worker, in addition to the number of cores per worker.
Args:
max_workers (int): defines the number of workers which can execute functions in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
@@ -23,11 +28,32 @@ class PyFluxExecutor(ExecutorBase):
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
Examples:
```
>>> import numpy as np
>>> from pympipool.flux import PyFluxExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
>>> size = MPI.COMM_WORLD.Get_size()
>>> rank = MPI.COMM_WORLD.Get_rank()
>>> return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>> return {"k": 3}
>>>
>>> with PyFluxExecutor(max_workers=1, cores_per_worker=2, init_function=init_k) as p:
>>> fs = p.submit(calc, 2, j=4)
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
"""

def __init__(
self,
max_workers,
max_workers=1,
cores_per_worker=1,
threads_per_core=1,
gpus_per_worker=0,
@@ -59,39 +85,15 @@ def __init__(

class PyFluxSingleTaskExecutor(ExecutorBase):
"""
The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks.
In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.Executor can be executed in a serial python process
and does not require the python script to be executed with MPI. Still internally the pympipool.Executor uses the
mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the
usability in particular when used in combination with Jupyter notebooks.
The pympipool.flux.PyFluxSingleTaskExecutor is the internal worker for the pympipool.flux.PyFluxExecutor.
Args:
cores (int): defines the number of MPI ranks to use for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
Examples:
```
>>> import numpy as np
>>> from pympipool.flux.executor import PyFluxSingleTaskExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
>>> size = MPI.COMM_WORLD.Get_size()
>>> rank = MPI.COMM_WORLD.Get_rank()
>>> return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>> return {"k": 3}
>>>
>>> with PyFluxSingleTaskExecutor(cores=2, init_function=init_k) as p:
>>> fs = p.submit(calc, 2, j=4)
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
"""

def __init__(
@@ -136,10 +138,10 @@ def __init__(
super().__init__(
cwd=cwd,
cores=cores,
gpus_per_core=gpus_per_core,
threads_per_core=threads_per_core,
oversubscribe=oversubscribe,
)
self._threads_per_core = threads_per_core
self._gpus_per_core = gpus_per_core
self._executor = executor
self._future = None

112 changes: 30 additions & 82 deletions pympipool/mpi/executor.py
@@ -4,48 +4,57 @@
ExecutorBase,
executor_broker,
)
from pympipool.shared.interface import MpiExecInterface, SlurmSubprocessInterface
from pympipool.shared.interface import MpiExecInterface
from pympipool.shared.thread import RaisingThread


class PyMPIExecutor(ExecutorBase):
"""
The pympipool.mpi.PyMPIExecutor leverages the message passing interface (MPI) to distribute python tasks within an
MPI allocation. In contrast to the mpi4py.futures.MPIPoolExecutor, the pympipool.mpi.PyMPIExecutor can be executed
in a serial python process and does not require the python script to be executed with MPI. Consequently, it is
primarily an abstraction of its functionality to improve the usability, in particular when used in combination with
Jupyter notebooks.
Args:
max_workers (int): defines the number of workers which can execute functions in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
Examples:
```
>>> import numpy as np
>>> from pympipool.mpi import PyMPIExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
>>> size = MPI.COMM_WORLD.Get_size()
>>> rank = MPI.COMM_WORLD.Get_rank()
>>> return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>> return {"k": 3}
>>>
>>> with PyMPIExecutor(max_workers=1, cores_per_worker=2, init_function=init_k) as p:
>>> fs = p.submit(calc, 2, j=4)
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
"""

def __init__(
self,
max_workers,
max_workers=1,
cores_per_worker=1,
threads_per_core=1,
gpus_per_worker=0,
oversubscribe=False,
init_function=None,
cwd=None,
sleep_interval=0.1,
enable_slurm_backend=False,
):
super().__init__()
if not enable_slurm_backend:
if threads_per_core != 1:
raise ValueError(
"The MPI backend only supports threads_per_core=1, "
+ "to manage threads use the SLURM queuing system enable_slurm_backend=True ."
)
elif gpus_per_worker != 0:
raise ValueError(
"The MPI backend only supports gpus_per_core=0, "
+ "to manage GPUs use the SLURM queuing system enable_slurm_backend=True ."
)
self._process = RaisingThread(
target=executor_broker,
kwargs={
@@ -56,64 +65,32 @@ def __init__(
"executor_class": PyMPISingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
"threads_per_core": threads_per_core,
"gpus_per_task": int(gpus_per_worker / cores_per_worker),
"oversubscribe": oversubscribe,
"init_function": init_function,
"cwd": cwd,
"enable_slurm_backend": enable_slurm_backend,
},
)
self._process.start()


class PyMPISingleTaskExecutor(ExecutorBase):
"""
The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks.
In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.Executor can be executed in a serial python process
and does not require the python script to be executed with MPI. Still internally the pympipool.Executor uses the
mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the
usability in particular when used in combination with Jupyter notebooks.
The pympipool.mpi.PyMPISingleTaskExecutor is the internal worker for the pympipool.mpi.PyMPIExecutor.
Args:
cores (int): defines the number of MPI ranks to use for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
Examples:
```
>>> import numpy as np
>>> from pympipool.mpi.executor import PyMPISingleTaskExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
>>> size = MPI.COMM_WORLD.Get_size()
>>> rank = MPI.COMM_WORLD.Get_rank()
>>> return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>> return {"k": 3}
>>>
>>> with PyMPISingleTaskExecutor(cores=2, init_function=init_k) as p:
>>> fs = p.submit(calc, 2, j=4)
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
"""

def __init__(
self,
cores=1,
threads_per_core=1,
gpus_per_task=0,
oversubscribe=False,
init_function=None,
cwd=None,
enable_slurm_backend=False,
):
super().__init__()
self._process = RaisingThread(
@@ -122,41 +99,12 @@ def __init__(
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores,
"interface_class": get_interface,
"interface_class": MpiExecInterface,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": gpus_per_task,
"cwd": cwd,
"oversubscribe": oversubscribe,
"enable_slurm_backend": enable_slurm_backend,
},
)
self._process.start()
self._set_init_function(init_function=init_function)
cloudpickle_register(ind=3)


def get_interface(
cores=1,
threads_per_core=1,
gpus_per_core=0,
cwd=None,
oversubscribe=False,
enable_slurm_backend=False,
):
if not enable_slurm_backend:
return MpiExecInterface(
cwd=cwd,
cores=cores,
threads_per_core=threads_per_core,
gpus_per_core=gpus_per_core,
oversubscribe=oversubscribe,
)
else:
return SlurmSubprocessInterface(
cwd=cwd,
cores=cores,
threads_per_core=threads_per_core,
gpus_per_core=gpus_per_core,
oversubscribe=oversubscribe,
)
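
With the enable_slurm_backend flag and the get_interface() dispatch removed, the MPI module is hard-wired to MpiExecInterface and backend selection moves up to the choice of executor class. A rough illustration of the resulting call pattern (assuming PySlurmExecutor accepts the same keyword arguments as PyMPIExecutor, which this diff does not show):

```python
# Sketch: select the backend by picking an executor class instead of passing
# enable_slurm_backend=True; PySlurmExecutor's signature is assumed here.
from pympipool.mpi.executor import PyMPIExecutor

try:  # the srun-based backend is optional, mirroring pympipool/__init__.py
    from pympipool.slurm.executor import PySlurmExecutor
except ImportError:
    PySlurmExecutor = None


def make_executor(use_slurm=False, **kwargs):
    if use_slurm and PySlurmExecutor is not None:
        return PySlurmExecutor(**kwargs)
    return PyMPIExecutor(**kwargs)


with make_executor(use_slurm=False, max_workers=1) as exe:
    future = exe.submit(sum, [1, 2, 3])
    print(future.result())  # 6
```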
2 changes: 1 addition & 1 deletion pympipool/shared/__init__.py
@@ -8,4 +8,4 @@
)
from pympipool.shared.executorbase import cancel_items_in_queue
from pympipool.shared.thread import RaisingThread
from pympipool.shared.interface import MpiExecInterface, SlurmSubprocessInterface
from pympipool.shared.interface import MpiExecInterface, SrunInterface