Skip to content

Commit

Permalink
Merge pull request #353 from pyiron/refactor
Browse files Browse the repository at this point in the history
Refactor pympipool to separate interactive and cache interface
  • Loading branch information
jan-janssen authored May 30, 2024
2 parents 810944f + 5e986f5 commit 8f7b77b
Show file tree
Hide file tree
Showing 29 changed files with 107 additions and 105 deletions.
4 changes: 2 additions & 2 deletions pympipool/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from typing import Optional
from ._version import get_versions
from pympipool.scheduler import create_executor
from pympipool.interactive import create_executor
from pympipool.shell.executor import SubprocessExecutor
from pympipool.shell.interactive import ShellExecutor
from pympipool.shared.dependencies import ExecutorWithDependencies
from pympipool.interactive.dependencies import ExecutorWithDependencies
from pympipool.shared.inputcheck import check_refresh_rate as _check_refresh_rate


Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
interface_shutdown,
interface_receive,
)
from pympipool.shared.backend import call_funct, parse_arguments
from pympipool.interactive.backend import call_funct, parse_arguments


def main():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
interface_shutdown,
interface_receive,
)
from pympipool.shared.backend import call_funct, parse_arguments
from pympipool.interactive.backend import call_funct, parse_arguments


def main(argument_lst: Optional[list[str]] = None):
Expand Down
2 changes: 1 addition & 1 deletion pympipool/cache/executor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os

from pympipool.shared.executorbase import ExecutorBase
from pympipool.shared.executor import ExecutorBase
from pympipool.shared.thread import RaisingThread
from pympipool.cache.shared import execute_in_subprocess, execute_tasks_h5

Expand Down
8 changes: 4 additions & 4 deletions pympipool/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import cloudpickle

from pympipool.cache.hdf import dump, load, get_output
from pympipool.shared.executorbase import get_command_path
from pympipool.shared.executor import get_command_path


