diff --git a/docs/source/concepts.md b/docs/source/concepts.md index ba6e8b17..0c39d515 100644 --- a/docs/source/concepts.md +++ b/docs/source/concepts.md @@ -32,7 +32,7 @@ WorkChains support the use of logical constructs such as `If_` and `While_` to c A `Controller` can control processes throughout their lifetime, by sending and receiving messages. It can launch, pause, continue, kill and check status of the process. -The {py:class}`~plumpy.process_comms.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker. +The {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker. The thread communicator runs on a independent thread (event loop) and so will not be blocked by sometimes long waiting times in the process event loop. Using RabbitMQ means that even if the computer is terminated unexpectedly, messages are persisted and can be run once the computer restarts. diff --git a/docs/source/tutorial.ipynb b/docs/source/tutorial.ipynb index b544d38b..ba0dd8ca 100644 --- a/docs/source/tutorial.ipynb +++ b/docs/source/tutorial.ipynb @@ -66,7 +66,7 @@ "The {py:class}`~plumpy.workchains.WorkChain`\n", ": A subclass of `Process` that allows for running a process as a set of discrete steps (also known as instructions), with the ability to save the state of the process after each instruction has completed.\n", "\n", - "The process `Controller` (principally the {py:class}`~plumpy.process_comms.RemoteProcessThreadController`)\n", + "The process `Controller` (principally the {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController`)\n", ": To control the process or workchain throughout its lifetime." ] }, diff --git a/src/plumpy/rmq/__init__.py b/src/plumpy/rmq/__init__.py index ca14e02e..fbb9f243 100644 --- a/src/plumpy/rmq/__init__.py +++ b/src/plumpy/rmq/__init__.py @@ -3,6 +3,6 @@ from .communications import * from .exceptions import * from .futures import * -from .process_comms import * +from .process_control import * -__all__ = exceptions.__all__ + communications.__all__ + futures.__all__ + process_comms.__all__ +__all__ = exceptions.__all__ + communications.__all__ + futures.__all__ + process_control.__all__ diff --git a/src/plumpy/rmq/communications.py b/src/plumpy/rmq/communications.py index 6d1f337c..9dbafbed 100644 --- a/src/plumpy/rmq/communications.py +++ b/src/plumpy/rmq/communications.py @@ -131,10 +131,10 @@ def remove_task_subscriber(self, identifier: 'ID_TYPE') -> None: return self._communicator.remove_task_subscriber(identifier) def add_broadcast_subscriber( - self, subscriber: 'BroadcastSubscriber', subject_filter=None, identifier: Optional['ID_TYPE'] = None + self, subscriber: 'BroadcastSubscriber', identifier: Optional['ID_TYPE'] = None ) -> 'ID_TYPE': converted = convert_to_comm(subscriber, self._loop) - return self._communicator.add_broadcast_subscriber(converted, subject_filter, identifier) + return self._communicator.add_broadcast_subscriber(converted, identifier) def remove_broadcast_subscriber(self, identifier: 'ID_TYPE') -> None: return self._communicator.remove_broadcast_subscriber(identifier) diff --git a/src/plumpy/rmq/process_comms.py b/src/plumpy/rmq/process_control.py similarity index 100% rename from src/plumpy/rmq/process_comms.py rename to src/plumpy/rmq/process_control.py diff --git a/tests/rmq/test_communications.py b/tests/rmq/test_communications.py index 00b7f1c6..a24022fb 100644 --- a/tests/rmq/test_communications.py +++ b/tests/rmq/test_communications.py @@ -1,8 +1,8 @@ # -*- coding: utf-8 -*- """Tests for the :mod:`plumpy.rmq.communications` module.""" +import kiwipy import pytest -from kiwipy import CommunicatorHelper from plumpy.rmq.communications import LoopCommunicator @@ -14,21 +14,69 @@ def __call__(self): pass -class Communicator(CommunicatorHelper): - def task_send(self, task, no_reply=False): - pass +class CoordinatorWithLoopCommunicatorHelper: + def __init__(self): + class _Communicator(kiwipy.CommunicatorHelper): + def task_send(self, task, no_reply=False): + pass + + def rpc_send(self, recipient_id, msg): + pass + + def broadcast_send(self, body, sender=None, subject=None, correlation_id=None): + pass + + self._comm = LoopCommunicator(_Communicator()) + + def add_rpc_subscriber(self, subscriber, identifier=None): + return self._comm.add_rpc_subscriber(subscriber, identifier) + + def add_broadcast_subscriber( + self, + subscriber, + subject_filter=None, + identifier=None, + ): + subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter) + return self._comm.add_broadcast_subscriber(subscriber, identifier) + + def add_task_subscriber(self, subscriber, identifier=None): + return self._comm.add_task_subscriber(subscriber, identifier) + + def remove_rpc_subscriber(self, identifier): + return self._comm.remove_rpc_subscriber(identifier) + + def remove_broadcast_subscriber(self, identifier): + return self._comm.remove_broadcast_subscriber(identifier) + + def remove_task_subscriber(self, identifier): + return self._comm.remove_task_subscriber(identifier) def rpc_send(self, recipient_id, msg): - pass + return self._comm.rpc_send(recipient_id, msg) - def broadcast_send(self, body, sender=None, subject=None, correlation_id=None): - pass + def broadcast_send( + self, + body, + sender=None, + subject=None, + correlation_id=None, + ): + return self._comm.broadcast_send(body, sender, subject, correlation_id) + + def task_send(self, task, no_reply=False): + return self._comm.task_send(task, no_reply) + + def close(self): + self._comm.close() @pytest.fixture -def loop_communicator(): +def _coordinator(): """Return an instance of `LoopCommunicator`.""" - return LoopCommunicator(Communicator()) + coordinator = CoordinatorWithLoopCommunicatorHelper() + yield coordinator + coordinator.close() @pytest.fixture @@ -37,40 +85,40 @@ def subscriber(): return Subscriber() -def test_add_rpc_subscriber(loop_communicator, subscriber): +def test_add_rpc_subscriber(_coordinator, subscriber): """Test the `LoopCommunicator.add_rpc_subscriber` method.""" - assert loop_communicator.add_rpc_subscriber(subscriber) is not None + assert _coordinator.add_rpc_subscriber(subscriber) is not None identifier = 'identifier' - assert loop_communicator.add_rpc_subscriber(subscriber, identifier) == identifier + assert _coordinator.add_rpc_subscriber(subscriber, identifier) == identifier -def test_remove_rpc_subscriber(loop_communicator, subscriber): +def test_remove_rpc_subscriber(_coordinator, subscriber): """Test the `LoopCommunicator.remove_rpc_subscriber` method.""" - identifier = loop_communicator.add_rpc_subscriber(subscriber) - loop_communicator.remove_rpc_subscriber(identifier) + identifier = _coordinator.add_rpc_subscriber(subscriber) + _coordinator.remove_rpc_subscriber(identifier) -def test_add_broadcast_subscriber(loop_communicator, subscriber): +def test_add_broadcast_subscriber(_coordinator, subscriber): """Test the `LoopCommunicator.add_broadcast_subscriber` method.""" - assert loop_communicator.add_broadcast_subscriber(subscriber) is not None + assert _coordinator.add_broadcast_subscriber(subscriber) is not None identifier = 'identifier' - assert loop_communicator.add_broadcast_subscriber(subscriber, identifier=identifier) == identifier + assert _coordinator.add_broadcast_subscriber(subscriber, identifier=identifier) == identifier -def test_remove_broadcast_subscriber(loop_communicator, subscriber): +def test_remove_broadcast_subscriber(_coordinator, subscriber): """Test the `LoopCommunicator.remove_broadcast_subscriber` method.""" - identifier = loop_communicator.add_broadcast_subscriber(subscriber) - loop_communicator.remove_broadcast_subscriber(identifier) + identifier = _coordinator.add_broadcast_subscriber(subscriber) + _coordinator.remove_broadcast_subscriber(identifier) -def test_add_task_subscriber(loop_communicator, subscriber): +def test_add_task_subscriber(_coordinator, subscriber): """Test the `LoopCommunicator.add_task_subscriber` method.""" - assert loop_communicator.add_task_subscriber(subscriber) is not None + assert _coordinator.add_task_subscriber(subscriber) is not None -def test_remove_task_subscriber(loop_communicator, subscriber): +def test_remove_task_subscriber(_coordinator, subscriber): """Test the `LoopCommunicator.remove_task_subscriber` method.""" - identifier = loop_communicator.add_task_subscriber(subscriber) - loop_communicator.remove_task_subscriber(identifier) + identifier = _coordinator.add_task_subscriber(subscriber) + _coordinator.remove_task_subscriber(identifier) diff --git a/tests/rmq/test_communicator.py b/tests/rmq/test_communicator.py index 80c1ac71..9dd5fa72 100644 --- a/tests/rmq/test_communicator.py +++ b/tests/rmq/test_communicator.py @@ -6,15 +6,16 @@ import shutil import tempfile import uuid - -from kiwipy.rmq.communicator import kiwipy import pytest import shortuuid import yaml -from kiwipy import BroadcastFilter, rmq + +import kiwipy +from kiwipy.rmq import RmqThreadCommunicator import plumpy -from plumpy.rmq import communications, process_comms +from plumpy.coordinator import Coordinator +from plumpy.rmq import communications, process_control from .. import utils @@ -29,40 +30,84 @@ def persister(): shutil.rmtree(_tmppath) -@pytest.fixture -def loop_communicator(): - message_exchange = f'{__file__}.{shortuuid.uuid()}' - task_exchange = f'{__file__}.{shortuuid.uuid()}' - task_queue = f'{__file__}.{shortuuid.uuid()}' +class CoordinatorWithLoopRmqThreadCommunicator: + def __init__(self): + message_exchange = f'{__file__}.{shortuuid.uuid()}' + task_exchange = f'{__file__}.{shortuuid.uuid()}' + task_queue = f'{__file__}.{shortuuid.uuid()}' + + thread_comm = RmqThreadCommunicator.connect( + connection_params={'url': 'amqp://guest:guest@localhost:5672/'}, + message_exchange=message_exchange, + task_exchange=task_exchange, + task_queue=task_queue, + decoder=functools.partial(yaml.load, Loader=yaml.Loader), + ) + + loop = asyncio.get_event_loop() + loop.set_debug(True) + self._comm = communications.LoopCommunicator(thread_comm, loop=loop) + + def add_rpc_subscriber(self, subscriber, identifier=None): + return self._comm.add_rpc_subscriber(subscriber, identifier) - thread_communicator = rmq.RmqThreadCommunicator.connect( - connection_params={'url': 'amqp://guest:guest@localhost:5672/'}, - message_exchange=message_exchange, - task_exchange=task_exchange, - task_queue=task_queue, - decoder=functools.partial(yaml.load, Loader=yaml.Loader), - ) + def add_broadcast_subscriber( + self, + subscriber, + subject_filter=None, + identifier=None, + ): + subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter) + return self._comm.add_broadcast_subscriber(subscriber, identifier) - loop = asyncio.get_event_loop() - loop.set_debug(True) + def add_task_subscriber(self, subscriber, identifier=None): + return self._comm.add_task_subscriber(subscriber, identifier) - communicator = communications.LoopCommunicator(thread_communicator, loop=loop) + def remove_rpc_subscriber(self, identifier): + return self._comm.remove_rpc_subscriber(identifier) - yield communicator + def remove_broadcast_subscriber(self, identifier): + return self._comm.remove_broadcast_subscriber(identifier) - thread_communicator.close() + def remove_task_subscriber(self, identifier): + return self._comm.remove_task_subscriber(identifier) + + def rpc_send(self, recipient_id, msg): + return self._comm.rpc_send(recipient_id, msg) + + def broadcast_send( + self, + body, + sender=None, + subject=None, + correlation_id=None, + ): + return self._comm.broadcast_send(body, sender, subject, correlation_id) + + def task_send(self, task, no_reply=False): + return self._comm.task_send(task, no_reply) + + def close(self): + self._comm.close() @pytest.fixture -def async_controller(loop_communicator: communications.LoopCommunicator): - yield process_comms.RemoteProcessController(loop_communicator) +def _coordinator(): + coordinator = CoordinatorWithLoopRmqThreadCommunicator() + yield coordinator + coordinator.close() + + +@pytest.fixture +def async_controller(_coordinator): + yield process_control.RemoteProcessController(_coordinator) class TestLoopCommunicator: """Make sure the loop communicator is working as expected""" @pytest.mark.asyncio - async def test_broadcast(self, loop_communicator): + async def test_broadcast(self, _coordinator): BROADCAST = {'body': 'present', 'sender': 'Martin', 'subject': 'sup', 'correlation_id': 420} # noqa: N806 broadcast_future = asyncio.Future() @@ -75,14 +120,14 @@ def get_broadcast(_comm, body, sender, subject, correlation_id): {'body': body, 'sender': sender, 'subject': subject, 'correlation_id': correlation_id} ) - loop_communicator.add_broadcast_subscriber(get_broadcast) - loop_communicator.broadcast_send(**BROADCAST) + _coordinator.add_broadcast_subscriber(get_broadcast) + _coordinator.broadcast_send(**BROADCAST) result = await broadcast_future assert result == BROADCAST @pytest.mark.asyncio - async def test_broadcast_filter(self, loop_communicator: kiwipy.Communicator): + async def test_broadcast_filter(self, _coordinator: Coordinator): broadcast_future = asyncio.Future() def ignore_broadcast(_comm, body, sender, subject, correlation_id): @@ -91,17 +136,15 @@ def ignore_broadcast(_comm, body, sender, subject, correlation_id): def get_broadcast(_comm, body, sender, subject, correlation_id): broadcast_future.set_result(True) - loop_communicator.add_broadcast_subscriber(ignore_broadcast, subject_filter='other') - loop_communicator.add_broadcast_subscriber(get_broadcast) - loop_communicator.broadcast_send( - **{'body': 'present', 'sender': 'Martin', 'subject': 'sup', 'correlation_id': 420} - ) + _coordinator.add_broadcast_subscriber(ignore_broadcast, subject_filter='other') + _coordinator.add_broadcast_subscriber(get_broadcast) + _coordinator.broadcast_send(**{'body': 'present', 'sender': 'Martin', 'subject': 'sup', 'correlation_id': 420}) result = await broadcast_future assert result is True @pytest.mark.asyncio - async def test_rpc(self, loop_communicator): + async def test_rpc(self, _coordinator): MSG = 'rpc this' # noqa: N806 rpc_future = asyncio.Future() @@ -111,14 +154,14 @@ def get_rpc(_comm, msg): assert loop is asyncio.get_event_loop() rpc_future.set_result(msg) - loop_communicator.add_rpc_subscriber(get_rpc, 'rpc') - loop_communicator.rpc_send('rpc', MSG) + _coordinator.add_rpc_subscriber(get_rpc, 'rpc') + _coordinator.rpc_send('rpc', MSG) result = await rpc_future assert result == MSG @pytest.mark.asyncio - async def test_task(self, loop_communicator): + async def test_task(self, _coordinator): TASK = 'task this' # noqa: N806 task_future = asyncio.Future() @@ -128,8 +171,8 @@ def get_task(_comm, msg): assert loop is asyncio.get_event_loop() task_future.set_result(msg) - loop_communicator.add_task_subscriber(get_task) - loop_communicator.task_send(TASK) + _coordinator.add_task_subscriber(get_task) + _coordinator.task_send(TASK) result = await task_future assert result == TASK @@ -137,43 +180,43 @@ def get_task(_comm, msg): class TestTaskActions: @pytest.mark.asyncio - async def test_launch(self, loop_communicator, async_controller, persister): + async def test_launch(self, _coordinator, async_controller, persister): # Let the process run to the end loop = asyncio.get_event_loop() - loop_communicator.add_task_subscriber(plumpy.ProcessLauncher(loop, persister=persister)) + _coordinator.add_task_subscriber(plumpy.ProcessLauncher(loop, persister=persister)) result = await async_controller.launch_process(utils.DummyProcess) # Check that we got a result assert result == utils.DummyProcess.EXPECTED_OUTPUTS @pytest.mark.asyncio - async def test_launch_nowait(self, loop_communicator, async_controller, persister): + async def test_launch_nowait(self, _coordinator, async_controller, persister): """Testing launching but don't wait, just get the pid""" loop = asyncio.get_event_loop() - loop_communicator.add_task_subscriber(plumpy.ProcessLauncher(loop, persister=persister)) + _coordinator.add_task_subscriber(plumpy.ProcessLauncher(loop, persister=persister)) pid = await async_controller.launch_process(utils.DummyProcess, nowait=True) assert isinstance(pid, uuid.UUID) @pytest.mark.asyncio - async def test_execute_action(self, loop_communicator, async_controller, persister): + async def test_execute_action(self, _coordinator, async_controller, persister): """Test the process execute action""" loop = asyncio.get_event_loop() - loop_communicator.add_task_subscriber(plumpy.ProcessLauncher(loop, persister=persister)) + _coordinator.add_task_subscriber(plumpy.ProcessLauncher(loop, persister=persister)) result = await async_controller.execute_process(utils.DummyProcessWithOutput) assert utils.DummyProcessWithOutput.EXPECTED_OUTPUTS == result @pytest.mark.asyncio - async def test_execute_action_nowait(self, loop_communicator, async_controller, persister): + async def test_execute_action_nowait(self, _coordinator, async_controller, persister): """Test the process execute action""" loop = asyncio.get_event_loop() - loop_communicator.add_task_subscriber(plumpy.ProcessLauncher(loop, persister=persister)) + _coordinator.add_task_subscriber(plumpy.ProcessLauncher(loop, persister=persister)) pid = await async_controller.execute_process(utils.DummyProcessWithOutput, nowait=True) assert isinstance(pid, uuid.UUID) @pytest.mark.asyncio - async def test_launch_many(self, loop_communicator, async_controller, persister): + async def test_launch_many(self, _coordinator, async_controller, persister): """Test launching multiple processes""" loop = asyncio.get_event_loop() - loop_communicator.add_task_subscriber(plumpy.ProcessLauncher(loop, persister=persister)) + _coordinator.add_task_subscriber(plumpy.ProcessLauncher(loop, persister=persister)) num_to_launch = 10 launch_futures = [] @@ -186,10 +229,10 @@ async def test_launch_many(self, loop_communicator, async_controller, persister) assert isinstance(result, uuid.UUID) @pytest.mark.asyncio - async def test_continue(self, loop_communicator, async_controller, persister): + async def test_continue(self, _coordinator, async_controller, persister): """Test continuing a saved process""" loop = asyncio.get_event_loop() - loop_communicator.add_task_subscriber(plumpy.ProcessLauncher(loop, persister=persister)) + _coordinator.add_task_subscriber(plumpy.ProcessLauncher(loop, persister=persister)) process = utils.DummyProcessWithOutput() persister.save_checkpoint(process) pid = process.pid diff --git a/tests/rmq/test_process_comms.py b/tests/rmq/test_process_control.py similarity index 57% rename from tests/rmq/test_process_comms.py rename to tests/rmq/test_process_control.py index 9de211ee..de779d78 100644 --- a/tests/rmq/test_process_comms.py +++ b/tests/rmq/test_process_control.py @@ -7,45 +7,89 @@ from kiwipy import rmq import plumpy -from plumpy.message import KILL_MSG, MESSAGE_KEY -from plumpy.rmq import process_comms +from plumpy.rmq import process_control from .. import utils -@pytest.fixture -def thread_communicator(): - message_exchange = f'{__file__}.{shortuuid.uuid()}' - task_exchange = f'{__file__}.{shortuuid.uuid()}' - task_queue = f'{__file__}.{shortuuid.uuid()}' +class CoordinatorWithRmqThreadCommunicator: + def __init__(self): + message_exchange = f'{__file__}.{shortuuid.uuid()}' + task_exchange = f'{__file__}.{shortuuid.uuid()}' + task_queue = f'{__file__}.{shortuuid.uuid()}' + + self._comm = rmq.RmqThreadCommunicator.connect( + connection_params={'url': 'amqp://guest:guest@localhost:5672/'}, + message_exchange=message_exchange, + task_exchange=task_exchange, + task_queue=task_queue, + ) + self._comm._loop.set_debug(True) + + def add_rpc_subscriber(self, subscriber, identifier=None): + return self._comm.add_rpc_subscriber(subscriber, identifier) + + def add_broadcast_subscriber( + self, + subscriber, + subject_filter=None, + identifier=None, + ): + subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter) + return self._comm.add_broadcast_subscriber(subscriber, identifier) + + def add_task_subscriber(self, subscriber, identifier=None): + return self._comm.add_task_subscriber(subscriber, identifier) + + def remove_rpc_subscriber(self, identifier): + return self._comm.remove_rpc_subscriber(identifier) - communicator = rmq.RmqThreadCommunicator.connect( - connection_params={'url': 'amqp://guest:guest@localhost:5672/'}, - message_exchange=message_exchange, - task_exchange=task_exchange, - task_queue=task_queue, - ) - communicator._loop.set_debug(True) + def remove_broadcast_subscriber(self, identifier): + return self._comm.remove_broadcast_subscriber(identifier) - yield communicator + def remove_task_subscriber(self, identifier): + return self._comm.remove_task_subscriber(identifier) - communicator.close() + def rpc_send(self, recipient_id, msg): + return self._comm.rpc_send(recipient_id, msg) + + def broadcast_send( + self, + body, + sender=None, + subject=None, + correlation_id=None, + ): + return self._comm.broadcast_send(body, sender, subject, correlation_id) + + def task_send(self, task, no_reply=False): + return self._comm.task_send(task, no_reply) + + def close(self): + self._comm.close() + + +@pytest.fixture +def _coordinator(): + coordinator = CoordinatorWithRmqThreadCommunicator() + yield coordinator + coordinator.close() @pytest.fixture -def async_controller(thread_communicator: rmq.RmqThreadCommunicator): - yield process_comms.RemoteProcessController(thread_communicator) +def async_controller(_coordinator): + yield process_control.RemoteProcessController(_coordinator) @pytest.fixture -def sync_controller(thread_communicator: rmq.RmqThreadCommunicator): - yield process_comms.RemoteProcessThreadController(thread_communicator) +def sync_controller(_coordinator): + yield process_control.RemoteProcessThreadController(_coordinator) class TestRemoteProcessController: @pytest.mark.asyncio - async def test_pause(self, thread_communicator, async_controller): - proc = utils.WaitForSignalProcess(coordinator=thread_communicator) + async def test_pause(self, _coordinator, async_controller): + proc = utils.WaitForSignalProcess(coordinator=_coordinator) # Run the process in the background asyncio.ensure_future(proc.step_until_terminated()) # Send a pause message @@ -56,8 +100,8 @@ async def test_pause(self, thread_communicator, async_controller): assert proc.paused @pytest.mark.asyncio - async def test_play(self, thread_communicator, async_controller): - proc = utils.WaitForSignalProcess(coordinator=thread_communicator) + async def test_play(self, _coordinator, async_controller): + proc = utils.WaitForSignalProcess(coordinator=_coordinator) # Run the process in the background asyncio.ensure_future(proc.step_until_terminated()) assert proc.pause() @@ -74,8 +118,8 @@ async def test_play(self, thread_communicator, async_controller): await async_controller.kill_process(proc.pid) @pytest.mark.asyncio - async def test_kill(self, thread_communicator, async_controller): - proc = utils.WaitForSignalProcess(coordinator=thread_communicator) + async def test_kill(self, _coordinator, async_controller): + proc = utils.WaitForSignalProcess(coordinator=_coordinator) # Run the process in the event loop asyncio.ensure_future(proc.step_until_terminated()) @@ -87,8 +131,8 @@ async def test_kill(self, thread_communicator, async_controller): assert proc.state == plumpy.ProcessState.KILLED @pytest.mark.asyncio - async def test_status(self, thread_communicator, async_controller): - proc = utils.WaitForSignalProcess(coordinator=thread_communicator) + async def test_status(self, _coordinator, async_controller): + proc = utils.WaitForSignalProcess(coordinator=_coordinator) # Run the process in the background asyncio.ensure_future(proc.step_until_terminated()) @@ -100,15 +144,15 @@ async def test_status(self, thread_communicator, async_controller): # make sure proc reach the final state await async_controller.kill_process(proc.pid) - def test_broadcast(self, thread_communicator): + def test_broadcast(self, _coordinator): messages = [] def on_broadcast_receive(**msg): messages.append(msg) - thread_communicator.add_broadcast_subscriber(on_broadcast_receive) + _coordinator.add_broadcast_subscriber(on_broadcast_receive) - proc = utils.DummyProcess(coordinator=thread_communicator) + proc = utils.DummyProcess(coordinator=_coordinator) proc.execute() expected_subjects = [] @@ -122,8 +166,8 @@ def on_broadcast_receive(**msg): class TestRemoteProcessThreadController: @pytest.mark.asyncio - async def test_pause(self, thread_communicator, sync_controller): - proc = utils.WaitForSignalProcess(coordinator=thread_communicator) + async def test_pause(self, _coordinator, sync_controller): + proc = utils.WaitForSignalProcess(coordinator=_coordinator) # Send a pause message pause_future = sync_controller.pause_process(proc.pid) @@ -136,22 +180,22 @@ async def test_pause(self, thread_communicator, sync_controller): assert proc.paused @pytest.mark.asyncio - async def test_pause_all(self, thread_communicator, sync_controller): + async def test_pause_all(self, _coordinator, sync_controller): """Test pausing all processes on a communicator""" procs = [] for _ in range(10): - procs.append(utils.WaitForSignalProcess(coordinator=thread_communicator)) + procs.append(utils.WaitForSignalProcess(coordinator=_coordinator)) sync_controller.pause_all("Slow yo' roll") # Wait until they are all paused await utils.wait_util(lambda: all([proc.paused for proc in procs])) @pytest.mark.asyncio - async def test_play_all(self, thread_communicator, sync_controller): + async def test_play_all(self, _coordinator, sync_controller): """Test pausing all processes on a communicator""" procs = [] for _ in range(10): - proc = utils.WaitForSignalProcess(coordinator=thread_communicator) + proc = utils.WaitForSignalProcess(coordinator=_coordinator) procs.append(proc) proc.pause('hold tight') @@ -161,8 +205,8 @@ async def test_play_all(self, thread_communicator, sync_controller): await utils.wait_util(lambda: all([not proc.paused for proc in procs])) @pytest.mark.asyncio - async def test_play(self, thread_communicator, sync_controller): - proc = utils.WaitForSignalProcess(coordinator=thread_communicator) + async def test_play(self, _coordinator, sync_controller): + proc = utils.WaitForSignalProcess(coordinator=_coordinator) assert proc.pause() # Send a play message @@ -175,8 +219,8 @@ async def test_play(self, thread_communicator, sync_controller): assert proc.state == plumpy.ProcessState.CREATED @pytest.mark.asyncio - async def test_kill(self, thread_communicator, sync_controller): - proc = utils.WaitForSignalProcess(coordinator=thread_communicator) + async def test_kill(self, _coordinator, sync_controller): + proc = utils.WaitForSignalProcess(coordinator=_coordinator) # Send a kill message kill_future = sync_controller.kill_process(proc.pid) @@ -189,21 +233,21 @@ async def test_kill(self, thread_communicator, sync_controller): assert proc.state == plumpy.ProcessState.KILLED @pytest.mark.asyncio - async def test_kill_all(self, thread_communicator, sync_controller): + async def test_kill_all(self, _coordinator, sync_controller): """Test pausing all processes on a communicator""" procs = [] for _ in range(10): - procs.append(utils.WaitForSignalProcess(coordinator=thread_communicator)) + procs.append(utils.WaitForSignalProcess(coordinator=_coordinator)) - msg = process_comms.MessageBuilder.kill(text='bang bang, I shot you down') + msg = process_control.MessageBuilder.kill(text='bang bang, I shot you down') sync_controller.kill_all(msg) await utils.wait_util(lambda: all([proc.killed() for proc in procs])) assert all([proc.state == plumpy.ProcessState.KILLED for proc in procs]) @pytest.mark.asyncio - async def test_status(self, thread_communicator, sync_controller): - proc = utils.WaitForSignalProcess(coordinator=thread_communicator) + async def test_status(self, _coordinator, sync_controller): + proc = utils.WaitForSignalProcess(coordinator=_coordinator) # Run the process in the background asyncio.ensure_future(proc.step_until_terminated())