Skip to content

Commit

Permalink
Showing how using interface can avoid making change in kiwipy
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 17, 2024
1 parent 5091a17 commit 8fcd597
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 128 deletions.
2 changes: 1 addition & 1 deletion docs/source/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ WorkChains support the use of logical constructs such as `If_` and `While_` to c

A `Controller` can control processes throughout their lifetime, by sending and receiving messages. It can launch, pause, continue, kill and check status of the process.

The {py:class}`~plumpy.process_comms.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker.
The {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker.

The thread communicator runs on a independent thread (event loop) and so will not be blocked by sometimes long waiting times in the process event loop.
Using RabbitMQ means that even if the computer is terminated unexpectedly, messages are persisted and can be run once the computer restarts.
2 changes: 1 addition & 1 deletion docs/source/tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"The {py:class}`~plumpy.workchains.WorkChain`\n",
": A subclass of `Process` that allows for running a process as a set of discrete steps (also known as instructions), with the ability to save the state of the process after each instruction has completed.\n",
"\n",
"The process `Controller` (principally the {py:class}`~plumpy.process_comms.RemoteProcessThreadController`)\n",
"The process `Controller` (principally the {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController`)\n",
": To control the process or workchain throughout its lifetime."
]
},
Expand Down
4 changes: 2 additions & 2 deletions src/plumpy/rmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
from .communications import *
from .exceptions import *
from .futures import *
from .process_comms import *
from .process_control import *

__all__ = exceptions.__all__ + communications.__all__ + futures.__all__ + process_comms.__all__
__all__ = exceptions.__all__ + communications.__all__ + futures.__all__ + process_control.__all__
4 changes: 2 additions & 2 deletions src/plumpy/rmq/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ def remove_task_subscriber(self, identifier: 'ID_TYPE') -> None:
return self._communicator.remove_task_subscriber(identifier)

def add_broadcast_subscriber(
self, subscriber: 'BroadcastSubscriber', subject_filter=None, identifier: Optional['ID_TYPE'] = None
self, subscriber: 'BroadcastSubscriber', identifier: Optional['ID_TYPE'] = None
) -> 'ID_TYPE':
converted = convert_to_comm(subscriber, self._loop)
return self._communicator.add_broadcast_subscriber(converted, subject_filter, identifier)
return self._communicator.add_broadcast_subscriber(converted, identifier)

def remove_broadcast_subscriber(self, identifier: 'ID_TYPE') -> None:
return self._communicator.remove_broadcast_subscriber(identifier)
Expand Down
File renamed without changes.
100 changes: 74 additions & 26 deletions tests/rmq/test_communications.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# -*- coding: utf-8 -*-
"""Tests for the :mod:`plumpy.rmq.communications` module."""

import kiwipy
import pytest
from kiwipy import CommunicatorHelper

from plumpy.rmq.communications import LoopCommunicator

Expand All @@ -14,21 +14,69 @@ def __call__(self):
pass


class Communicator(CommunicatorHelper):
def task_send(self, task, no_reply=False):
pass
class CoordinatorWithLoopCommunicatorHelper:
def __init__(self):
class _Communicator(kiwipy.CommunicatorHelper):
def task_send(self, task, no_reply=False):
pass

def rpc_send(self, recipient_id, msg):
pass

def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
pass

self._comm = LoopCommunicator(_Communicator())

def add_rpc_subscriber(self, subscriber, identifier=None):
return self._comm.add_rpc_subscriber(subscriber, identifier)

def add_broadcast_subscriber(
self,
subscriber,
subject_filter=None,
identifier=None,
):
subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter)
return self._comm.add_broadcast_subscriber(subscriber, identifier)

def add_task_subscriber(self, subscriber, identifier=None):
return self._comm.add_task_subscriber(subscriber, identifier)

def remove_rpc_subscriber(self, identifier):
return self._comm.remove_rpc_subscriber(identifier)

