Skip to content

Commit

Permalink
[feature] resize block allocation executor (#589)
Browse files Browse the repository at this point in the history
* [feature] resize block allocation executor

* extend test

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fixes

* fix type check

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fixes

* test property

* test with dependencies

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fixes and increased test coverage

* fix base property

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
jan-janssen and pre-commit-ci[bot] authored Feb 15, 2025
1 parent 3f3f212 commit 8860291
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 9 deletions.
8 changes: 8 additions & 0 deletions executorlib/base/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ def __init__(self, max_cores: Optional[int] = None):
self._future_queue: Optional[queue.Queue] = queue.Queue()
self._process: Optional[Union[Thread, list[Thread]]] = None

@property
def max_workers(self) -> Optional[int]:
return self._process_kwargs.get("max_workers")

@max_workers.setter
def max_workers(self, max_workers: int):
raise NotImplementedError("The max_workers setter is not implemented.")

@property
def info(self) -> Optional[dict]:
"""
Expand Down
35 changes: 33 additions & 2 deletions executorlib/interactive/blockallocation.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import queue
from concurrent.futures import Future
from threading import Thread
from typing import Callable, Optional
Expand Down Expand Up @@ -27,7 +28,7 @@ class BlockAllocationExecutor(ExecutorBase):
Examples:
>>> import numpy as np
>>> from executorlib.interactive.shared import BlockAllocationExecutor
>>> from executorlib.interactive.blockallocation import BlockAllocationExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
Expand Down Expand Up @@ -58,16 +59,46 @@ def __init__(
executor_kwargs["spawner"] = spawner
executor_kwargs["queue_join_on_shutdown"] = False
self._process_kwargs = executor_kwargs
self._max_workers = max_workers
self._set_process(
process=[
Thread(
target=execute_tasks,
kwargs=executor_kwargs,
)
for _ in range(max_workers)
for _ in range(self._max_workers)
],
)

@property
def max_workers(self) -> int:
return self._max_workers

@max_workers.setter
def max_workers(self, max_workers: int):
if isinstance(self._future_queue, queue.Queue) and isinstance(
self._process, list
):
if self._max_workers > max_workers:
for _ in range(self._max_workers - max_workers):
self._future_queue.queue.insert(0, {"shutdown": True, "wait": True})
while len(self._process) > max_workers:
self._process = [
process for process in self._process if process.is_alive()
]
elif self._max_workers < max_workers:
new_process_lst = [
Thread(
target=execute_tasks,
kwargs=self._process_kwargs,
)
for _ in range(max_workers - self._max_workers)
]
for process_instance in new_process_lst:
process_instance.start()
self._process += new_process_lst
self._max_workers = max_workers

def submit( # type: ignore
self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs
) -> Future:
Expand Down
40 changes: 38 additions & 2 deletions executorlib/interactive/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,39 @@ def info(self) -> Optional[dict]:
if isinstance(self._future_queue, queue.Queue):
f: Future = Future()
self._future_queue.queue.insert(
0, {"internal": True, "task": "info", "future": f}
0, {"internal": True, "task": "get_info", "future": f}
)
return f.result()
else:
return None

@property
def max_workers(self) -> Optional[int]:
if isinstance(self._future_queue, queue.Queue):
f: Future = Future()
self._future_queue.queue.insert(
0, {"internal": True, "task": "get_max_workers", "future": f}
)
return f.result()
else:
return None

@max_workers.setter
def max_workers(self, max_workers: int):
if isinstance(self._future_queue, queue.Queue):
f: Future = Future()
self._future_queue.queue.insert(
0,
{
"internal": True,
"task": "set_max_workers",
"max_workers": max_workers,
"future": f,
},
)
if not f.result():
raise NotImplementedError("The max_workers setter is not implemented.")

def submit( # type: ignore
self,
fn: Callable[..., Any],
Expand Down Expand Up @@ -188,8 +215,17 @@ def _execute_tasks_with_dependencies(
if ( # shutdown the executor
task_dict is not None and "internal" in task_dict and task_dict["internal"]
):
if task_dict["task"] == "info":
if task_dict["task"] == "get_info":
task_dict["future"].set_result(executor.info)
elif task_dict["task"] == "get_max_workers":
task_dict["future"].set_result(executor.max_workers)
elif task_dict["task"] == "set_max_workers":
try:
executor.max_workers = task_dict["max_workers"]
except NotImplementedError:
task_dict["future"].set_result(False)
else:
task_dict["future"].set_result(True)
elif ( # handle function submitted to the executor
task_dict is not None and "fn" in task_dict and "future" in task_dict
):
Expand Down
16 changes: 11 additions & 5 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import importlib.util
import os
import queue
Expand Down Expand Up @@ -57,7 +58,7 @@ def execute_tasks(
task_dict = future_queue.get()
if "shutdown" in task_dict and task_dict["shutdown"]:
interface.shutdown(wait=task_dict["wait"])
future_queue.task_done()
_task_done(future_queue=future_queue)
if queue_join_on_shutdown:
future_queue.join()
break
Expand Down Expand Up @@ -117,10 +118,10 @@ def _execute_task_without_cache(
f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
except Exception as thread_exception:
interface.shutdown(wait=True)
future_queue.task_done()
_task_done(future_queue=future_queue)
f.set_exception(exception=thread_exception)
else:
future_queue.task_done()
_task_done(future_queue=future_queue)


def _execute_task_with_cache(
Expand Down Expand Up @@ -161,13 +162,18 @@ def _execute_task_with_cache(
f.set_result(result)
except Exception as thread_exception:
interface.shutdown(wait=True)
future_queue.task_done()
_task_done(future_queue=future_queue)
f.set_exception(exception=thread_exception)
raise thread_exception
else:
future_queue.task_done()
_task_done(future_queue=future_queue)
else:
_, result = get_output(file_name=file_name)
future = task_dict["future"]
future.set_result(result)
_task_done(future_queue=future_queue)


def _task_done(future_queue: queue.Queue):
with contextlib.suppress(ValueError):
future_queue.task_done()
16 changes: 16 additions & 0 deletions tests/test_local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ def test_pympiexecutor_two_workers(self):
self.assertTrue(fs_1.done())
self.assertTrue(fs_2.done())

def test_max_workers(self):
with BlockAllocationExecutor(
max_workers=2,
executor_kwargs={},
spawner=MpiExecSpawner,
) as exe:
self.assertEqual(exe.max_workers, 2)

def test_pympiexecutor_one_worker(self):
with BlockAllocationExecutor(
max_workers=1,
Expand Down Expand Up @@ -107,6 +115,14 @@ def test_pympiexecutor_two_workers(self):
self.assertTrue(fs_1.done())
self.assertTrue(fs_2.done())

def test_max_workers(self):
with OneTaskPerProcessExecutor(
max_workers=2,
executor_kwargs={},
spawner=MpiExecSpawner,
) as exe:
self.assertEqual(exe.max_workers, 2)

def test_pympiexecutor_one_worker(self):
with OneTaskPerProcessExecutor(
max_cores=1,
Expand Down
80 changes: 80 additions & 0 deletions tests/test_local_executor_resize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import unittest
from executorlib import SingleNodeExecutor
from executorlib.standalone.serialize import cloudpickle_register


def sleep_funct(sec):
from time import sleep
sleep(sec)
return sec


class TestResizing(unittest.TestCase):
def test_without_dependencies_decrease(self):
cloudpickle_register(ind=1)
with SingleNodeExecutor(max_workers=2, block_allocation=True, disable_dependencies=True) as exe:
future_lst = [exe.submit(sleep_funct, 1) for _ in range(4)]
self.assertEqual([f.done() for f in future_lst], [False, False, False, False])
self.assertEqual(len(exe), 4)
sleep_funct(sec=0.5)
exe.max_workers = 1
self.assertTrue(len(exe) >= 1)
self.assertEqual(len(exe._process), 1)
self.assertTrue(1 <= sum([f.done() for f in future_lst]) < 3)
self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1])
self.assertEqual([f.done() for f in future_lst], [True, True, True, True])

def test_without_dependencies_increase(self):
cloudpickle_register(ind=1)
with SingleNodeExecutor(max_workers=1, block_allocation=True, disable_dependencies=True) as exe:
future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)]
self.assertEqual([f.done() for f in future_lst], [False, False, False, False])
self.assertEqual(len(exe), 4)
self.assertEqual(exe.max_workers, 1)
future_lst[0].result()
exe.max_workers = 2
self.assertEqual(exe.max_workers, 2)
self.assertTrue(len(exe) >= 1)
self.assertEqual(len(exe._process), 2)
self.assertEqual([f.done() for f in future_lst], [True, False, False, False])
self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1])
self.assertEqual([f.done() for f in future_lst], [True, True, True, True])

def test_with_dependencies_decrease(self):
cloudpickle_register(ind=1)
with SingleNodeExecutor(max_workers=2, block_allocation=True, disable_dependencies=False) as exe:
future_lst = [exe.submit(sleep_funct, 1) for _ in range(4)]
self.assertEqual([f.done() for f in future_lst], [False, False, False, False])
self.assertEqual(len(exe), 4)
sleep_funct(sec=0.5)
exe.max_workers = 1
self.assertTrue(1 <= sum([f.done() for f in future_lst]) < 3)
self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1])
self.assertEqual([f.done() for f in future_lst], [True, True, True, True])

def test_with_dependencies_increase(self):
cloudpickle_register(ind=1)
with SingleNodeExecutor(max_workers=1, block_allocation=True, disable_dependencies=False) as exe:
future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)]
self.assertEqual([f.done() for f in future_lst], [False, False, False, False])
self.assertEqual(len(exe), 4)
self.assertEqual(exe.max_workers, 1)
future_lst[0].result()
exe.max_workers = 2
self.assertEqual(exe.max_workers, 2)
self.assertEqual([f.done() for f in future_lst], [True, False, False, False])
self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1])
self.assertEqual([f.done() for f in future_lst], [True, True, True, True])

def test_no_block_allocation(self):
with self.assertRaises(NotImplementedError):
with SingleNodeExecutor(block_allocation=False, disable_dependencies=False) as exe:
exe.max_workers = 2
with self.assertRaises(NotImplementedError):
with SingleNodeExecutor(block_allocation=False, disable_dependencies=True) as exe:
exe.max_workers = 2

def test_max_workers_stopped_executor(self):
exe = SingleNodeExecutor(block_allocation=True)
exe.shutdown(wait=True)
self.assertIsNone(exe.max_workers)

0 comments on commit 8860291

Please sign in to comment.