Skip to content

Commit

Permalink
Use standard thread library (#580)
Browse files Browse the repository at this point in the history
* Use standard thread library

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update shared.py

* Update executor.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* clean up

* fixes

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* ignore type warnings

* more test fixes

* another fix

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* more fixes

* try to fix old version

* Fix python 3.10 issue

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
jan-janssen and pre-commit-ci[bot] authored Feb 14, 2025
1 parent c037c78 commit ff49de3
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 103 deletions.
12 changes: 6 additions & 6 deletions executorlib/base/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from concurrent.futures import (
Future,
)
from threading import Thread
from typing import Callable, Optional, Union

from executorlib.standalone.inputcheck import check_resource_dict
from executorlib.standalone.queue import cancel_items_in_queue
from executorlib.standalone.serialize import cloudpickle_register
from executorlib.standalone.thread import RaisingThread


class ExecutorBase(FutureExecutor):
Expand All @@ -29,7 +29,7 @@ def __init__(self, max_cores: Optional[int] = None):
cloudpickle_register(ind=3)
self._max_cores = max_cores
self._future_queue: Optional[queue.Queue] = queue.Queue()
self._process: Optional[Union[RaisingThread, list[RaisingThread]]] = None
self._process: Optional[Union[Thread, list[Thread]]] = None

@property
def info(self) -> Optional[dict]:
Expand All @@ -40,13 +40,13 @@ def info(self) -> Optional[dict]:
Optional[dict]: Information about the executor.
"""
if self._process is not None and isinstance(self._process, list):
meta_data_dict = self._process[0].get_kwargs().copy()
meta_data_dict = self._process[0]._kwargs.copy() # type: ignore
if "future_queue" in meta_data_dict:
del meta_data_dict["future_queue"]
meta_data_dict["max_workers"] = len(self._process)
return meta_data_dict
elif self._process is not None:
meta_data_dict = self._process.get_kwargs().copy()
meta_data_dict = self._process._kwargs.copy() # type: ignore
if "future_queue" in meta_data_dict:
del meta_data_dict["future_queue"]
return meta_data_dict
Expand Down Expand Up @@ -138,13 +138,13 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
cancel_items_in_queue(que=self._future_queue)
if self._process is not None and self._future_queue is not None:
self._future_queue.put({"shutdown": True, "wait": wait})
if wait and isinstance(self._process, RaisingThread):
if wait and isinstance(self._process, Thread):
self._process.join()
self._future_queue.join()
self._process = None
self._future_queue = None

def _set_process(self, process: RaisingThread):
def _set_process(self, process: Thread):
"""
Set the process for the executor.
Expand Down
4 changes: 2 additions & 2 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from threading import Thread
from typing import Callable, Optional

from executorlib.base.executor import ExecutorBase
Expand All @@ -15,7 +16,6 @@
check_max_workers_and_cores,
check_nested_flux_executor,
)
from executorlib.standalone.thread import RaisingThread