class FutureItem:
Expand Down Expand Up @@ -137,7 +137,7 @@ def _get_execute_command(file_name: str, cores: int = 1) -> list:
Get command to call backend as a list of two strings
Args:
file_name (str):
cores (int): Number of cores used to execute the task, if it is greater than one use mpiexec_interactive.py else serial_interactive.py
cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py else interactive_serial.py
Returns:
list[str]: List of strings containing the python executable path and the backend script to execute
"""
Expand All @@ -146,14 +146,14 @@ def _get_execute_command(file_name: str, cores: int = 1) -> list:
command_lst = (
["mpiexec", "-n", str(cores)]
+ command_lst
+ [get_command_path(executable="mpiexec_cache.py"), file_name]
+ [get_command_path(executable="cache_parallel.py"), file_name]
)
elif cores > 1:
raise ImportError(
"mpi4py is required for parallel calculations. Please install mpi4py."
)
else:
command_lst += [get_command_path(executable="serial_cache.py"), file_name]
command_lst += [get_command_path(executable="cache_serial.py"), file_name]
return command_lst


Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import os
import shutil
from typing import Optional
from pympipool.scheduler.universal import (
UniversalExecutor,
UniversalStepExecutor,
from pympipool.interactive.executor import (
InteractiveExecutor,
InteractiveStepExecutor,
)
from pympipool.scheduler.interface import (
from pympipool.shared.interface import (
MpiExecInterface,
SLURM_COMMAND,
SrunInterface,
Expand All @@ -23,7 +23,7 @@
)

try: # The PyFluxExecutor requires flux-core to be installed.
from pympipool.scheduler.flux import FluxPythonInterface
from pympipool.interactive.flux import FluxPythonInterface

flux_installed = "FLUX_URI" in os.environ
except ImportError:
Expand Down Expand Up @@ -108,13 +108,13 @@ def create_executor(
executor_kwargs["pmi"] = pmi
if block_allocation:
executor_kwargs["init_function"] = init_function
return UniversalExecutor(
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
interface_class=FluxPythonInterface,
)
else:
return UniversalStepExecutor(
return InteractiveStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
interface_class=FluxPythonInterface,
Expand All @@ -127,13 +127,13 @@ def create_executor(
executor_kwargs["oversubscribe"] = oversubscribe
if block_allocation:
executor_kwargs["init_function"] = init_function
return UniversalExecutor(
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
interface_class=SrunInterface,
)
else:
return UniversalStepExecutor(
return InteractiveStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
interface_class=SrunInterface,
Expand All @@ -148,13 +148,13 @@ def create_executor(
executor_kwargs["oversubscribe"] = oversubscribe
if block_allocation:
executor_kwargs["init_function"] = init_function
return UniversalExecutor(
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
interface_class=MpiExecInterface,
)
else:
return UniversalStepExecutor(
return InteractiveStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
interface_class=MpiExecInterface,
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pympipool.scheduler import create_executor
from pympipool.shared.executorbase import ExecutorSteps, execute_tasks_with_dependencies
from pympipool.interactive import create_executor
from pympipool.shared.executor import ExecutorSteps, execute_tasks_with_dependencies
from pympipool.shared.thread import RaisingThread


Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
from pympipool.shared.executorbase import (
from pympipool.shared.executor import (
execute_parallel_tasks,
execute_separate_tasks,
ExecutorBroker,
ExecutorSteps,
)
from pympipool.scheduler.interface import BaseInterface
from pympipool.shared.interface import BaseInterface
from pympipool.shared.thread import RaisingThread
from pympipool.scheduler.interface import MpiExecInterface
from pympipool.shared.interface import MpiExecInterface


class UniversalExecutor(ExecutorBroker):
class InteractiveExecutor(ExecutorBroker):
"""
The pympipool.scheduler.universal.UniversalExecutor leverages the pympipool interfaces to distribute python tasks on
a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the
pympipool.scheduler.universal.UniversalExecutor can be executed in a serial python process and does not require the
python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality to improve
the usability in particular when used in combination with Jupyter notebooks.
The pympipool.interactive.executor.InteractiveExecutor leverages the pympipool interfaces to distribute python tasks
on a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the
pympipool.interactive.executor.InteractiveExecutor can be executed in a serial python process and does not require
the python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality to
improves the usability in particular when used in combination with Jupyter notebooks.
Args:
max_workers (int): defines the number workers which can execute functions in parallel
Expand All @@ -25,7 +25,7 @@ class UniversalExecutor(ExecutorBroker):
Examples:
>>> import numpy as np
>>> from pympipool.scheduler.flux import PyFluxExecutor
>>> from pympipool.interactive.executor import InteractiveExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
Expand All @@ -36,7 +36,7 @@ class UniversalExecutor(ExecutorBroker):
>>> def init_k():
>>> return {"k": 3}
>>>
>>> with PyFluxExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
>>> with InteractiveExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
>>> fs = p.submit(calc, 2, j=4)
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
Expand All @@ -63,21 +63,23 @@ def __init__(
)


