diff --git a/src/plumpy/__init__.py b/src/plumpy/__init__.py index 8bb5b0d..043e8e6 100644 --- a/src/plumpy/__init__.py +++ b/src/plumpy/__init__.py @@ -4,6 +4,9 @@ import logging +# interfaces +from .controller import ProcessController +from .coordinator import Coordinator from .events import * from .exceptions import * from .futures import * @@ -18,10 +21,6 @@ from .utils import * from .workchains import * -# interfaces -from .controller import ProcessController -from .coordinator import Coordinator - __all__ = ( events.__all__ + exceptions.__all__ diff --git a/src/plumpy/controller.py b/src/plumpy/controller.py index 5a411fd..eb524be 100644 --- a/src/plumpy/controller.py +++ b/src/plumpy/controller.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- from collections.abc import Sequence from typing import Any, Protocol @@ -53,7 +54,7 @@ def kill_process(self, pid: 'PID_TYPE', msg: MessageType | None = None) -> Proce ... def continue_process( - self, pid: 'PID_TYPE', tag: str|None = None, nowait: bool = False, no_reply: bool = False + self, pid: 'PID_TYPE', tag: str | None = None, nowait: bool = False, no_reply: bool = False ) -> ProcessResult | None: """ Continue the process diff --git a/src/plumpy/coordinator.py b/src/plumpy/coordinator.py index c229a6c..efec498 100644 --- a/src/plumpy/coordinator.py +++ b/src/plumpy/coordinator.py @@ -1,8 +1,6 @@ # -*- coding: utf-8 -*- -import concurrent.futures from typing import TYPE_CHECKING, Any, Callable, Hashable, Pattern, Protocol - if TYPE_CHECKING: # identifiers for subscribers ID_TYPE = Hashable @@ -15,22 +13,8 @@ BroadcastSubscriber = Callable[['Coordinator', Any, Any, Any, ID_TYPE], Any] -class Communicator(Protocol): - def add_rpc_subscriber(self, subscriber: 'RpcSubscriber', identifier: 'ID_TYPE | None' = None) -> Any: ... - - def add_broadcast_subscriber( - self, subscriber: 'BroadcastSubscriber', subject_filter: str | Pattern[str] | None = None, identifier=None - ) -> Any: ... - - def remove_rpc_subscriber(self, identifier): ... - - def remove_broadcast_subscriber(self, identifier): ... - - def broadcast_send(self, body, sender=None, subject=None, correlation_id=None) -> bool: ... - - class Coordinator(Protocol): - def add_rpc_subscriber(self, subscriber: 'RpcSubscriber', identifier=None) -> Any: ... + def add_rpc_subscriber(self, subscriber: 'RpcSubscriber', identifier: 'ID_TYPE | None' = None) -> Any: ... def add_broadcast_subscriber( self, @@ -41,9 +25,9 @@ def add_broadcast_subscriber( def add_task_subscriber(self, subscriber: 'TaskSubscriber', identifier: 'ID_TYPE | None' = None) -> 'ID_TYPE': ... - def remove_rpc_subscriber(self, identifier): ... + def remove_rpc_subscriber(self, identifier: 'ID_TYPE | None') -> None: ... - def remove_broadcast_subscriber(self, identifier): ... + def remove_broadcast_subscriber(self, identifier: 'ID_TYPE | None') -> None: ... def remove_task_subscriber(self, identifier: 'ID_TYPE') -> None: ... @@ -52,7 +36,7 @@ def rpc_send(self, recipient_id: Hashable, msg: Any) -> Any: ... def broadcast_send( self, body: Any | None, - sender: str | None = None, + sender: Hashable | str | None = None, subject: str | None = None, correlation_id: 'ID_TYPE | None' = None, ) -> Any: ... diff --git a/src/plumpy/exceptions.py b/src/plumpy/exceptions.py index ed87f35..51898c7 100644 --- a/src/plumpy/exceptions.py +++ b/src/plumpy/exceptions.py @@ -3,12 +3,12 @@ __all__ = [ 'ClosedError', + 'CoordinatorConnectionError', + 'CoordinatorTimeoutError', 'InvalidStateError', 'KilledError', 'PersistenceError', 'UnsuccessfulResult', - 'CoordinatorConnectionError', - 'CoordinatorTimeoutError', ] diff --git a/src/plumpy/futures.py b/src/plumpy/futures.py index 01be395..d3f627f 100644 --- a/src/plumpy/futures.py +++ b/src/plumpy/futures.py @@ -18,7 +18,7 @@ class InvalidFutureError(Exception): @contextlib.contextmanager -def capture_exceptions(future: Future[Any], ignore: tuple[type[BaseException], ...] = ()) -> Generator[None, Any, None]: +def capture_exceptions(future, ignore: tuple[type[BaseException], ...] = ()) -> Generator[None, Any, None]: # type: ignore[no-untyped-def] """ Capture any exceptions in the context and set them as the result of the given future diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index c08836e..5d2c021 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -1017,7 +1017,7 @@ def _schedule_rpc(self, callback: Callable[..., Any], *args: Any, **kwargs: Any) :return: a kiwi future that resolves to the outcome of the callback """ - kiwi_future = concurrent.futures.Future() + kiwi_future = concurrent.futures.Future() # type: ignore[var-annotated] async def run_callback() -> None: with capture_exceptions(kiwi_future):