Merge pull request #352 from pyiron/universal_executor
Introduce a single Universal Executor
jan-janssen authored May 30, 2024
2 parents b113227 + 5a37774 commit 810944f
Showing 13 changed files with 128 additions and 299 deletions.
37 changes: 20 additions & 17 deletions pympipool/scheduler/__init__.py
@@ -1,15 +1,15 @@
 import os
 import shutil
 from typing import Optional
-from pympipool.scheduler.local import (
-    PyLocalExecutor,
-    PyLocalStepExecutor,
+from pympipool.scheduler.universal import (
+    UniversalExecutor,
+    UniversalStepExecutor,
 )
-from pympipool.scheduler.slurm import (
-    PySlurmExecutor,
-    PySlurmStepExecutor,
+from pympipool.scheduler.interface import (
+    MpiExecInterface,
+    SLURM_COMMAND,
+    SrunInterface,
 )
-from pympipool.shared.interface import SLURM_COMMAND
 from pympipool.shared.inputcheck import (
     check_command_line_argument_lst,
     check_gpus_per_worker,
@@ -23,10 +23,7 @@
 )

 try:  # The PyFluxExecutor requires flux-core to be installed.
-    from pympipool.scheduler.flux import (
-        PyFluxExecutor,
-        PyFluxStepExecutor,
-    )
+    from pympipool.scheduler.flux import FluxPythonInterface

     flux_installed = "FLUX_URI" in os.environ
 except ImportError:
@@ -111,14 +108,16 @@ def create_executor(
             executor_kwargs["pmi"] = pmi
         if block_allocation:
             executor_kwargs["init_function"] = init_function
-            return PyFluxExecutor(
+            return UniversalExecutor(
                 max_workers=int(max_cores / cores_per_worker),
                 executor_kwargs=executor_kwargs,
+                interface_class=FluxPythonInterface,
             )
         else:
-            return PyFluxStepExecutor(
+            return UniversalStepExecutor(
                 max_cores=max_cores,
                 executor_kwargs=executor_kwargs,
+                interface_class=FluxPythonInterface,
             )
     elif backend == "slurm":
         check_executor(executor=executor)
@@ -128,14 +127,16 @@
         executor_kwargs["oversubscribe"] = oversubscribe
         if block_allocation:
             executor_kwargs["init_function"] = init_function
-            return PySlurmExecutor(
+            return UniversalExecutor(
                 max_workers=int(max_cores / cores_per_worker),
                 executor_kwargs=executor_kwargs,
+                interface_class=SrunInterface,
             )
         else:
-            return PySlurmStepExecutor(
+            return UniversalStepExecutor(
                 max_cores=max_cores,
                 executor_kwargs=executor_kwargs,
+                interface_class=SrunInterface,
             )
     else:  # backend="local"
         check_threads_per_core(threads_per_core=threads_per_core)
@@ -147,12 +148,14 @@
         executor_kwargs["oversubscribe"] = oversubscribe
         if block_allocation:
             executor_kwargs["init_function"] = init_function
-            return PyLocalExecutor(
+            return UniversalExecutor(
                 max_workers=int(max_cores / cores_per_worker),
                 executor_kwargs=executor_kwargs,
+                interface_class=MpiExecInterface,
             )
         else:
-            return PyLocalStepExecutor(
+            return UniversalStepExecutor(
                 max_cores=max_cores,
                 executor_kwargs=executor_kwargs,
+                interface_class=MpiExecInterface,
             )
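
With this change all three backends return the same pair of executor classes and differ only in the interface_class they receive. A minimal usage sketch, assuming create_executor keeps the keyword arguments visible in this diff (backend, max_cores, cores_per_worker, block_allocation, init_function); the full signature is not shown here:

from pympipool.scheduler import create_executor

def init_k():
    return {"k": 3}

def add_k(x, k):
    return x + k

# backend="local" resolves to UniversalExecutor with MpiExecInterface,
# "slurm" to SrunInterface, and "flux" to FluxPythonInterface (if available).
with create_executor(
    backend="local",
    max_cores=2,
    cores_per_worker=1,
    block_allocation=True,
    init_function=init_k,
) as exe:
    fs = exe.submit(add_k, 2)
    print(fs.result())  # 5, with k=3 supplied by init_k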
107 changes: 1 addition & 106 deletions pympipool/scheduler/flux.py
@@ -3,112 +3,7 @@
 
 import flux.job
 
-from pympipool.shared.executorbase import (
-    execute_parallel_tasks,
-    execute_separate_tasks,
-    ExecutorBroker,
-    ExecutorSteps,
-)
-from pympipool.shared.interface import BaseInterface
-from pympipool.shared.thread import RaisingThread
-
-
-class PyFluxExecutor(ExecutorBroker):
-    """
-    The pympipool.flux.PyFluxExecutor leverages the flux framework to distribute python tasks within a queuing system
-    allocation. In analogy to the pympipool.slurm.PySlurmExecutor it provides the option to specify the number of
-    threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
-    worker.
-
-    Args:
-        max_workers (int): defines the number of workers which can execute functions in parallel
-        executor_kwargs (dict): keyword arguments for the executor
-
-    Examples:
-        >>> import numpy as np
-        >>> from pympipool.scheduler.flux import PyFluxExecutor
-        >>>
-        >>> def calc(i, j, k):
-        >>>     from mpi4py import MPI
-        >>>     size = MPI.COMM_WORLD.Get_size()
-        >>>     rank = MPI.COMM_WORLD.Get_rank()
-        >>>     return np.array([i, j, k]), size, rank
-        >>>
-        >>> def init_k():
-        >>>     return {"k": 3}
-        >>>
-        >>> with PyFluxExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
-        >>>     fs = p.submit(calc, 2, j=4)
-        >>>     print(fs.result())
-        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
-    """
-
-    def __init__(
-        self,
-        max_workers: int = 1,
-        executor_kwargs: dict = {},
-    ):
-        super().__init__()
-        executor_kwargs["future_queue"] = self._future_queue
-        executor_kwargs["interface_class"] = FluxPythonInterface
-        self._set_process(
-            process=[
-                RaisingThread(
-                    target=execute_parallel_tasks,
-                    kwargs=executor_kwargs,
-                )
-                for _ in range(max_workers)
-            ],
-        )
-
-
-class PyFluxStepExecutor(ExecutorSteps):
-    """
-    The pympipool.flux.PyFluxStepExecutor leverages the flux framework to distribute python tasks within a queuing
-    system allocation. In analogy to the pympipool.slurm.PySlurmExecutor it provides the option to specify the number
-    of threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
-    worker.
-
-    Args:
-        max_cores (int): defines the number of cores which can be used in parallel
-        executor_kwargs (dict): keyword arguments for the executor
-
-    Examples:
-        >>> import numpy as np
-        >>> from pympipool.scheduler.flux import PyFluxStepExecutor
-        >>>
-        >>> def calc(i, j, k):
-        >>>     from mpi4py import MPI
-        >>>     size = MPI.COMM_WORLD.Get_size()
-        >>>     rank = MPI.COMM_WORLD.Get_rank()
-        >>>     return np.array([i, j, k]), size, rank
-        >>>
-        >>> with PyFluxStepExecutor(max_cores=2) as p:
-        >>>     fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
-        >>>     print(fs.result())
-        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
-    """
-
-    def __init__(
-        self,
-        max_cores: int = 1,
-        executor_kwargs: dict = {},
-    ):
-        super().__init__()
-        executor_kwargs["future_queue"] = self._future_queue
-        executor_kwargs["interface_class"] = FluxPythonInterface
-        executor_kwargs["max_cores"] = max_cores
-        self._set_process(
-            RaisingThread(
-                target=execute_separate_tasks,
-                kwargs=executor_kwargs,
-            )
-        )
+from pympipool.scheduler.interface import BaseInterface
 
 
 class FluxPythonInterface(BaseInterface):
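
For orientation, a hedged sketch of how the example from the removed PyFluxExecutor docstring maps onto the new classes, using only the constructor arguments that appear in this diff (max_workers, executor_kwargs, interface_class):

from pympipool.scheduler.universal import UniversalExecutor
from pympipool.scheduler.flux import FluxPythonInterface  # requires flux-core

def init_k():
    return {"k": 3}

def calc(i, j, k):
    return i + j + k

# The flux-specific executor subclass is gone; the flux dependency is now
# confined to the interface_class argument.
with UniversalExecutor(
    max_workers=2,
    executor_kwargs={"init_function": init_k},
    interface_class=FluxPythonInterface,
) as p:
    fs = p.submit(calc, 2, j=4)
    print(fs.result())  # 9, with k=3 supplied by init_k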
File renamed without changes.
104 changes: 0 additions & 104 deletions pympipool/scheduler/local.py

This file was deleted.
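
The deleted local.py module (PyLocalExecutor/PyLocalStepExecutor) is likewise covered by the universal classes. A sketch of the per-submission path, assuming UniversalStepExecutor accepts the max_cores/executor_kwargs/interface_class arguments used in create_executor above and the resource_dict submission argument shown in the removed docstrings:

from pympipool.scheduler.universal import UniversalStepExecutor
from pympipool.scheduler.interface import MpiExecInterface

def calc(i, j, k):
    return i + j + k

# Each submission can request its own resources; resource_dict={"cores": 2}
# asks for two MPI ranks for this single call.
with UniversalStepExecutor(
    max_cores=2,
    executor_kwargs={},
    interface_class=MpiExecInterface,
) as p:
    fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
    print(fs.result())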