class UniversalStepExecutor(ExecutorSteps):
class InteractiveStepExecutor(ExecutorSteps):
"""
The pympipool.flux.PyFluxStepExecutor leverages the flux framework to distribute python tasks within a queuing
system allocation. In analogy to the pympipool.slurm.PySlurmExecutur it provides the option to specify the number
of threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
worker.
The pympipool.interactive.executor.InteractiveStepExecutor leverages the pympipool interfaces to distribute python
tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.interactive.executor.InteractiveStepExecutor
can be executed in a serial python process and does not require the python script to be executed with MPI.
Consequently, it is primarily an abstraction of its functionality to improve the usability in particular when used
in combination with Jupyter notebooks.
Args:
max_cores (int): defines the number workers which can execute functions in parallel
executor_kwargs (dict): keyword arguments for the executor
interface_class (BaseInterface): interface class to initiate python processes
Examples:
>>> import numpy as np
>>> from pympipool.scheduler.flux import PyFluxStepExecutor
>>> from pympipool.interactive.executor import InteractiveStepExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import flux.job

from pympipool.scheduler.interface import BaseInterface
from pympipool.shared.interface import BaseInterface


class FluxPythonInterface(BaseInterface):
Expand Down
4 changes: 2 additions & 2 deletions pympipool/shared/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
interface_shutdown,
interface_receive,
)
from pympipool.shared.executorbase import cancel_items_in_queue
from pympipool.shared.executor import cancel_items_in_queue
from pympipool.shared.thread import RaisingThread
from pympipool.scheduler.interface import MpiExecInterface, SrunInterface
from pympipool.shared.interface import MpiExecInterface, SrunInterface


__all__ = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from pympipool.shared.communication import interface_bootup
from pympipool.shared.thread import RaisingThread
from pympipool.scheduler.interface import BaseInterface, MpiExecInterface
from pympipool.shared.interface import BaseInterface, MpiExecInterface
from pympipool.shared.inputcheck import (
check_resource_dict,
check_resource_dict_is_empty,
Expand Down Expand Up @@ -432,20 +432,20 @@ def _get_backend_path(cores: int) -> list:
Get command to call backend as a list of two strings
Args:
cores (int): Number of cores used to execute the task, if it is greater than one use mpiexec_interactive.py else serial_interactive.py
cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py else interactive_serial.py
Returns:
list[str]: List of strings containing the python executable path and the backend script to execute
"""
command_lst = [sys.executable]
if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
command_lst += [get_command_path(executable="mpiexec_interactive.py")]
command_lst += [get_command_path(executable="interactive_parallel.py")]
elif cores > 1:
raise ImportError(
"mpi4py is required for parallel calculations. Please install mpi4py."
)
else:
command_lst += [get_command_path(executable="serial_interactive.py")]
command_lst += [get_command_path(executable="interactive_serial.py")]
return command_lst


Expand All @@ -454,7 +454,7 @@ def _get_command_path(executable: str) -> str:
Get path of the backend executable script
Args:
executable (str): Name of the backend executable script, either mpiexec_interactive.py or serial_interactive.py
executable (str): Name of the backend executable script, either interactive_parallel.py or interactive_serial.py
Returns:
str: absolute path to the executable script
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion pympipool/shell/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from concurrent.futures import Future
import subprocess

from pympipool.shared.executorbase import ExecutorBroker
from pympipool.shared.executor import ExecutorBroker
from pympipool.shared.thread import RaisingThread


Expand Down
2 changes: 1 addition & 1 deletion pympipool/shell/interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import subprocess
from time import sleep

from pympipool.shared.executorbase import cancel_items_in_queue, ExecutorBase
from pympipool.shared.executor import cancel_items_in_queue, ExecutorBase
from pympipool.shared.thread import RaisingThread


Expand Down
2 changes: 1 addition & 1 deletion tests/test_backend_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import cloudpickle
import zmq

from pympipool.backend.serial_interactive import main
from pympipool.backend.interactive_serial import main


def calc(i, j):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_dependencies_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from time import sleep

from pympipool import Executor
from pympipool.shared.executorbase import cloudpickle_register
from pympipool.shared.executor import cloudpickle_register


def add_function(parameter_1, parameter_2):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_executor_backend_flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

try:
import flux.job
from pympipool.scheduler.flux import FluxPythonInterface
from pympipool.interactive.flux import FluxPythonInterface

skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("PYMPIPOOL_PMIX", None)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_executor_backend_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import unittest

from pympipool import Executor
from pympipool.shared.executorbase import cloudpickle_register
from pympipool.shared.executor import cloudpickle_register


skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None
Expand Down
2 changes: 1 addition & 1 deletion tests/test_executor_backend_mpi_noblock.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest

from pympipool import Executor
from pympipool.shared.executorbase import cloudpickle_register
from pympipool.shared.executor import cloudpickle_register


def calc(i):
Expand Down
Loading

0 comments on commit 8f7b77b

Please sign in to comment.