From 985e560cea43cd9c3c845d65603a241a23f6e8df Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Tue, 17 Dec 2024 17:15:14 +0100 Subject: [PATCH] Rename rmq/process_comm -> rmq/process_control --- docs/source/concepts.md | 2 +- docs/source/tutorial.ipynb | 2 +- src/plumpy/rmq/__init__.py | 4 +- src/plumpy/rmq/communications.py | 4 +- .../{process_comms.py => process_control.py} | 0 tests/rmq/test_communications.py | 56 +++++++------------ tests/rmq/test_communicator.py | 4 +- ...ocess_comms.py => test_process_control.py} | 8 +-- 8 files changed, 31 insertions(+), 49 deletions(-) rename src/plumpy/rmq/{process_comms.py => process_control.py} (100%) rename tests/rmq/{test_process_comms.py => test_process_control.py} (97%) diff --git a/docs/source/concepts.md b/docs/source/concepts.md index ba6e8b17..0c39d515 100644 --- a/docs/source/concepts.md +++ b/docs/source/concepts.md @@ -32,7 +32,7 @@ WorkChains support the use of logical constructs such as `If_` and `While_` to c A `Controller` can control processes throughout their lifetime, by sending and receiving messages. It can launch, pause, continue, kill and check status of the process. -The {py:class}`~plumpy.process_comms.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker. +The {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker. The thread communicator runs on a independent thread (event loop) and so will not be blocked by sometimes long waiting times in the process event loop. Using RabbitMQ means that even if the computer is terminated unexpectedly, messages are persisted and can be run once the computer restarts. diff --git a/docs/source/tutorial.ipynb b/docs/source/tutorial.ipynb index b544d38b..ba0dd8ca 100644 --- a/docs/source/tutorial.ipynb +++ b/docs/source/tutorial.ipynb @@ -66,7 +66,7 @@ "The {py:class}`~plumpy.workchains.WorkChain`\n", ": A subclass of `Process` that allows for running a process as a set of discrete steps (also known as instructions), with the ability to save the state of the process after each instruction has completed.\n", "\n", - "The process `Controller` (principally the {py:class}`~plumpy.process_comms.RemoteProcessThreadController`)\n", + "The process `Controller` (principally the {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController`)\n", ": To control the process or workchain throughout its lifetime." ] }, diff --git a/src/plumpy/rmq/__init__.py b/src/plumpy/rmq/__init__.py index ca14e02e..fbb9f243 100644 --- a/src/plumpy/rmq/__init__.py +++ b/src/plumpy/rmq/__init__.py @@ -3,6 +3,6 @@ from .communications import * from .exceptions import * from .futures import * -from .process_comms import * +from .process_control import * -__all__ = exceptions.__all__ + communications.__all__ + futures.__all__ + process_comms.__all__ +__all__ = exceptions.__all__ + communications.__all__ + futures.__all__ + process_control.__all__ diff --git a/src/plumpy/rmq/communications.py b/src/plumpy/rmq/communications.py index 6d1f337c..9dbafbed 100644 --- a/src/plumpy/rmq/communications.py +++ b/src/plumpy/rmq/communications.py @@ -131,10 +131,10 @@ def remove_task_subscriber(self, identifier: 'ID_TYPE') -> None: return self._communicator.remove_task_subscriber(identifier) def add_broadcast_subscriber( - self, subscriber: 'BroadcastSubscriber', subject_filter=None, identifier: Optional['ID_TYPE'] = None + self, subscriber: 'BroadcastSubscriber', identifier: Optional['ID_TYPE'] = None ) -> 'ID_TYPE': converted = convert_to_comm(subscriber, self._loop) - return self._communicator.add_broadcast_subscriber(converted, subject_filter, identifier) + return self._communicator.add_broadcast_subscriber(converted, identifier) def remove_broadcast_subscriber(self, identifier: 'ID_TYPE') -> None: return self._communicator.remove_broadcast_subscriber(identifier) diff --git a/src/plumpy/rmq/process_comms.py b/src/plumpy/rmq/process_control.py similarity index 100% rename from src/plumpy/rmq/process_comms.py rename to src/plumpy/rmq/process_control.py diff --git a/tests/rmq/test_communications.py b/tests/rmq/test_communications.py index 4a585dd9..a24022fb 100644 --- a/tests/rmq/test_communications.py +++ b/tests/rmq/test_communications.py @@ -1,13 +1,10 @@ # -*- coding: utf-8 -*- """Tests for the :mod:`plumpy.rmq.communications` module.""" -from typing import override import kiwipy -from kiwipy.communications import exceptions import pytest from plumpy.rmq.communications import LoopCommunicator -import shortuuid class Subscriber: @@ -29,21 +26,6 @@ def rpc_send(self, recipient_id, msg): def broadcast_send(self, body, sender=None, subject=None, correlation_id=None): pass - @override - def add_broadcast_subscriber(self, subscriber, subject_filter, identifier=None): - """Duplicate the add_broadcast_subscriber from CommunicatorHelper and add support for - passing `subject_filter`. - """ - - self._ensure_open() - identifier = identifier or shortuuid.uuid() - subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter) - if identifier in self._broadcast_subscribers: - raise exceptions.DuplicateSubscriberIdentifier(f"Broadcast identifier '{identifier}'") - - self._broadcast_subscribers[identifier] = subscriber - return identifier - self._comm = LoopCommunicator(_Communicator()) def add_rpc_subscriber(self, subscriber, identifier=None): @@ -56,7 +38,7 @@ def add_broadcast_subscriber( identifier=None, ): subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter) - return self._comm.add_broadcast_subscriber(subscriber, subject_filter, identifier) + return self._comm.add_broadcast_subscriber(subscriber, identifier) def add_task_subscriber(self, subscriber, identifier=None): return self._comm.add_task_subscriber(subscriber, identifier) @@ -90,7 +72,7 @@ def close(self): @pytest.fixture -def _communicator(): +def _coordinator(): """Return an instance of `LoopCommunicator`.""" coordinator = CoordinatorWithLoopCommunicatorHelper() yield coordinator @@ -103,40 +85,40 @@ def subscriber(): return Subscriber() -def test_add_rpc_subscriber(_communicator, subscriber): +def test_add_rpc_subscriber(_coordinator, subscriber): """Test the `LoopCommunicator.add_rpc_subscriber` method.""" - assert _communicator.add_rpc_subscriber(subscriber) is not None + assert _coordinator.add_rpc_subscriber(subscriber) is not None identifier = 'identifier' - assert _communicator.add_rpc_subscriber(subscriber, identifier) == identifier + assert _coordinator.add_rpc_subscriber(subscriber, identifier) == identifier -def test_remove_rpc_subscriber(_communicator, subscriber): +def test_remove_rpc_subscriber(_coordinator, subscriber): """Test the `LoopCommunicator.remove_rpc_subscriber` method.""" - identifier = _communicator.add_rpc_subscriber(subscriber) - _communicator.remove_rpc_subscriber(identifier) + identifier = _coordinator.add_rpc_subscriber(subscriber) + _coordinator.remove_rpc_subscriber(identifier) -def test_add_broadcast_subscriber(_communicator, subscriber): +def test_add_broadcast_subscriber(_coordinator, subscriber): """Test the `LoopCommunicator.add_broadcast_subscriber` method.""" - assert _communicator.add_broadcast_subscriber(subscriber) is not None + assert _coordinator.add_broadcast_subscriber(subscriber) is not None identifier = 'identifier' - assert _communicator.add_broadcast_subscriber(subscriber, identifier=identifier) == identifier + assert _coordinator.add_broadcast_subscriber(subscriber, identifier=identifier) == identifier -def test_remove_broadcast_subscriber(_communicator, subscriber): +def test_remove_broadcast_subscriber(_coordinator, subscriber): """Test the `LoopCommunicator.remove_broadcast_subscriber` method.""" - identifier = _communicator.add_broadcast_subscriber(subscriber) - _communicator.remove_broadcast_subscriber(identifier) + identifier = _coordinator.add_broadcast_subscriber(subscriber) + _coordinator.remove_broadcast_subscriber(identifier) -def test_add_task_subscriber(_communicator, subscriber): +def test_add_task_subscriber(_coordinator, subscriber): """Test the `LoopCommunicator.add_task_subscriber` method.""" - assert _communicator.add_task_subscriber(subscriber) is not None + assert _coordinator.add_task_subscriber(subscriber) is not None -def test_remove_task_subscriber(_communicator, subscriber): +def test_remove_task_subscriber(_coordinator, subscriber): """Test the `LoopCommunicator.remove_task_subscriber` method.""" - identifier = _communicator.add_task_subscriber(subscriber) - _communicator.remove_task_subscriber(identifier) + identifier = _coordinator.add_task_subscriber(subscriber) + _coordinator.remove_task_subscriber(identifier) diff --git a/tests/rmq/test_communicator.py b/tests/rmq/test_communicator.py index ef336293..9ec0a33e 100644 --- a/tests/rmq/test_communicator.py +++ b/tests/rmq/test_communicator.py @@ -14,7 +14,7 @@ from kiwipy import rmq import plumpy -from plumpy.rmq import communications, process_comms +from plumpy.rmq import communications, process_control from .. import utils @@ -55,7 +55,7 @@ def loop_communicator(): @pytest.fixture def async_controller(loop_communicator: communications.LoopCommunicator): - yield process_comms.RemoteProcessController(loop_communicator) + yield process_control.RemoteProcessController(loop_communicator) class TestLoopCommunicator: diff --git a/tests/rmq/test_process_comms.py b/tests/rmq/test_process_control.py similarity index 97% rename from tests/rmq/test_process_comms.py rename to tests/rmq/test_process_control.py index 18abcacf..549ef9c6 100644 --- a/tests/rmq/test_process_comms.py +++ b/tests/rmq/test_process_control.py @@ -8,7 +8,7 @@ from kiwipy import rmq import plumpy -from plumpy.rmq import process_comms +from plumpy.rmq import process_control from .. import utils @@ -79,12 +79,12 @@ def _coordinator(): @pytest.fixture def async_controller(_coordinator): - yield process_comms.RemoteProcessController(_coordinator) + yield process_control.RemoteProcessController(_coordinator) @pytest.fixture def sync_controller(_coordinator): - yield process_comms.RemoteProcessThreadController(_coordinator) + yield process_control.RemoteProcessThreadController(_coordinator) class TestRemoteProcessController: @@ -240,7 +240,7 @@ async def test_kill_all(self, _coordinator, sync_controller): for _ in range(10): procs.append(utils.WaitForSignalProcess(coordinator=_coordinator)) - msg = process_comms.MessageBuilder.kill(text='bang bang, I shot you down') + msg = process_control.MessageBuilder.kill(text='bang bang, I shot you down') sync_controller.kill_all(msg) await utils.wait_util(lambda: all([proc.killed() for proc in procs]))