Skip to content

Commit

Permalink
Rename rmq/process_comm -> rmq/process_control
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 17, 2024
1 parent 18de42f commit 985e560
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 49 deletions.
2 changes: 1 addition & 1 deletion docs/source/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ WorkChains support the use of logical constructs such as `If_` and `While_` to c

A `Controller` can control processes throughout their lifetime, by sending and receiving messages. It can launch, pause, continue, kill and check status of the process.

The {py:class}`~plumpy.process_comms.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker.
The {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker.

The thread communicator runs on a independent thread (event loop) and so will not be blocked by sometimes long waiting times in the process event loop.
Using RabbitMQ means that even if the computer is terminated unexpectedly, messages are persisted and can be run once the computer restarts.
2 changes: 1 addition & 1 deletion docs/source/tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"The {py:class}`~plumpy.workchains.WorkChain`\n",
": A subclass of `Process` that allows for running a process as a set of discrete steps (also known as instructions), with the ability to save the state of the process after each instruction has completed.\n",
"\n",
"The process `Controller` (principally the {py:class}`~plumpy.process_comms.RemoteProcessThreadController`)\n",
"The process `Controller` (principally the {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController`)\n",
": To control the process or workchain throughout its lifetime."
]
},
Expand Down
4 changes: 2 additions & 2 deletions src/plumpy/rmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
from .communications import *
from .exceptions import *
from .futures import *
from .process_comms import *
from .process_control import *

__all__ = exceptions.__all__ + communications.__all__ + futures.__all__ + process_comms.__all__
__all__ = exceptions.__all__ + communications.__all__ + futures.__all__ + process_control.__all__
4 changes: 2 additions & 2 deletions src/plumpy/rmq/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ def remove_task_subscriber(self, identifier: 'ID_TYPE') -> None:
return self._communicator.remove_task_subscriber(identifier)

def add_broadcast_subscriber(
self, subscriber: 'BroadcastSubscriber', subject_filter=None, identifier: Optional['ID_TYPE'] = None
self, subscriber: 'BroadcastSubscriber', identifier: Optional['ID_TYPE'] = None
) -> 'ID_TYPE':
converted = convert_to_comm(subscriber, self._loop)
return self._communicator.add_broadcast_subscriber(converted, subject_filter, identifier)
return self._communicator.add_broadcast_subscriber(converted, identifier)

def remove_broadcast_subscriber(self, identifier: 'ID_TYPE') -> None:
return self._communicator.remove_broadcast_subscriber(identifier)
Expand Down
File renamed without changes.
56 changes: 19 additions & 37 deletions tests/rmq/test_communications.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
# -*- coding: utf-8 -*-
"""Tests for the :mod:`plumpy.rmq.communications` module."""

from typing import override
import kiwipy
from kiwipy.communications import exceptions
import pytest

from plumpy.rmq.communications import LoopCommunicator
import shortuuid


class Subscriber:
Expand All @@ -29,21 +26,6 @@ def rpc_send(self, recipient_id, msg):
def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
pass

@override
def add_broadcast_subscriber(self, subscriber, subject_filter, identifier=None):
"""Duplicate the add_broadcast_subscriber from CommunicatorHelper and add support for
passing `subject_filter`.
"""

self._ensure_open()
identifier = identifier or shortuuid.uuid()
subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter)
if identifier in self._broadcast_subscribers:
raise exceptions.DuplicateSubscriberIdentifier(f"Broadcast identifier '{identifier}'")

self._broadcast_subscribers[identifier] = subscriber
return identifier

self._comm = LoopCommunicator(_Communicator())

def add_rpc_subscriber(self, subscriber, identifier=None):
Expand All @@ -56,7 +38,7 @@ def add_broadcast_subscriber(
identifier=None,
):
subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter)
return self._comm.add_broadcast_subscriber(subscriber, subject_filter, identifier)
return self._comm.add_broadcast_subscriber(subscriber, identifier)

def add_task_subscriber(self, subscriber, identifier=None):
return self._comm.add_task_subscriber(subscriber, identifier)
Expand Down Expand Up @@ -90,7 +72,7 @@ def close(self):


@pytest.fixture
def _communicator():
def _coordinator():
"""Return an instance of `LoopCommunicator`."""
coordinator = CoordinatorWithLoopCommunicatorHelper()
yield coordinator
Expand All @@ -103,40 +85,40 @@ def subscriber():
return Subscriber()


