Skip to content

Commit

Permalink
RmqCoordinator is constructed from comm for tests
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 17, 2024
1 parent cf7e6fd commit aebec4f
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 186 deletions.
50 changes: 50 additions & 0 deletions tests/rmq/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@

import kiwipy


class RmqCoordinator:
def __init__(self, comm: kiwipy.Communicator):
self._comm = comm

def add_rpc_subscriber(self, subscriber, identifier=None):
return self._comm.add_rpc_subscriber(subscriber, identifier)

def add_broadcast_subscriber(
self,
subscriber,
subject_filter=None,
identifier=None,
):
subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter)
return self._comm.add_broadcast_subscriber(subscriber, identifier)

def add_task_subscriber(self, subscriber, identifier=None):
return self._comm.add_task_subscriber(subscriber, identifier)

def remove_rpc_subscriber(self, identifier):
return self._comm.remove_rpc_subscriber(identifier)

def remove_broadcast_subscriber(self, identifier):
return self._comm.remove_broadcast_subscriber(identifier)

def remove_task_subscriber(self, identifier):
return self._comm.remove_task_subscriber(identifier)

def rpc_send(self, recipient_id, msg):
return self._comm.rpc_send(recipient_id, msg)

def broadcast_send(
self,
body,
sender=None,
subject=None,
correlation_id=None,
):
return self._comm.broadcast_send(body, sender, subject, correlation_id)

def task_send(self, task, no_reply=False):
return self._comm.task_send(task, no_reply)

def close(self):
self._comm.close()

88 changes: 23 additions & 65 deletions tests/rmq/test_communications.py
Original file line number Diff line number Diff line change
@@ -1,87 +1,45 @@
# -*- coding: utf-8 -*-
"""Tests for the :mod:`plumpy.rmq.communications` module."""

import kiwipy
import pytest

import kiwipy
from plumpy.rmq.communications import LoopCommunicator
from . import RmqCoordinator


class Subscriber:
"""Test class that mocks a subscriber."""

def __call__(self):
pass


class CoordinatorWithLoopCommunicatorHelper:
def __init__(self):
class _Communicator(kiwipy.CommunicatorHelper):
def task_send(self, task, no_reply=False):
pass

def rpc_send(self, recipient_id, msg):
pass

def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
pass

self._comm = LoopCommunicator(_Communicator())

def add_rpc_subscriber(self, subscriber, identifier=None):
return self._comm.add_rpc_subscriber(subscriber, identifier)

def add_broadcast_subscriber(
self,
subscriber,
subject_filter=None,
identifier=None,
):
subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter)
return self._comm.add_broadcast_subscriber(subscriber, identifier)

def add_task_subscriber(self, subscriber, identifier=None):
return self._comm.add_task_subscriber(subscriber, identifier)

def remove_rpc_subscriber(self, identifier):
return self._comm.remove_rpc_subscriber(identifier)

def remove_broadcast_subscriber(self, identifier):
return self._comm.remove_broadcast_subscriber(identifier)

def remove_task_subscriber(self, identifier):
return self._comm.remove_task_subscriber(identifier)

def rpc_send(self, recipient_id, msg):
return self._comm.rpc_send(recipient_id, msg)
@pytest.fixture
def _coordinator():
"""Return an instance of `LoopCommunicator`."""

def broadcast_send(
self,
body,
sender=None,
subject=None,
correlation_id=None,
):
return self._comm.broadcast_send(body, sender, subject, correlation_id)
class _Communicator(kiwipy.CommunicatorHelper):
def task_send(self, task, no_reply=False):
pass

def task_send(self, task, no_reply=False):
return self._comm.task_send(task, no_reply)
def rpc_send(self, recipient_id, msg):
pass

def close(self):
self._comm.close()
def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
pass

comm = LoopCommunicator(_Communicator())
coordinator = RmqCoordinator(comm)

@pytest.fixture
def _coordinator():
"""Return an instance of `LoopCommunicator`."""
coordinator = CoordinatorWithLoopCommunicatorHelper()
yield coordinator

coordinator.close()


@pytest.fixture
def subscriber():
"""Return an instance of `Subscriber`."""
"""Return an instance of mocked `Subscriber`."""

class Subscriber:
"""Test class that mocks a subscriber."""

def __call__(self):
pass

return Subscriber()


Expand Down
82 changes: 19 additions & 63 deletions tests/rmq/test_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import shortuuid
import yaml

import kiwipy
from kiwipy.rmq import RmqThreadCommunicator

import plumpy
from plumpy.coordinator import Coordinator
from plumpy.rmq import communications, process_control

from . import RmqCoordinator
from .. import utils


Expand All @@ -30,71 +30,27 @@ def persister():
shutil.rmtree(_tmppath)


class CoordinatorWithLoopRmqThreadCommunicator:
def __init__(self):
message_exchange = f'{__file__}.{shortuuid.uuid()}'
task_exchange = f'{__file__}.{shortuuid.uuid()}'
task_queue = f'{__file__}.{shortuuid.uuid()}'