try:
from executorlib.cache.queue_spawner import execute_with_pysqa
Expand Down Expand Up @@ -64,7 +64,7 @@ def __init__(
cache_directory_path = os.path.abspath(cache_directory)
os.makedirs(cache_directory_path, exist_ok=True)
self._set_process(
RaisingThread(
Thread(
target=execute_tasks_h5,
kwargs={
"future_queue": self._future_queue,
Expand Down
6 changes: 4 additions & 2 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ def execute_tasks_h5(
]
else:
if len(future_wait_key_lst) > 0:
raise ValueError(
"Future objects are not supported as input if disable_dependencies=True."
task_dict["future"].set_exception(
ValueError(
"Future objects are not supported as input if disable_dependencies=True."
)
)
task_dependent_lst = []
process_dict[task_key] = execute_function(
Expand Down
4 changes: 2 additions & 2 deletions executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from concurrent.futures import Future
from threading import Thread
from typing import Any, Callable, Optional

from executorlib.base.executor import ExecutorBase
Expand All @@ -8,7 +9,6 @@
generate_nodes_and_edges,
generate_task_hash,
)
from executorlib.standalone.thread import RaisingThread


class ExecutorWithDependencies(ExecutorBase):
Expand Down Expand Up @@ -41,7 +41,7 @@ def __init__(
) -> None:
super().__init__(max_cores=max_cores)
self._set_process(
RaisingThread(
Thread(
target=execute_tasks_with_dependencies,
kwargs={
# Executor Arguments
Expand Down
36 changes: 18 additions & 18 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
from asyncio.exceptions import CancelledError
from concurrent.futures import Future, TimeoutError
from threading import Thread
from time import sleep
from typing import Any, Callable, Optional, Union

Expand All @@ -20,7 +21,6 @@
)
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.standalone.serialize import serialize_funct_h5
from executorlib.standalone.thread import RaisingThread


class ExecutorBroker(ExecutorBase):
Expand Down Expand Up @@ -89,7 +89,7 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
self._process = None
self._future_queue = None

def _set_process(self, process: list[RaisingThread]): # type: ignore
def _set_process(self, process: list[Thread]): # type: ignore
"""
Set the process for the executor.
Expand Down Expand Up @@ -149,7 +149,7 @@ def __init__(
executor_kwargs["queue_join_on_shutdown"] = False
self._set_process(
process=[
RaisingThread(
Thread(
target=execute_parallel_tasks,
kwargs=executor_kwargs,
)
Expand Down Expand Up @@ -205,7 +205,7 @@ def __init__(
executor_kwargs["max_cores"] = max_cores
executor_kwargs["max_workers"] = max_workers
self._set_process(
RaisingThread(
Thread(
target=execute_separate_tasks,
kwargs=executor_kwargs,
)
Expand Down Expand Up @@ -363,17 +363,18 @@ def execute_tasks_with_dependencies(
):
future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
exception_lst = _get_exception_lst(future_lst=future_lst)
if len(exception_lst) > 0:
task_dict["future"].set_exception(exception_lst[0])
elif len(future_lst) == 0 or ready_flag:
# No future objects are used in the input or all future objects are already done
task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
args=task_dict["args"], kwargs=task_dict["kwargs"]
)
executor_queue.put(task_dict)
else: # Otherwise add the function to the wait list
task_dict["future_lst"] = future_lst
wait_lst.append(task_dict)
if not _get_exception(future_obj=task_dict["future"]):
if len(exception_lst) > 0:
task_dict["future"].set_exception(exception_lst[0])
elif len(future_lst) == 0 or ready_flag:
# No future objects are used in the input or all future objects are already done
task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
args=task_dict["args"], kwargs=task_dict["kwargs"]
)
executor_queue.put(task_dict)
else: # Otherwise add the function to the wait list
task_dict["future_lst"] = future_lst
wait_lst.append(task_dict)
future_queue.task_done()
elif len(wait_lst) > 0:
number_waiting = len(wait_lst)
Expand Down Expand Up @@ -589,7 +590,7 @@ def _submit_function_to_separate_process(
"init_function": None,
}
)
process = RaisingThread(
process = Thread(
target=execute_parallel_tasks,
kwargs=task_kwargs,
)
Expand All @@ -610,14 +611,13 @@ def _execute_task(
future_queue (Queue): Queue for receiving new tasks.
"""
f = task_dict.pop("future")
if f.set_running_or_notify_cancel():
if not f.done() and f.set_running_or_notify_cancel():
try:
f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
except Exception as thread_exception:
interface.shutdown(wait=True)
future_queue.task_done()
f.set_exception(exception=thread_exception)
raise thread_exception
else:
future_queue.task_done()

Expand Down
2 changes: 0 additions & 2 deletions executorlib/standalone/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
interface_shutdown,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner
from executorlib.standalone.thread import RaisingThread

__all__ = [
"SocketInterface",
Expand All @@ -16,6 +15,5 @@
"interface_send",
"interface_shutdown",
"interface_receive",
"RaisingThread",
"MpiExecSpawner",
]
42 changes: 0 additions & 42 deletions executorlib/standalone/thread.py

This file was deleted.

11 changes: 6 additions & 5 deletions tests/test_cache_executor_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from queue import Queue
import shutil
import unittest
from threading import Thread

from executorlib.cache.subprocess_spawner import (
execute_in_subprocess,
terminate_subprocess,
)
from executorlib.standalone.thread import RaisingThread

try:
from executorlib.cache.executor import FileExecutor, create_file_executor
Expand Down Expand Up @@ -57,7 +57,8 @@ def test_executor_dependence_error(self):
with FileExecutor(
execute_function=execute_in_subprocess, disable_dependencies=True
) as exe:
exe.submit(my_funct, 1, b=exe.submit(my_funct, 1, b=2))
fs = exe.submit(my_funct, 1, b=exe.submit(my_funct, 1, b=2))
fs.result()

def test_executor_working_directory(self):
cwd = os.path.join(os.path.dirname(__file__), "executables")
Expand All @@ -81,7 +82,7 @@ def test_executor_function(self):
)
cache_dir = os.path.abspath("cache")
os.makedirs(cache_dir, exist_ok=True)
process = RaisingThread(
process = Thread(
target=execute_tasks_h5,
kwargs={
"future_queue": q,
Expand Down Expand Up @@ -122,7 +123,7 @@ def test_executor_function_dependence_kwargs(self):
)
cache_dir = os.path.abspath("cache")
os.makedirs(cache_dir, exist_ok=True)
process = RaisingThread(
process = Thread(
target=execute_tasks_h5,
kwargs={
"future_queue": q,
Expand Down Expand Up @@ -163,7 +164,7 @@ def test_executor_function_dependence_args(self):
)
cache_dir = os.path.abspath("cache")
os.makedirs(cache_dir, exist_ok=True)
process = RaisingThread(
process = Thread(
target=execute_tasks_h5,
kwargs={
"future_queue": q,
Expand Down
Loading

0 comments on commit ff49de3

Please sign in to comment.