Merge pull request #190 from pyiron/slurm
Split Slurm and MPI interface
jan-janssen authored Nov 2, 2023
2 parents 823cabb + 9520670 commit 86fa864
Showing 11 changed files with 157 additions and 94 deletions.
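The change removes the `enable_slurm_backend` switch and the combined `SlurmSubprocessInterface`: plain MPI execution stays in `pympipool.mpi` with `MpiExecInterface`, while SLURM support moves to a dedicated `PySlurmExecutor` in the new `pympipool.slurm` module, backed by the renamed `SrunInterface` in `pympipool.shared.interface`. A minimal sketch of how calling code changes after this split, following the constructor signatures shown in the diff below (the old call is shown only for contrast):

```python
# Before this commit: one executor with a backend switch (now removed).
# from pympipool.mpi.executor import PyMPIExecutor
# exe = PyMPIExecutor(max_workers=2, threads_per_core=2, enable_slurm_backend=True)

# After this commit: pick the executor that matches the backend.
from pympipool.mpi.executor import PyMPIExecutor      # mpiexec backend
from pympipool.slurm.executor import PySlurmExecutor  # srun backend

mpi_exe = PyMPIExecutor(max_workers=2, cores_per_worker=1)
slurm_exe = PySlurmExecutor(
    max_workers=2,
    cores_per_worker=1,
    threads_per_core=2,   # thread/GPU options now exist only on the SLURM executor
    gpus_per_worker=0,
)
```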
4 changes: 2 additions & 2 deletions pympipool/flux/executor.py
@@ -136,10 +136,10 @@ def __init__(
super().__init__(
cwd=cwd,
cores=cores,
gpus_per_core=gpus_per_core,
threads_per_core=threads_per_core,
oversubscribe=oversubscribe,
)
self._threads_per_core = threads_per_core
self._gpus_per_core = gpus_per_core
self._executor = executor
self._future = None

59 changes: 2 additions & 57 deletions pympipool/mpi/executor.py
@@ -4,7 +4,7 @@
ExecutorBase,
executor_broker,
)
from pympipool.shared.interface import MpiExecInterface, SlurmSubprocessInterface
from pympipool.shared.interface import MpiExecInterface
from pympipool.shared.thread import RaisingThread


@@ -13,39 +13,22 @@ class PyMPIExecutor(ExecutorBase):
Args:
max_workers (int): defines the number of workers which can execute functions in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
"""

def __init__(
self,
max_workers,
cores_per_worker=1,
threads_per_core=1,
gpus_per_worker=0,
oversubscribe=False,
init_function=None,
cwd=None,
sleep_interval=0.1,
enable_slurm_backend=False,
):
super().__init__()
if not enable_slurm_backend:
if threads_per_core != 1:
raise ValueError(
"The MPI backend only supports threads_per_core=1, "
+ "to manage threads use the SLURM queuing system enable_slurm_backend=True ."
)
elif gpus_per_worker != 0:
raise ValueError(
"The MPI backend only supports gpus_per_core=0, "
+ "to manage GPUs use the SLURM queuing system enable_slurm_backend=True ."
)
self._process = RaisingThread(
target=executor_broker,
kwargs={
@@ -56,12 +39,9 @@ def __init__(
"executor_class": PyMPISingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
"threads_per_core": threads_per_core,
"gpus_per_task": int(gpus_per_worker / cores_per_worker),
"oversubscribe": oversubscribe,
"init_function": init_function,
"cwd": cwd,
"enable_slurm_backend": enable_slurm_backend,
},
)
self._process.start()
@@ -77,12 +57,9 @@ class PyMPISingleTaskExecutor(ExecutorBase):
Args:
cores (int): defines the number of MPI ranks to use for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
Examples:
```
@@ -108,12 +85,9 @@ class PyMPISingleTaskExecutor(ExecutorBase):
def __init__(
self,
cores=1,
threads_per_core=1,
gpus_per_task=0,
oversubscribe=False,
init_function=None,
cwd=None,
enable_slurm_backend=False,
):
super().__init__()
self._process = RaisingThread(
@@ -122,41 +96,12 @@ def __init__(
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores,
"interface_class": get_interface,
"interface_class": MpiExecInterface,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": gpus_per_task,
"cwd": cwd,
"oversubscribe": oversubscribe,
"enable_slurm_backend": enable_slurm_backend,
},
)
self._process.start()
self._set_init_function(init_function=init_function)
cloudpickle_register(ind=3)


def get_interface(
cores=1,
threads_per_core=1,
gpus_per_core=0,
cwd=None,
oversubscribe=False,
enable_slurm_backend=False,
):
if not enable_slurm_backend:
return MpiExecInterface(
cwd=cwd,
cores=cores,
threads_per_core=threads_per_core,
gpus_per_core=gpus_per_core,
oversubscribe=oversubscribe,
)
else:
return SlurmSubprocessInterface(
cwd=cwd,
cores=cores,
threads_per_core=threads_per_core,
gpus_per_core=gpus_per_core,
oversubscribe=oversubscribe,
)
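With the `get_interface()` dispatch gone, `PyMPISingleTaskExecutor` wires in `MpiExecInterface` directly and the thread/GPU keywords disappear from the MPI executors. A minimal usage sketch under these assumptions, mirroring the submit pattern from the docstring examples (the exact result layout depends on the number of MPI ranks):

```python
from pympipool.mpi.executor import PyMPIExecutor

def calc(i):
    from mpi4py import MPI
    return i, MPI.COMM_WORLD.Get_size(), MPI.COMM_WORLD.Get_rank()

# Only mpiexec-relevant options remain: cores_per_worker, oversubscribe,
# init_function, cwd and sleep_interval.
with PyMPIExecutor(max_workers=1, cores_per_worker=2) as exe:
    fs = exe.submit(calc, 3)
    print(fs.result())  # one (i, size, rank) entry per MPI rank
```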
2 changes: 1 addition & 1 deletion pympipool/shared/__init__.py
@@ -8,4 +8,4 @@
)
from pympipool.shared.executorbase import cancel_items_in_queue
from pympipool.shared.thread import RaisingThread
from pympipool.shared.interface import MpiExecInterface, SlurmSubprocessInterface
from pympipool.shared.interface import MpiExecInterface, SrunInterface
33 changes: 19 additions & 14 deletions pympipool/shared/interface.py
@@ -3,13 +3,9 @@


class BaseInterface(ABC):
def __init__(
self, cwd, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False
):
def __init__(self, cwd, cores=1, oversubscribe=False):
self._cwd = cwd
self._cores = cores
self._threads_per_core = threads_per_core
self._gpus_per_core = gpus_per_core
self._oversubscribe = oversubscribe

def bootup(self, command_lst):
@@ -27,15 +23,11 @@ def __init__(
self,
cwd=None,
cores=1,
threads_per_core=1,
gpus_per_core=0,
oversubscribe=False,
):
super().__init__(
cwd=cwd,
cores=cores,
threads_per_core=threads_per_core,
gpus_per_core=gpus_per_core,
oversubscribe=oversubscribe,
)
self._process = None
@@ -63,15 +55,30 @@ class MpiExecInterface(SubprocessInterface):
def generate_command(self, command_lst):
command_prepend_lst = generate_mpiexec_command(
cores=self._cores,
gpus_per_core=self._gpus_per_core,
oversubscribe=self._oversubscribe,
)
return super().generate_command(
command_lst=command_prepend_lst + command_lst,
)


class SlurmSubprocessInterface(SubprocessInterface):
class SrunInterface(SubprocessInterface):
def __init__(
self,
cwd=None,
cores=1,
threads_per_core=1,
gpus_per_core=0,
oversubscribe=False,
):
super().__init__(
cwd=cwd,
cores=cores,
oversubscribe=oversubscribe,
)
self._threads_per_core = threads_per_core
self._gpus_per_core = gpus_per_core

def generate_command(self, command_lst):
command_prepend_lst = generate_slurm_command(
cores=self._cores,
@@ -85,12 +92,10 @@ def generate_command(self, command_lst):
)


def generate_mpiexec_command(cores, gpus_per_core=0, oversubscribe=False):
def generate_mpiexec_command(cores, oversubscribe=False):
command_prepend_lst = ["mpiexec", "-n", str(cores)]
if oversubscribe:
command_prepend_lst += ["--oversubscribe"]
if gpus_per_core > 0:
raise ValueError()
return command_prepend_lst


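After the split, `MpiExecInterface` only accepts what `mpiexec` itself understands, while the per-core thread and GPU options live exclusively on `SrunInterface` and are forwarded to `generate_slurm_command` (not shown in this diff). A sketch of how the two interfaces are now constructed; the command list passed to `generate_command` is a placeholder, and the output comment assumes the base class returns the list unchanged:

```python
from pympipool.shared.interface import MpiExecInterface, SrunInterface

# Plain MPI: no thread or GPU arguments anymore.
mpi = MpiExecInterface(cwd=None, cores=4, oversubscribe=True)
print(mpi.generate_command(["python", "backend.py"]))
# -> ['mpiexec', '-n', '4', '--oversubscribe', 'python', 'backend.py']

# SLURM: keeps threads_per_core and gpus_per_core and builds an srun call.
srun = SrunInterface(cwd=None, cores=4, threads_per_core=2, gpus_per_core=1)
srun_command = srun.generate_command(["python", "backend.py"])
```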
1 change: 1 addition & 0 deletions pympipool/slurm/__init__.py
@@ -0,0 +1 @@
from pympipool.slurm.executor import PySlurmExecutor
119 changes: 119 additions & 0 deletions pympipool/slurm/executor.py
@@ -0,0 +1,119 @@
from pympipool.shared.executorbase import (
cloudpickle_register,
execute_parallel_tasks,
ExecutorBase,
executor_broker,
)
from pympipool.shared.interface import SrunInterface
from pympipool.shared.thread import RaisingThread


class PySlurmExecutor(ExecutorBase):
"""
Args:
max_workers (int): defines the number of workers which can execute functions in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
"""

def __init__(
self,
max_workers,
cores_per_worker=1,
threads_per_core=1,
gpus_per_worker=0,
oversubscribe=False,
init_function=None,
cwd=None,
sleep_interval=0.1,
):
super().__init__()
self._process = RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"executor_class": PySlurmSingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
"threads_per_core": threads_per_core,
"gpus_per_task": int(gpus_per_worker / cores_per_worker),
"oversubscribe": oversubscribe,
"init_function": init_function,
"cwd": cwd,
},
)
self._process.start()


class PySlurmSingleTaskExecutor(ExecutorBase):
"""
The pympipool.Executor behaves like the concurrent.futures.Executor, but it uses mpi4py to execute parallel tasks.
In contrast to the mpi4py.futures.MPIPoolExecutor, the pympipool.Executor can be executed in a serial Python process
and does not require the Python script to be executed with MPI. Internally, the pympipool.Executor still uses the
mpi4py.futures.MPIPoolExecutor; consequently, it is primarily an abstraction of its functionality that improves
usability, in particular when used in combination with Jupyter notebooks.
Args:
cores (int): defines the number of MPI ranks to use for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
Examples:
```
>>> import numpy as np
>>> from pympipool.slurm.executor import PySlurmSingleTaskExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
>>> size = MPI.COMM_WORLD.Get_size()
>>> rank = MPI.COMM_WORLD.Get_rank()
>>> return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>> return {"k": 3}
>>>
>>> with PySlurmSingleTaskExecutor(cores=2, init_function=init_k) as p:
>>> fs = p.submit(calc, 2, j=4)
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
"""

def __init__(
self,
cores=1,
threads_per_core=1,
gpus_per_task=0,
oversubscribe=False,
init_function=None,
cwd=None,
):
super().__init__()
self._process = RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores,
"interface_class": SrunInterface,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": gpus_per_task,
"cwd": cwd,
"oversubscribe": oversubscribe,
},
)
self._process.start()
self._set_init_function(init_function=init_function)
cloudpickle_register(ind=3)
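A usage sketch for the new `PySlurmExecutor`, assuming it runs inside a SLURM allocation where `srun` is available; the pattern mirrors the docstring example above:

```python
from pympipool.slurm.executor import PySlurmExecutor

def calc(i, j, k):
    return i + j + k

def init_k():
    return {"k": 3}

# Workers are launched through srun; threads_per_core and gpus_per_worker are
# honoured here, unlike in PyMPIExecutor after this commit.
with PySlurmExecutor(max_workers=2, cores_per_worker=1,
                     threads_per_core=2, init_function=init_k) as exe:
    fs = exe.submit(calc, 1, j=2)   # init_k presets k=3, so calc sees i=1, j=2, k=3
    print(fs.result())
```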
2 changes: 1 addition & 1 deletion tests/test_interface.py
@@ -18,7 +18,7 @@ def test_interface(self):
task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}}
interface = SocketInterface(
interface=MpiExecInterface(
cwd=None, cores=1, gpus_per_core=0, oversubscribe=False
cwd=None, cores=1, oversubscribe=False
)
)
interface.bootup(
4 changes: 2 additions & 2 deletions tests/test_meta.py
@@ -110,7 +110,7 @@ def test_meta_executor_parallel(self):
self.assertTrue(fs_1.done())

def test_errors(self):
with self.assertRaises(ValueError):
with self.assertRaises(TypeError):
PyMPIExecutor(max_workers=1, cores_per_worker=1, threads_per_core=2)
with self.assertRaises(ValueError):
with self.assertRaises(TypeError):
PyMPIExecutor(max_workers=1, cores_per_worker=1, gpus_per_worker=1)
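The expected exception changes because `threads_per_core` and `gpus_per_worker` are no longer parameters of `PyMPIExecutor` at all: instead of the old explicit `ValueError`, Python itself now rejects the unknown keyword arguments. A minimal sketch of the new behaviour:

```python
from pympipool.mpi.executor import PyMPIExecutor

try:
    PyMPIExecutor(max_workers=1, cores_per_worker=1, threads_per_core=2)
except TypeError as err:
    # "unexpected keyword argument 'threads_per_core'" - the option now only
    # exists on pympipool.slurm.executor.PySlurmExecutor
    print(err)
```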