def remove_broadcast_subscriber(self, identifier):
return self._comm.remove_broadcast_subscriber(identifier)

def remove_task_subscriber(self, identifier):
return self._comm.remove_task_subscriber(identifier)

def rpc_send(self, recipient_id, msg):
pass
return self._comm.rpc_send(recipient_id, msg)

def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
pass
def broadcast_send(
self,
body,
sender=None,
subject=None,
correlation_id=None,
):
return self._comm.broadcast_send(body, sender, subject, correlation_id)

def task_send(self, task, no_reply=False):
return self._comm.task_send(task, no_reply)

def close(self):
self._comm.close()


@pytest.fixture
def loop_communicator():
def _coordinator():
"""Return an instance of `LoopCommunicator`."""
return LoopCommunicator(Communicator())
coordinator = CoordinatorWithLoopCommunicatorHelper()
yield coordinator
coordinator.close()


@pytest.fixture
Expand All @@ -37,40 +85,40 @@ def subscriber():
return Subscriber()


def test_add_rpc_subscriber(loop_communicator, subscriber):
def test_add_rpc_subscriber(_coordinator, subscriber):
"""Test the `LoopCommunicator.add_rpc_subscriber` method."""
assert loop_communicator.add_rpc_subscriber(subscriber) is not None
assert _coordinator.add_rpc_subscriber(subscriber) is not None

identifier = 'identifier'
assert loop_communicator.add_rpc_subscriber(subscriber, identifier) == identifier
assert _coordinator.add_rpc_subscriber(subscriber, identifier) == identifier


def test_remove_rpc_subscriber(loop_communicator, subscriber):
def test_remove_rpc_subscriber(_coordinator, subscriber):
"""Test the `LoopCommunicator.remove_rpc_subscriber` method."""
identifier = loop_communicator.add_rpc_subscriber(subscriber)
loop_communicator.remove_rpc_subscriber(identifier)
identifier = _coordinator.add_rpc_subscriber(subscriber)
_coordinator.remove_rpc_subscriber(identifier)


def test_add_broadcast_subscriber(loop_communicator, subscriber):
def test_add_broadcast_subscriber(_coordinator, subscriber):
"""Test the `LoopCommunicator.add_broadcast_subscriber` method."""
assert loop_communicator.add_broadcast_subscriber(subscriber) is not None
assert _coordinator.add_broadcast_subscriber(subscriber) is not None

identifier = 'identifier'
assert loop_communicator.add_broadcast_subscriber(subscriber, identifier=identifier) == identifier
assert _coordinator.add_broadcast_subscriber(subscriber, identifier=identifier) == identifier


def test_remove_broadcast_subscriber(loop_communicator, subscriber):
def test_remove_broadcast_subscriber(_coordinator, subscriber):
"""Test the `LoopCommunicator.remove_broadcast_subscriber` method."""
identifier = loop_communicator.add_broadcast_subscriber(subscriber)
loop_communicator.remove_broadcast_subscriber(identifier)
identifier = _coordinator.add_broadcast_subscriber(subscriber)
_coordinator.remove_broadcast_subscriber(identifier)


def test_add_task_subscriber(loop_communicator, subscriber):
def test_add_task_subscriber(_coordinator, subscriber):
"""Test the `LoopCommunicator.add_task_subscriber` method."""
assert loop_communicator.add_task_subscriber(subscriber) is not None
assert _coordinator.add_task_subscriber(subscriber) is not None


def test_remove_task_subscriber(loop_communicator, subscriber):
def test_remove_task_subscriber(_coordinator, subscriber):
"""Test the `LoopCommunicator.remove_task_subscriber` method."""
identifier = loop_communicator.add_task_subscriber(subscriber)
loop_communicator.remove_task_subscriber(identifier)
identifier = _coordinator.add_task_subscriber(subscriber)
_coordinator.remove_task_subscriber(identifier)
Loading

0 comments on commit 8fcd597

Please sign in to comment.