thread_comm = RmqThreadCommunicator.connect(
connection_params={'url': 'amqp://guest:guest@localhost:5672/'},
message_exchange=message_exchange,
task_exchange=task_exchange,
task_queue=task_queue,
decoder=functools.partial(yaml.load, Loader=yaml.Loader),
)

loop = asyncio.get_event_loop()
loop.set_debug(True)
self._comm = communications.LoopCommunicator(thread_comm, loop=loop)

def add_rpc_subscriber(self, subscriber, identifier=None):
return self._comm.add_rpc_subscriber(subscriber, identifier)

def add_broadcast_subscriber(
self,
subscriber,
subject_filter=None,
identifier=None,
):
subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter)
return self._comm.add_broadcast_subscriber(subscriber, identifier)

def add_task_subscriber(self, subscriber, identifier=None):
return self._comm.add_task_subscriber(subscriber, identifier)

def remove_rpc_subscriber(self, identifier):
return self._comm.remove_rpc_subscriber(identifier)

def remove_broadcast_subscriber(self, identifier):
return self._comm.remove_broadcast_subscriber(identifier)

def remove_task_subscriber(self, identifier):
return self._comm.remove_task_subscriber(identifier)

def rpc_send(self, recipient_id, msg):
return self._comm.rpc_send(recipient_id, msg)

def broadcast_send(
self,
body,
sender=None,
subject=None,
correlation_id=None,
):
return self._comm.broadcast_send(body, sender, subject, correlation_id)

def task_send(self, task, no_reply=False):
return self._comm.task_send(task, no_reply)

def close(self):
self._comm.close()


@pytest.fixture
def _coordinator():
coordinator = CoordinatorWithLoopRmqThreadCommunicator()
message_exchange = f'{__file__}.{shortuuid.uuid()}'
task_exchange = f'{__file__}.{shortuuid.uuid()}'
task_queue = f'{__file__}.{shortuuid.uuid()}'

thread_comm = RmqThreadCommunicator.connect(
connection_params={'url': 'amqp://guest:guest@localhost:5672/'},
message_exchange=message_exchange,
task_exchange=task_exchange,
task_queue=task_queue,
decoder=functools.partial(yaml.load, Loader=yaml.Loader),
)

loop = asyncio.get_event_loop()
loop.set_debug(True)
comm = communications.LoopCommunicator(thread_comm, loop=loop)
coordinator = RmqCoordinator(comm)

yield coordinator

coordinator.close()


Expand Down
73 changes: 15 additions & 58 deletions tests/rmq/test_process_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,70 +9,27 @@
import plumpy
from plumpy.rmq import process_control

from . import RmqCoordinator
from .. import utils


class CoordinatorWithRmqThreadCommunicator:
def __init__(self):
message_exchange = f'{__file__}.{shortuuid.uuid()}'
task_exchange = f'{__file__}.{shortuuid.uuid()}'
task_queue = f'{__file__}.{shortuuid.uuid()}'

self._comm = rmq.RmqThreadCommunicator.connect(
connection_params={'url': 'amqp://guest:guest@localhost:5672/'},
message_exchange=message_exchange,
task_exchange=task_exchange,
task_queue=task_queue,
)
self._comm._loop.set_debug(True)

def add_rpc_subscriber(self, subscriber, identifier=None):
return self._comm.add_rpc_subscriber(subscriber, identifier)

def add_broadcast_subscriber(
self,
subscriber,
subject_filter=None,
identifier=None,
):
subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter)
return self._comm.add_broadcast_subscriber(subscriber, identifier)

def add_task_subscriber(self, subscriber, identifier=None):
return self._comm.add_task_subscriber(subscriber, identifier)

def remove_rpc_subscriber(self, identifier):
return self._comm.remove_rpc_subscriber(identifier)

def remove_broadcast_subscriber(self, identifier):
return self._comm.remove_broadcast_subscriber(identifier)

def remove_task_subscriber(self, identifier):
return self._comm.remove_task_subscriber(identifier)

def rpc_send(self, recipient_id, msg):
return self._comm.rpc_send(recipient_id, msg)

def broadcast_send(
self,
body,
sender=None,
subject=None,
correlation_id=None,
):
return self._comm.broadcast_send(body, sender, subject, correlation_id)

def task_send(self, task, no_reply=False):
return self._comm.task_send(task, no_reply)

def close(self):
self._comm.close()


@pytest.fixture
def _coordinator():
coordinator = CoordinatorWithRmqThreadCommunicator()
message_exchange = f'{__file__}.{shortuuid.uuid()}'
task_exchange = f'{__file__}.{shortuuid.uuid()}'
task_queue = f'{__file__}.{shortuuid.uuid()}'

comm = rmq.RmqThreadCommunicator.connect(
connection_params={'url': 'amqp://guest:guest@localhost:5672/'},
message_exchange=message_exchange,
task_exchange=task_exchange,
task_queue=task_queue,
)
comm._loop.set_debug(True)
coordinator = RmqCoordinator(comm)

yield coordinator

coordinator.close()


Expand Down

0 comments on commit aebec4f

Please sign in to comment.