diff --git a/pympipool/__init__.py b/pympipool/__init__.py index 1cbb297c..920b728a 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -1,9 +1,9 @@ from typing import Optional from ._version import get_versions -from pympipool.scheduler import create_executor +from pympipool.interactive import create_executor from pympipool.shell.executor import SubprocessExecutor from pympipool.shell.interactive import ShellExecutor -from pympipool.shared.dependencies import ExecutorWithDependencies +from pympipool.interactive.dependencies import ExecutorWithDependencies from pympipool.shared.inputcheck import check_refresh_rate as _check_refresh_rate diff --git a/pympipool/backend/mpiexec_cache.py b/pympipool/backend/cache_parallel.py similarity index 100% rename from pympipool/backend/mpiexec_cache.py rename to pympipool/backend/cache_parallel.py diff --git a/pympipool/backend/serial_cache.py b/pympipool/backend/cache_serial.py similarity index 100% rename from pympipool/backend/serial_cache.py rename to pympipool/backend/cache_serial.py diff --git a/pympipool/backend/mpiexec_interactive.py b/pympipool/backend/interactive_parallel.py similarity index 97% rename from pympipool/backend/mpiexec_interactive.py rename to pympipool/backend/interactive_parallel.py index 18e896db..e26b4227 100644 --- a/pympipool/backend/mpiexec_interactive.py +++ b/pympipool/backend/interactive_parallel.py @@ -10,7 +10,7 @@ interface_shutdown, interface_receive, ) -from pympipool.shared.backend import call_funct, parse_arguments +from pympipool.interactive.backend import call_funct, parse_arguments def main(): diff --git a/pympipool/backend/serial_interactive.py b/pympipool/backend/interactive_serial.py similarity index 96% rename from pympipool/backend/serial_interactive.py rename to pympipool/backend/interactive_serial.py index e6398fdb..9ac2c70f 100644 --- a/pympipool/backend/serial_interactive.py +++ b/pympipool/backend/interactive_serial.py @@ -8,7 +8,7 @@ interface_shutdown, interface_receive, ) -from pympipool.shared.backend import call_funct, parse_arguments +from pympipool.interactive.backend import call_funct, parse_arguments def main(argument_lst: Optional[list[str]] = None): diff --git a/pympipool/cache/executor.py b/pympipool/cache/executor.py index 4a3df2a5..5380fb6a 100644 --- a/pympipool/cache/executor.py +++ b/pympipool/cache/executor.py @@ -1,6 +1,6 @@ import os -from pympipool.shared.executorbase import ExecutorBase +from pympipool.shared.executor import ExecutorBase from pympipool.shared.thread import RaisingThread from pympipool.cache.shared import execute_in_subprocess, execute_tasks_h5 diff --git a/pympipool/cache/shared.py b/pympipool/cache/shared.py index b08d8638..90505856 100644 --- a/pympipool/cache/shared.py +++ b/pympipool/cache/shared.py @@ -11,7 +11,7 @@ import cloudpickle from pympipool.cache.hdf import dump, load, get_output -from pympipool.shared.executorbase import get_command_path +from pympipool.shared.executor import get_command_path class FutureItem: @@ -137,7 +137,7 @@ def _get_execute_command(file_name: str, cores: int = 1) -> list: Get command to call backend as a list of two strings Args: file_name (str): - cores (int): Number of cores used to execute the task, if it is greater than one use mpiexec_interactive.py else serial_interactive.py + cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py else interactive_serial.py Returns: list[str]: List of strings containing the python executable path and the backend script to execute """ @@ -146,14 +146,14 @@ def _get_execute_command(file_name: str, cores: int = 1) -> list: command_lst = ( ["mpiexec", "-n", str(cores)] + command_lst - + [get_command_path(executable="mpiexec_cache.py"), file_name] + + [get_command_path(executable="cache_parallel.py"), file_name] ) elif cores > 1: raise ImportError( "mpi4py is required for parallel calculations. Please install mpi4py." ) else: - command_lst += [get_command_path(executable="serial_cache.py"), file_name] + command_lst += [get_command_path(executable="cache_serial.py"), file_name] return command_lst diff --git a/pympipool/scheduler/__init__.py b/pympipool/interactive/__init__.py similarity index 94% rename from pympipool/scheduler/__init__.py rename to pympipool/interactive/__init__.py index d8dbffe7..a5d95e55 100644 --- a/pympipool/scheduler/__init__.py +++ b/pympipool/interactive/__init__.py @@ -1,11 +1,11 @@ import os import shutil from typing import Optional -from pympipool.scheduler.universal import ( - UniversalExecutor, - UniversalStepExecutor, +from pympipool.interactive.executor import ( + InteractiveExecutor, + InteractiveStepExecutor, ) -from pympipool.scheduler.interface import ( +from pympipool.shared.interface import ( MpiExecInterface, SLURM_COMMAND, SrunInterface, @@ -23,7 +23,7 @@ ) try: # The PyFluxExecutor requires flux-core to be installed. - from pympipool.scheduler.flux import FluxPythonInterface + from pympipool.interactive.flux import FluxPythonInterface flux_installed = "FLUX_URI" in os.environ except ImportError: @@ -108,13 +108,13 @@ def create_executor( executor_kwargs["pmi"] = pmi if block_allocation: executor_kwargs["init_function"] = init_function - return UniversalExecutor( + return InteractiveExecutor( max_workers=int(max_cores / cores_per_worker), executor_kwargs=executor_kwargs, interface_class=FluxPythonInterface, ) else: - return UniversalStepExecutor( + return InteractiveStepExecutor( max_cores=max_cores, executor_kwargs=executor_kwargs, interface_class=FluxPythonInterface, @@ -127,13 +127,13 @@ def create_executor( executor_kwargs["oversubscribe"] = oversubscribe if block_allocation: executor_kwargs["init_function"] = init_function - return UniversalExecutor( + return InteractiveExecutor( max_workers=int(max_cores / cores_per_worker), executor_kwargs=executor_kwargs, interface_class=SrunInterface, ) else: - return UniversalStepExecutor( + return InteractiveStepExecutor( max_cores=max_cores, executor_kwargs=executor_kwargs, interface_class=SrunInterface, @@ -148,13 +148,13 @@ def create_executor( executor_kwargs["oversubscribe"] = oversubscribe if block_allocation: executor_kwargs["init_function"] = init_function - return UniversalExecutor( + return InteractiveExecutor( max_workers=int(max_cores / cores_per_worker), executor_kwargs=executor_kwargs, interface_class=MpiExecInterface, ) else: - return UniversalStepExecutor( + return InteractiveStepExecutor( max_cores=max_cores, executor_kwargs=executor_kwargs, interface_class=MpiExecInterface, diff --git a/pympipool/shared/backend.py b/pympipool/interactive/backend.py similarity index 100% rename from pympipool/shared/backend.py rename to pympipool/interactive/backend.py diff --git a/pympipool/shared/dependencies.py b/pympipool/interactive/dependencies.py similarity index 83% rename from pympipool/shared/dependencies.py rename to pympipool/interactive/dependencies.py index d561902a..44220591 100644 --- a/pympipool/shared/dependencies.py +++ b/pympipool/interactive/dependencies.py @@ -1,5 +1,5 @@ -from pympipool.scheduler import create_executor -from pympipool.shared.executorbase import ExecutorSteps, execute_tasks_with_dependencies +from pympipool.interactive import create_executor +from pympipool.shared.executor import ExecutorSteps, execute_tasks_with_dependencies from pympipool.shared.thread import RaisingThread diff --git a/pympipool/scheduler/universal.py b/pympipool/interactive/executor.py similarity index 62% rename from pympipool/scheduler/universal.py rename to pympipool/interactive/executor.py index b6dd8531..c8b8875c 100644 --- a/pympipool/scheduler/universal.py +++ b/pympipool/interactive/executor.py @@ -1,21 +1,21 @@ -from pympipool.shared.executorbase import ( +from pympipool.shared.executor import ( execute_parallel_tasks, execute_separate_tasks, ExecutorBroker, ExecutorSteps, ) -from pympipool.scheduler.interface import BaseInterface +from pympipool.shared.interface import BaseInterface from pympipool.shared.thread import RaisingThread -from pympipool.scheduler.interface import MpiExecInterface +from pympipool.shared.interface import MpiExecInterface -class UniversalExecutor(ExecutorBroker): +class InteractiveExecutor(ExecutorBroker): """ - The pympipool.scheduler.universal.UniversalExecutor leverages the pympipool interfaces to distribute python tasks on - a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the - pympipool.scheduler.universal.UniversalExecutor can be executed in a serial python process and does not require the - python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality to improve - the usability in particular when used in combination with Jupyter notebooks. + The pympipool.interactive.executor.InteractiveExecutor leverages the pympipool interfaces to distribute python tasks + on a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the + pympipool.interactive.executor.InteractiveExecutor can be executed in a serial python process and does not require + the python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality to + improves the usability in particular when used in combination with Jupyter notebooks. Args: max_workers (int): defines the number workers which can execute functions in parallel @@ -25,7 +25,7 @@ class UniversalExecutor(ExecutorBroker): Examples: >>> import numpy as np - >>> from pympipool.scheduler.flux import PyFluxExecutor + >>> from pympipool.interactive.executor import InteractiveExecutor >>> >>> def calc(i, j, k): >>> from mpi4py import MPI @@ -36,7 +36,7 @@ class UniversalExecutor(ExecutorBroker): >>> def init_k(): >>> return {"k": 3} >>> - >>> with PyFluxExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p: + >>> with InteractiveExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p: >>> fs = p.submit(calc, 2, j=4) >>> print(fs.result()) [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] @@ -63,21 +63,23 @@ def __init__( ) -class UniversalStepExecutor(ExecutorSteps): +class InteractiveStepExecutor(ExecutorSteps): """ - The pympipool.flux.PyFluxStepExecutor leverages the flux framework to distribute python tasks within a queuing - system allocation. In analogy to the pympipool.slurm.PySlurmExecutur it provides the option to specify the number - of threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per - worker. + The pympipool.interactive.executor.InteractiveStepExecutor leverages the pympipool interfaces to distribute python + tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.interactive.executor.InteractiveStepExecutor + can be executed in a serial python process and does not require the python script to be executed with MPI. + Consequently, it is primarily an abstraction of its functionality to improve the usability in particular when used + in combination with Jupyter notebooks. Args: max_cores (int): defines the number workers which can execute functions in parallel executor_kwargs (dict): keyword arguments for the executor + interface_class (BaseInterface): interface class to initiate python processes Examples: >>> import numpy as np - >>> from pympipool.scheduler.flux import PyFluxStepExecutor + >>> from pympipool.interactive.executor import InteractiveStepExecutor >>> >>> def calc(i, j, k): >>> from mpi4py import MPI diff --git a/pympipool/scheduler/flux.py b/pympipool/interactive/flux.py similarity index 97% rename from pympipool/scheduler/flux.py rename to pympipool/interactive/flux.py index c1e36754..4a40df3d 100644 --- a/pympipool/scheduler/flux.py +++ b/pympipool/interactive/flux.py @@ -3,7 +3,7 @@ import flux.job -from pympipool.scheduler.interface import BaseInterface +from pympipool.shared.interface import BaseInterface class FluxPythonInterface(BaseInterface): diff --git a/pympipool/shared/__init__.py b/pympipool/shared/__init__.py index 73723ea0..67f736bd 100644 --- a/pympipool/shared/__init__.py +++ b/pympipool/shared/__init__.py @@ -6,9 +6,9 @@ interface_shutdown, interface_receive, ) -from pympipool.shared.executorbase import cancel_items_in_queue +from pympipool.shared.executor import cancel_items_in_queue from pympipool.shared.thread import RaisingThread -from pympipool.scheduler.interface import MpiExecInterface, SrunInterface +from pympipool.shared.interface import MpiExecInterface, SrunInterface __all__ = [ diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executor.py similarity index 98% rename from pympipool/shared/executorbase.py rename to pympipool/shared/executor.py index d0b9ea28..bc8f5b8a 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executor.py @@ -14,7 +14,7 @@ from pympipool.shared.communication import interface_bootup from pympipool.shared.thread import RaisingThread -from pympipool.scheduler.interface import BaseInterface, MpiExecInterface +from pympipool.shared.interface import BaseInterface, MpiExecInterface from pympipool.shared.inputcheck import ( check_resource_dict, check_resource_dict_is_empty, @@ -432,20 +432,20 @@ def _get_backend_path(cores: int) -> list: Get command to call backend as a list of two strings Args: - cores (int): Number of cores used to execute the task, if it is greater than one use mpiexec_interactive.py else serial_interactive.py + cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py else interactive_serial.py Returns: list[str]: List of strings containing the python executable path and the backend script to execute """ command_lst = [sys.executable] if cores > 1 and importlib.util.find_spec("mpi4py") is not None: - command_lst += [get_command_path(executable="mpiexec_interactive.py")] + command_lst += [get_command_path(executable="interactive_parallel.py")] elif cores > 1: raise ImportError( "mpi4py is required for parallel calculations. Please install mpi4py." ) else: - command_lst += [get_command_path(executable="serial_interactive.py")] + command_lst += [get_command_path(executable="interactive_serial.py")] return command_lst @@ -454,7 +454,7 @@ def _get_command_path(executable: str) -> str: Get path of the backend executable script Args: - executable (str): Name of the backend executable script, either mpiexec_interactive.py or serial_interactive.py + executable (str): Name of the backend executable script, either interactive_parallel.py or interactive_serial.py Returns: str: absolute path to the executable script diff --git a/pympipool/scheduler/interface.py b/pympipool/shared/interface.py similarity index 100% rename from pympipool/scheduler/interface.py rename to pympipool/shared/interface.py diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py index 92e4803a..48636427 100644 --- a/pympipool/shell/executor.py +++ b/pympipool/shell/executor.py @@ -2,7 +2,7 @@ from concurrent.futures import Future import subprocess -from pympipool.shared.executorbase import ExecutorBroker +from pympipool.shared.executor import ExecutorBroker from pympipool.shared.thread import RaisingThread diff --git a/pympipool/shell/interactive.py b/pympipool/shell/interactive.py index 1a3dc6c4..a03b81af 100644 --- a/pympipool/shell/interactive.py +++ b/pympipool/shell/interactive.py @@ -5,7 +5,7 @@ import subprocess from time import sleep -from pympipool.shared.executorbase import cancel_items_in_queue, ExecutorBase +from pympipool.shared.executor import cancel_items_in_queue, ExecutorBase from pympipool.shared.thread import RaisingThread diff --git a/tests/test_backend_serial.py b/tests/test_backend_serial.py index 3684ed7f..16e9d89a 100644 --- a/tests/test_backend_serial.py +++ b/tests/test_backend_serial.py @@ -4,7 +4,7 @@ import cloudpickle import zmq -from pympipool.backend.serial_interactive import main +from pympipool.backend.interactive_serial import main def calc(i, j): diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index 0d277fb6..cbfd14ff 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -2,7 +2,7 @@ from time import sleep from pympipool import Executor -from pympipool.shared.executorbase import cloudpickle_register +from pympipool.shared.executor import cloudpickle_register def add_function(parameter_1, parameter_2): diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index ec5fe768..5c898900 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -8,7 +8,7 @@ try: import flux.job - from pympipool.scheduler.flux import FluxPythonInterface + from pympipool.interactive.flux import FluxPythonInterface skip_flux_test = "FLUX_URI" not in os.environ pmi = os.environ.get("PYMPIPOOL_PMIX", None) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index a7c7d335..13d40025 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -2,7 +2,7 @@ import unittest from pympipool import Executor -from pympipool.shared.executorbase import cloudpickle_register +from pympipool.shared.executor import cloudpickle_register skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None diff --git a/tests/test_executor_backend_mpi_noblock.py b/tests/test_executor_backend_mpi_noblock.py index 7fcd6af2..1ba18ef9 100644 --- a/tests/test_executor_backend_mpi_noblock.py +++ b/tests/test_executor_backend_mpi_noblock.py @@ -1,7 +1,7 @@ import unittest from pympipool import Executor -from pympipool.shared.executorbase import cloudpickle_register +from pympipool.shared.executor import cloudpickle_register def calc(i): diff --git a/tests/test_flux_executor.py b/tests/test_flux_executor.py index 292f4a83..a7eade1e 100644 --- a/tests/test_flux_executor.py +++ b/tests/test_flux_executor.py @@ -5,13 +5,13 @@ import numpy as np -from pympipool.scheduler.universal import UniversalExecutor -from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks +from pympipool.interactive.executor import InteractiveExecutor +from pympipool.shared.executor import cloudpickle_register, execute_parallel_tasks try: import flux.job - from pympipool.scheduler.flux import FluxPythonInterface + from pympipool.interactive.flux import FluxPythonInterface skip_flux_test = "FLUX_URI" not in os.environ pmi = os.environ.get("PYMPIPOOL_PMIX", None) @@ -47,7 +47,7 @@ def setUp(self): self.executor = flux.job.FluxExecutor() def test_flux_executor_serial(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=2, executor_kwargs={"executor": self.executor}, interface_class=FluxPythonInterface, @@ -60,7 +60,7 @@ def test_flux_executor_serial(self): self.assertTrue(fs_2.done()) def test_flux_executor_threads(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"executor": self.executor, "threads_per_core": 2}, interface_class=FluxPythonInterface, @@ -73,7 +73,7 @@ def test_flux_executor_threads(self): self.assertTrue(fs_2.done()) def test_flux_executor_parallel(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"executor": self.executor, "cores": 2, "pmi": pmi}, interface_class=FluxPythonInterface, @@ -83,7 +83,7 @@ def test_flux_executor_parallel(self): self.assertTrue(fs_1.done()) def test_single_task(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"executor": self.executor, "cores": 2, "pmi": pmi}, interface_class=FluxPythonInterface, @@ -126,7 +126,7 @@ def test_execute_task_threads(self): q.join() def test_internal_memory(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={ "executor": self.executor, diff --git a/tests/test_integration_pyiron_workflow.py b/tests/test_integration_pyiron_workflow.py index e9d101f2..16133f29 100644 --- a/tests/test_integration_pyiron_workflow.py +++ b/tests/test_integration_pyiron_workflow.py @@ -12,7 +12,7 @@ import unittest from pympipool import Executor -from pympipool.shared.executorbase import cloudpickle_register +from pympipool.shared.executor import cloudpickle_register class Foo: diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py index 84496738..0f9813fa 100644 --- a/tests/test_local_executor.py +++ b/tests/test_local_executor.py @@ -6,13 +6,13 @@ import numpy as np -from pympipool.scheduler.interface import MpiExecInterface -from pympipool.scheduler.universal import ( - UniversalExecutor, - UniversalStepExecutor, +from pympipool.shared.interface import MpiExecInterface +from pympipool.interactive.executor import ( + InteractiveExecutor, + InteractiveStepExecutor, ) -from pympipool.shared.backend import call_funct -from pympipool.shared.executorbase import ( +from pympipool.interactive.backend import call_funct +from pympipool.shared.executor import ( cloudpickle_register, execute_parallel_tasks, ExecutorBase, @@ -61,7 +61,7 @@ def sleep_one(i): class TestPyMpiExecutorSerial(unittest.TestCase): def test_pympiexecutor_two_workers(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=2, executor_kwargs={"hostname_localhost": True}, interface_class=MpiExecInterface, @@ -75,7 +75,7 @@ def test_pympiexecutor_two_workers(self): self.assertTrue(fs_2.done()) def test_pympiexecutor_one_worker(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"hostname_localhost": True}, interface_class=MpiExecInterface, @@ -91,7 +91,7 @@ def test_pympiexecutor_one_worker(self): class TestPyMpiExecutorStepSerial(unittest.TestCase): def test_pympiexecutor_two_workers(self): - with UniversalStepExecutor( + with InteractiveStepExecutor( max_cores=2, executor_kwargs={"hostname_localhost": True}, interface_class=MpiExecInterface, @@ -105,7 +105,7 @@ def test_pympiexecutor_two_workers(self): self.assertTrue(fs_2.done()) def test_pympiexecutor_one_worker(self): - with UniversalStepExecutor( + with InteractiveStepExecutor( max_cores=1, executor_kwargs={"hostname_localhost": True}, interface_class=MpiExecInterface, @@ -124,7 +124,7 @@ def test_pympiexecutor_one_worker(self): ) class TestPyMpiExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 2, "hostname_localhost": True}, interface_class=MpiExecInterface, @@ -135,7 +135,7 @@ def test_pympiexecutor_one_worker_with_mpi(self): self.assertTrue(fs_1.done()) def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 2, "hostname_localhost": True}, interface_class=MpiExecInterface, @@ -155,7 +155,7 @@ def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): ) def test_pympiexecutor_one_worker_with_mpi_echo(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 2, "hostname_localhost": True}, interface_class=MpiExecInterface, @@ -170,7 +170,7 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): ) class TestPyMpiStepExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): - with UniversalStepExecutor( + with InteractiveStepExecutor( max_cores=2, executor_kwargs={"cores": 2, "hostname_localhost": True}, interface_class=MpiExecInterface, @@ -181,7 +181,7 @@ def test_pympiexecutor_one_worker_with_mpi(self): self.assertTrue(fs_1.done()) def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): - with UniversalStepExecutor( + with InteractiveStepExecutor( max_cores=2, executor_kwargs={"cores": 2, "hostname_localhost": True}, interface_class=MpiExecInterface, @@ -201,7 +201,7 @@ def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): ) def test_pympiexecutor_one_worker_with_mpi_echo(self): - with UniversalStepExecutor( + with InteractiveStepExecutor( max_cores=2, executor_kwargs={"cores": 2, "hostname_localhost": True}, interface_class=MpiExecInterface, @@ -213,7 +213,7 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): class TestPyMpiExecutorInitFunction(unittest.TestCase): def test_internal_memory(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={ "cores": 1, @@ -256,7 +256,7 @@ def test_execute_task(self): class TestFuturePool(unittest.TestCase): def test_pool_serial(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 1, "hostname_localhost": True}, interface_class=MpiExecInterface, @@ -271,7 +271,7 @@ def test_pool_serial(self): self.assertEqual(output.result(), np.array(4)) def test_executor_multi_submission(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 1, "hostname_localhost": True}, interface_class=MpiExecInterface, @@ -284,7 +284,7 @@ def test_executor_multi_submission(self): self.assertTrue(fs_2.done()) def test_shutdown(self): - p = UniversalExecutor( + p = InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 1, "hostname_localhost": True}, interface_class=MpiExecInterface, @@ -300,7 +300,7 @@ def test_shutdown(self): fs2.result() def test_pool_serial_map(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 1, "hostname_localhost": True}, interface_class=MpiExecInterface, @@ -310,7 +310,7 @@ def test_pool_serial_map(self): def test_executor_exception(self): with self.assertRaises(RuntimeError): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 1, "hostname_localhost": True}, interface_class=MpiExecInterface, @@ -319,7 +319,7 @@ def test_executor_exception(self): def test_executor_exception_future(self): with self.assertRaises(RuntimeError): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 1, "hostname_localhost": True}, interface_class=MpiExecInterface, @@ -333,14 +333,14 @@ def test_executor_exception_future(self): def test_meta(self): meta_data_exe_dict = { "cores": 2, - "interface_class": "", + "interface_class": "", "hostname_localhost": True, "init_function": None, "cwd": None, "oversubscribe": False, "max_workers": 1, } - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={ "cores": 2, @@ -362,13 +362,13 @@ def test_meta(self): def test_meta_step(self): meta_data_exe_dict = { "cores": 2, - "interface_class": "", + "interface_class": "", "hostname_localhost": True, "cwd": None, "oversubscribe": False, "max_cores": 2, } - with UniversalStepExecutor( + with InteractiveStepExecutor( max_cores=2, executor_kwargs={ "cores": 2, @@ -388,7 +388,7 @@ def test_meta_step(self): skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." ) def test_pool_multi_core(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 2, "hostname_localhost": True}, interface_class=MpiExecInterface, @@ -406,7 +406,7 @@ def test_pool_multi_core(self): skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." ) def test_pool_multi_core_map(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 2, "hostname_localhost": True}, interface_class=MpiExecInterface, diff --git a/tests/test_local_executor_future.py b/tests/test_local_executor_future.py index a3e19e54..5d01f09a 100644 --- a/tests/test_local_executor_future.py +++ b/tests/test_local_executor_future.py @@ -5,8 +5,8 @@ import numpy as np -from pympipool.scheduler.universal import UniversalExecutor -from pympipool.scheduler.interface import MpiExecInterface +from pympipool.interactive.executor import InteractiveExecutor +from pympipool.shared.interface import MpiExecInterface skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None @@ -18,7 +18,7 @@ def calc(i): class TestFuture(unittest.TestCase): def test_pool_serial(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"hostname_localhost": True, "cores": 1}, interface_class=MpiExecInterface, @@ -34,7 +34,7 @@ def test_pool_serial(self): skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." ) def test_pool_serial_multi_core(self): - with UniversalExecutor( + with InteractiveExecutor( max_workers=1, executor_kwargs={"hostname_localhost": True, "cores": 2}, interface_class=MpiExecInterface, @@ -67,7 +67,7 @@ def callback(future): def submit(): # Executor only exists in this scope and can get garbage collected after # this function is exits - future = UniversalExecutor( + future = InteractiveExecutor( executor_kwargs={"hostname_localhost": True}, interface_class=MpiExecInterface, ).submit(slow_callable) @@ -107,7 +107,7 @@ def __init__(self): def run(self): self.running = True - future = UniversalExecutor( + future = InteractiveExecutor( executor_kwargs={"hostname_localhost": True}, interface_class=MpiExecInterface, ).submit(self.return_42) diff --git a/tests/test_shared_backend.py b/tests/test_shared_backend.py index f9c1d0a2..8caca6c7 100644 --- a/tests/test_shared_backend.py +++ b/tests/test_shared_backend.py @@ -2,8 +2,8 @@ import sys import unittest -from pympipool.shared.backend import parse_arguments -from pympipool.scheduler.interface import SrunInterface, MpiExecInterface +from pympipool.interactive.backend import parse_arguments +from pympipool.shared.interface import SrunInterface, MpiExecInterface class TestParser(unittest.TestCase): diff --git a/tests/test_shared_communication.py b/tests/test_shared_communication.py index 6eca60d7..d8732267 100644 --- a/tests/test_shared_communication.py +++ b/tests/test_shared_communication.py @@ -13,8 +13,8 @@ interface_receive, SocketInterface, ) -from pympipool.shared.executorbase import cloudpickle_register -from pympipool.scheduler.interface import MpiExecInterface +from pympipool.shared.executor import cloudpickle_register +from pympipool.shared.interface import MpiExecInterface skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None @@ -44,7 +44,7 @@ def test_interface_mpi(self): "..", "pympipool", "backend", - "mpiexec_interactive.py", + "interactive_parallel.py", ) ), "--zmqport", @@ -72,7 +72,7 @@ def test_interface_serial(self): "..", "pympipool", "backend", - "serial_interactive.py", + "interactive_serial.py", ) ), "--zmqport", diff --git a/tests/test_shared_executorbase.py b/tests/test_shared_executorbase.py index 348fe59e..7832ec44 100644 --- a/tests/test_shared_executorbase.py +++ b/tests/test_shared_executorbase.py @@ -2,7 +2,7 @@ from queue import Queue import unittest -from pympipool.shared.executorbase import cancel_items_in_queue +from pympipool.shared.executor import cancel_items_in_queue class TestQueue(unittest.TestCase):