diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py
index 8cfaa2c0..5a31f1e6 100644
--- a/executorlib/interactive/executor.py
+++ b/executorlib/interactive/executor.py
@@ -15,7 +15,6 @@
     check_nested_flux_executor,
     check_oversubscribe,
     check_pmi,
-    check_threads_per_core,
     validate_number_of_cores,
 )
 from executorlib.standalone.interactive.spawner import (
@@ -258,7 +257,6 @@ def create_executor(
     elif backend == "local":
         check_executor(executor=flux_executor)
         check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
-        check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
         check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
         check_command_line_argument_lst(
             command_line_argument_lst=resource_dict["slurm_cmd_args"]
diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py
index 975000e7..48d6f494 100644
--- a/executorlib/interactive/shared.py
+++ b/executorlib/interactive/shared.py
@@ -546,13 +546,14 @@ def _submit_function_to_separate_process(
         resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
     ):
         resource_dict["cores"] = executor_kwargs["cores"]
+    slots_required = resource_dict["cores"] * resource_dict.get("threads_per_core", 1)
     active_task_dict = _wait_for_free_slots(
         active_task_dict=active_task_dict,
-        cores_requested=resource_dict["cores"],
+        cores_requested=slots_required,
         max_cores=max_cores,
         max_workers=max_workers,
     )
-    active_task_dict[task_dict["future"]] = resource_dict["cores"]
+    active_task_dict[task_dict["future"]] = slots_required
     task_kwargs = executor_kwargs.copy()
     task_kwargs.update(resource_dict)
     task_kwargs.update(
diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py
index 5abea87b..84898ee5 100644
--- a/executorlib/standalone/inputcheck.py
+++ b/executorlib/standalone/inputcheck.py
@@ -39,19 +39,6 @@ def check_gpus_per_worker(gpus_per_worker: int) -> None:
         )
 
 
-def check_threads_per_core(threads_per_core: int) -> None:
-    """
-    Check if threads_per_core is not 1 and raise a TypeError if it is.
-    """
-    if threads_per_core != 1:
-        raise TypeError(
-            "Thread based parallelism is not supported for the executorlib.mpi.PyMPIExecutor backend."
-            "Please use threads_per_core=1 instead of threads_per_core="
-            + str(threads_per_core)
-            + "."
-        )
-
-
 def check_executor(executor: Executor) -> None:
     """
     Check if executor is not None and raise a ValueError if it is.
diff --git a/executorlib/standalone/interactive/spawner.py b/executorlib/standalone/interactive/spawner.py
index d9456724..cbc3ce81 100644
--- a/executorlib/standalone/interactive/spawner.py
+++ b/executorlib/standalone/interactive/spawner.py
@@ -57,6 +57,7 @@ def __init__(
         cwd: Optional[str] = None,
         cores: int = 1,
         openmpi_oversubscribe: bool = False,
+        threads_per_core: int = 1,
     ):
         """
         Subprocess interface implementation.
@@ -64,6 +65,7 @@ def __init__(
         Args:
            cwd (str, optional): The current working directory. Defaults to None.
            cores (int, optional): The number of cores to use. Defaults to 1.
+           threads_per_core (int, optional): The number of threads per core. Defaults to 1.
            oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
""" super().__init__( @@ -72,6 +74,7 @@ def __init__( openmpi_oversubscribe=openmpi_oversubscribe, ) self._process = None + self._threads_per_core = threads_per_core def bootup( self, @@ -169,8 +172,8 @@ def __init__( cwd=cwd, cores=cores, openmpi_oversubscribe=openmpi_oversubscribe, + threads_per_core=threads_per_core, ) - self._threads_per_core = threads_per_core self._gpus_per_core = gpus_per_core self._slurm_cmd_args = slurm_cmd_args diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index 4876cbfc..618dad74 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -74,12 +74,6 @@ def test_meta_executor_parallel(self): self.assertTrue(fs_1.done()) def test_errors(self): - with self.assertRaises(TypeError): - Executor( - max_cores=1, - resource_dict={"cores": 1, "threads_per_core": 2}, - backend="local", - ) with self.assertRaises(TypeError): Executor( max_cores=1, diff --git a/tests/test_executor_backend_mpi_noblock.py b/tests/test_executor_backend_mpi_noblock.py index 1440d5e9..47ea2bb3 100644 --- a/tests/test_executor_backend_mpi_noblock.py +++ b/tests/test_executor_backend_mpi_noblock.py @@ -58,15 +58,6 @@ def test_meta_executor_single(self): self.assertTrue(fs_2.done()) def test_errors(self): - with self.assertRaises(TypeError): - Executor( - max_cores=1, - resource_dict={ - "cores": 1, - "threads_per_core": 2, - }, - backend="local", - ) with self.assertRaises(TypeError): Executor( max_cores=1, diff --git a/tests/test_shared_input_check.py b/tests/test_shared_input_check.py index 7ba84f71..5aa70bdc 100644 --- a/tests/test_shared_input_check.py +++ b/tests/test_shared_input_check.py @@ -3,7 +3,6 @@ from executorlib.standalone.inputcheck import ( check_command_line_argument_lst, check_gpus_per_worker, - check_threads_per_core, check_oversubscribe, check_executor, check_init_function, @@ -31,10 +30,6 @@ def test_check_gpus_per_worker(self): with self.assertRaises(TypeError): check_gpus_per_worker(gpus_per_worker=1) - def test_check_threads_per_core(self): - with self.assertRaises(TypeError): - check_threads_per_core(threads_per_core=2) - def test_check_oversubscribe(self): with self.assertRaises(ValueError): check_oversubscribe(oversubscribe=True)