Skip to content

Commit

Permalink
Rename rmq/process_comm -> rmq/process_control
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 17, 2024
1 parent 18de42f commit d8ebd3c
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion docs/source/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ WorkChains support the use of logical constructs such as `If_` and `While_` to c

A `Controller` can control processes throughout their lifetime, by sending and receiving messages. It can launch, pause, continue, kill and check status of the process.

The {py:class}`~plumpy.process_comms.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker.
The {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker.

The thread communicator runs on a independent thread (event loop) and so will not be blocked by sometimes long waiting times in the process event loop.
Using RabbitMQ means that even if the computer is terminated unexpectedly, messages are persisted and can be run once the computer restarts.
2 changes: 1 addition & 1 deletion docs/source/tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"The {py:class}`~plumpy.workchains.WorkChain`\n",
": A subclass of `Process` that allows for running a process as a set of discrete steps (also known as instructions), with the ability to save the state of the process after each instruction has completed.\n",
"\n",
"The process `Controller` (principally the {py:class}`~plumpy.process_comms.RemoteProcessThreadController`)\n",
"The process `Controller` (principally the {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController`)\n",
": To control the process or workchain throughout its lifetime."
]
},
Expand Down
4 changes: 2 additions & 2 deletions src/plumpy/rmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
from .communications import *
from .exceptions import *
from .futures import *
from .process_comms import *
from .process_control import *

__all__ = exceptions.__all__ + communications.__all__ + futures.__all__ + process_comms.__all__
__all__ = exceptions.__all__ + communications.__all__ + futures.__all__ + process_control.__all__
File renamed without changes.
4 changes: 2 additions & 2 deletions tests/rmq/test_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from kiwipy import rmq

import plumpy
from plumpy.rmq import communications, process_comms
from plumpy.rmq import communications, process_control

from .. import utils

Expand Down Expand Up @@ -55,7 +55,7 @@ def loop_communicator():

@pytest.fixture
def async_controller(loop_communicator: communications.LoopCommunicator):
yield process_comms.RemoteProcessController(loop_communicator)
yield process_control.RemoteProcessController(loop_communicator)


class TestLoopCommunicator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from kiwipy import rmq

import plumpy
from plumpy.rmq import process_comms
from plumpy.rmq import process_control

from .. import utils

Expand Down Expand Up @@ -79,12 +79,12 @@ def _coordinator():

@pytest.fixture
def async_controller(_coordinator):
yield process_comms.RemoteProcessController(_coordinator)
yield process_control.RemoteProcessController(_coordinator)


@pytest.fixture
def sync_controller(_coordinator):
yield process_comms.RemoteProcessThreadController(_coordinator)
yield process_control.RemoteProcessThreadController(_coordinator)


class TestRemoteProcessController:
Expand Down Expand Up @@ -240,7 +240,7 @@ async def test_kill_all(self, _coordinator, sync_controller):
for _ in range(10):
procs.append(utils.WaitForSignalProcess(coordinator=_coordinator))

msg = process_comms.MessageBuilder.kill(text='bang bang, I shot you down')
msg = process_control.MessageBuilder.kill(text='bang bang, I shot you down')

sync_controller.kill_all(msg)
await utils.wait_util(lambda: all([proc.killed() for proc in procs]))
Expand Down

0 comments on commit d8ebd3c

Please sign in to comment.