def test_add_rpc_subscriber(_communicator, subscriber):
def test_add_rpc_subscriber(_coordinator, subscriber):
"""Test the `LoopCommunicator.add_rpc_subscriber` method."""
assert _communicator.add_rpc_subscriber(subscriber) is not None
assert _coordinator.add_rpc_subscriber(subscriber) is not None

identifier = 'identifier'
assert _communicator.add_rpc_subscriber(subscriber, identifier) == identifier
assert _coordinator.add_rpc_subscriber(subscriber, identifier) == identifier


def test_remove_rpc_subscriber(_communicator, subscriber):
def test_remove_rpc_subscriber(_coordinator, subscriber):
"""Test the `LoopCommunicator.remove_rpc_subscriber` method."""
identifier = _communicator.add_rpc_subscriber(subscriber)
_communicator.remove_rpc_subscriber(identifier)
identifier = _coordinator.add_rpc_subscriber(subscriber)
_coordinator.remove_rpc_subscriber(identifier)


def test_add_broadcast_subscriber(_communicator, subscriber):
def test_add_broadcast_subscriber(_coordinator, subscriber):
"""Test the `LoopCommunicator.add_broadcast_subscriber` method."""
assert _communicator.add_broadcast_subscriber(subscriber) is not None
assert _coordinator.add_broadcast_subscriber(subscriber) is not None

identifier = 'identifier'
assert _communicator.add_broadcast_subscriber(subscriber, identifier=identifier) == identifier
assert _coordinator.add_broadcast_subscriber(subscriber, identifier=identifier) == identifier


def test_remove_broadcast_subscriber(_communicator, subscriber):
def test_remove_broadcast_subscriber(_coordinator, subscriber):
"""Test the `LoopCommunicator.remove_broadcast_subscriber` method."""
identifier = _communicator.add_broadcast_subscriber(subscriber)
_communicator.remove_broadcast_subscriber(identifier)
identifier = _coordinator.add_broadcast_subscriber(subscriber)
_coordinator.remove_broadcast_subscriber(identifier)


def test_add_task_subscriber(_communicator, subscriber):
def test_add_task_subscriber(_coordinator, subscriber):
"""Test the `LoopCommunicator.add_task_subscriber` method."""
assert _communicator.add_task_subscriber(subscriber) is not None
assert _coordinator.add_task_subscriber(subscriber) is not None


def test_remove_task_subscriber(_communicator, subscriber):
def test_remove_task_subscriber(_coordinator, subscriber):
"""Test the `LoopCommunicator.remove_task_subscriber` method."""
identifier = _communicator.add_task_subscriber(subscriber)
_communicator.remove_task_subscriber(identifier)
identifier = _coordinator.add_task_subscriber(subscriber)
_coordinator.remove_task_subscriber(identifier)
4 changes: 2 additions & 2 deletions tests/rmq/test_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from kiwipy import rmq

import plumpy
from plumpy.rmq import communications, process_comms
from plumpy.rmq import communications, process_control

from .. import utils

Expand Down Expand Up @@ -55,7 +55,7 @@ def loop_communicator():

@pytest.fixture
def async_controller(loop_communicator: communications.LoopCommunicator):
yield process_comms.RemoteProcessController(loop_communicator)
yield process_control.RemoteProcessController(loop_communicator)


class TestLoopCommunicator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from kiwipy import rmq

import plumpy
from plumpy.rmq import process_comms
from plumpy.rmq import process_control

from .. import utils

Expand Down Expand Up @@ -79,12 +79,12 @@ def _coordinator():

@pytest.fixture
def async_controller(_coordinator):
yield process_comms.RemoteProcessController(_coordinator)
yield process_control.RemoteProcessController(_coordinator)


@pytest.fixture
def sync_controller(_coordinator):
yield process_comms.RemoteProcessThreadController(_coordinator)
yield process_control.RemoteProcessThreadController(_coordinator)


class TestRemoteProcessController:
Expand Down Expand Up @@ -240,7 +240,7 @@ async def test_kill_all(self, _coordinator, sync_controller):
for _ in range(10):
procs.append(utils.WaitForSignalProcess(coordinator=_coordinator))

msg = process_comms.MessageBuilder.kill(text='bang bang, I shot you down')
msg = process_control.MessageBuilder.kill(text='bang bang, I shot you down')

sync_controller.kill_all(msg)
await utils.wait_util(lambda: all([proc.killed() for proc in procs]))
Expand Down

0 comments on commit 985e560

Please sign in to comment.