Skip to content

Commit

Permalink
Remove kiwipy/rmq dependencies of process module
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 14, 2024
1 parent a753f90 commit 156d81a
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 28 deletions.
22 changes: 10 additions & 12 deletions src/plumpy/coordinator.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
from typing import Any, Callable, Protocol
# -*- coding: utf-8 -*-
from typing import Any, Callable, Pattern, Protocol

RpcSubscriber = Callable[['Communicator', Any], Any]
BroadcastSubscriber = Callable[['Communicator', Any, Any, Any, Any], Any]

class Communicator(Protocol):

def add_rpc_subscriber(self, subscriber: RpcSubscriber, identifier=None) -> Any:
...
class Communicator(Protocol):
def add_rpc_subscriber(self, subscriber: RpcSubscriber, identifier=None) -> Any: ...

def add_broadcast_subscriber(self, subscriber: BroadcastSubscriber, identifier=None) -> Any:
...
def add_broadcast_subscriber(
self, subscriber: BroadcastSubscriber, subject_filter: str | Pattern[str] | None = None, identifier=None
) -> Any: ...

def remove_rpc_subscriber(self, identifier):
...
def remove_rpc_subscriber(self, identifier): ...

def remove_broadcast_subscriber(self, identifier):
...
def remove_broadcast_subscriber(self, identifier): ...

def broadcast_send(self, body, sender=None, subject=None, correlation_id=None) -> bool:
...
def broadcast_send(self, body, sender=None, subject=None, correlation_id=None) -> bool: ...
3 changes: 2 additions & 1 deletion src/plumpy/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ class PersistenceError(Exception):
class ClosedError(Exception):
"""Raised when an mutable operation is attempted on a closed process"""


class TaskRejectedError(Exception):
""" A task was rejected by the coordinacor"""
"""A task was rejected by the coordinacor"""
2 changes: 1 addition & 1 deletion src/plumpy/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import contextlib
from typing import Any, Awaitable, Callable, Generator, Optional

__all__ = ['CancellableAction', 'create_task', 'create_task', 'capture_exceptions']
__all__ = ['CancellableAction', 'capture_exceptions', 'create_task', 'create_task']


class InvalidFutureError(Exception):
Expand Down
9 changes: 4 additions & 5 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,14 @@
except ModuleNotFoundError:
from contextvars import ContextVar

import kiwipy
import yaml

from . import events, exceptions, message, persistence, ports, process_states, utils
from .futures import capture_exceptions, CancellableAction
from .base import state_machine
from .base.state_machine import StateEntryFailed, StateMachine, TransitionFailed, event
from .base.utils import call_with_super_check, super_check
from .event_helper import EventHelper
from .futures import CancellableAction, capture_exceptions
from .process_listener import ProcessListener
from .process_spec import ProcessSpec
from .utils import PID_TYPE, SAVED_STATE_TYPE, protected
Expand Down Expand Up @@ -313,9 +312,9 @@ def init(self) -> None:

try:
# filter out state change broadcasts
# TODO: pattern filter should be moved to add_broadcast_subscriber.
subscriber = kiwipy.BroadcastFilter(self.broadcast_receive, subject=re.compile(r'^(?!state_changed).*'))
identifier = self._communicator.add_broadcast_subscriber(subscriber, identifier=str(self.pid))
identifier = self._communicator.add_broadcast_subscriber(
self.broadcast_receive, subject_filter=re.compile(r'^(?!state_changed).*'), identifier=str(self.pid)
)
self.add_cleanup(functools.partial(self._communicator.remove_broadcast_subscriber, identifier))
except concurrent.futures.TimeoutError:
self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid)
Expand Down
4 changes: 2 additions & 2 deletions src/plumpy/rmq/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ def remove_task_subscriber(self, identifier: 'ID_TYPE') -> None:
return self._communicator.remove_task_subscriber(identifier)

def add_broadcast_subscriber(
self, subscriber: 'BroadcastSubscriber', identifier: Optional['ID_TYPE'] = None
self, subscriber: 'BroadcastSubscriber', subject_filter=None, identifier: Optional['ID_TYPE'] = None
) -> 'ID_TYPE':
converted = convert_to_comm(subscriber, self._loop)
return self._communicator.add_broadcast_subscriber(converted, identifier)
return self._communicator.add_broadcast_subscriber(converted, subject_filter, identifier)

def remove_broadcast_subscriber(self, identifier: 'ID_TYPE') -> None:
return self._communicator.remove_broadcast_subscriber(identifier)
Expand Down
7 changes: 3 additions & 4 deletions src/plumpy/rmq/process_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,22 @@

import asyncio
import copy
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union, cast
from typing import Any, Dict, Optional, Sequence, Union

import kiwipy

from plumpy import loaders
from plumpy.message import (
KILL_MSG,
MESSAGE_KEY,
PAUSE_MSG,
PLAY_MSG,
STATUS_MSG,
KILL_MSG,
Intent,
create_continue_body,
create_create_body,
create_launch_body,
)

from plumpy import loaders
from plumpy.utils import PID_TYPE

__all__ = [
Expand Down
2 changes: 1 addition & 1 deletion tests/rmq/test_communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_add_broadcast_subscriber(loop_communicator, subscriber):
assert loop_communicator.add_broadcast_subscriber(subscriber) is not None

identifier = 'identifier'
assert loop_communicator.add_broadcast_subscriber(subscriber, identifier) == identifier
assert loop_communicator.add_broadcast_subscriber(subscriber, identifier=identifier) == identifier


def test_remove_broadcast_subscriber(loop_communicator, subscriber):
Expand Down
5 changes: 3 additions & 2 deletions tests/rmq/test_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tempfile
import uuid

from kiwipy.rmq.communicator import kiwipy
import pytest
import shortuuid
import yaml
Expand Down Expand Up @@ -81,7 +82,7 @@ def get_broadcast(_comm, body, sender, subject, correlation_id):
assert result == BROADCAST

@pytest.mark.asyncio
async def test_broadcast_filter(self, loop_communicator):
async def test_broadcast_filter(self, loop_communicator: kiwipy.Communicator):
broadcast_future = asyncio.Future()

def ignore_broadcast(_comm, body, sender, subject, correlation_id):
Expand All @@ -90,7 +91,7 @@ def ignore_broadcast(_comm, body, sender, subject, correlation_id):
def get_broadcast(_comm, body, sender, subject, correlation_id):
broadcast_future.set_result(True)

loop_communicator.add_broadcast_subscriber(BroadcastFilter(ignore_broadcast, subject='other'))
loop_communicator.add_broadcast_subscriber(ignore_broadcast, subject_filter='other')
loop_communicator.add_broadcast_subscriber(get_broadcast)
loop_communicator.broadcast_send(
**{'body': 'present', 'sender': 'Martin', 'subject': 'sup', 'correlation_id': 420}
Expand Down

0 comments on commit 156d81a

Please sign in to comment.