From 08e2c02dab01ed0d56bdc0eaf085b9e03d7a87c7 Mon Sep 17 00:00:00 2001
From: Jusong Yu <jusong.yeu@gmail.com>
Date: Wed, 11 Dec 2024 18:49:44 +0100
Subject: [PATCH] Ali's api for message constructor

---
 docs/source/tutorial.ipynb      |  2 +-
 src/plumpy/process_comms.py     | 59 ++++++++++++++-------------------
 src/plumpy/process_states.py    |  4 +--
 src/plumpy/processes.py         |  4 +--
 tests/rmq/test_process_comms.py |  2 +-
 tests/test_processes.py         |  6 ++--
 tests/utils.py                  |  4 +--
 7 files changed, 35 insertions(+), 46 deletions(-)

diff --git a/docs/source/tutorial.ipynb b/docs/source/tutorial.ipynb
index af1ed795..f9057ee7 100644
--- a/docs/source/tutorial.ipynb
+++ b/docs/source/tutorial.ipynb
@@ -281,7 +281,7 @@
     "    def continue_fn(self):\n",
     "        print('continuing')\n",
     "        # message is stored in the process status\n",
-    "        return plumpy.Kill(plumpy.KillMessage.build('I was killed'))\n",
+    "        return plumpy.Kill(plumpy.MessageBuilder.kill('I was killed'))\n",
     "\n",
     "\n",
     "process = ContinueProcess()\n",
diff --git a/src/plumpy/process_comms.py b/src/plumpy/process_comms.py
index 9558b2db..e615ee4a 100644
--- a/src/plumpy/process_comms.py
+++ b/src/plumpy/process_comms.py
@@ -13,13 +13,10 @@
 from .utils import PID_TYPE
 
 __all__ = [
-    'KillMessage',
-    'PauseMessage',
-    'PlayMessage',
+    'MessageBuilder',
     'ProcessLauncher',
     'RemoteProcessController',
     'RemoteProcessThreadController',
-    'StatusMessage',
     'create_continue_body',
     'create_launch_body',
 ]
@@ -47,48 +44,40 @@ class Intent:
 MessageType = Dict[str, Any]
 
 
-class PlayMessage:
-    """The play message send over communicator."""
+class MessageBuilder:
+    """MessageBuilder will construct different messages that can passing over communicator."""
 
     @classmethod
-    def build(cls, message: str | None = None) -> MessageType:
+    def play(cls, text: str | None = None) -> MessageType:
+        """The play message send over communicator."""
         return {
             INTENT_KEY: Intent.PLAY,
-            MESSAGE_KEY: message,
+            MESSAGE_KEY: text,
         }
 
-
-class PauseMessage:
-    """The pause message send over communicator."""
-
     @classmethod
-    def build(cls, message: str | None = None) -> MessageType:
+    def pause(cls, text: str | None = None) -> MessageType:
+        """The pause message send over communicator."""
         return {
             INTENT_KEY: Intent.PAUSE,
-            MESSAGE_KEY: message,
+            MESSAGE_KEY: text,
         }
 
-
-class KillMessage:
-    """The kill message send over communicator."""
-
     @classmethod
-    def build(cls, message: str | None = None, force_kill: bool = False) -> MessageType:
+    def kill(cls, text: str | None = None, force_kill: bool = False) -> MessageType:
+        """The kill message send over communicator."""
         return {
             INTENT_KEY: Intent.KILL,
-            MESSAGE_KEY: message,
+            MESSAGE_KEY: text,
             FORCE_KILL_KEY: force_kill,
         }
 
-
-class StatusMessage:
-    """The status message send over communicator."""
-
     @classmethod
-    def build(cls, message: str | None = None) -> MessageType:
+    def status(cls, text: str | None = None) -> MessageType:
+        """The status message send over communicator."""
         return {
             INTENT_KEY: Intent.STATUS,
-            MESSAGE_KEY: message,
+            MESSAGE_KEY: text,
         }
 
 
@@ -207,7 +196,7 @@ async def get_status(self, pid: 'PID_TYPE') -> 'ProcessStatus':
         :param pid: the process id
         :return: the status response from the process
         """
-        future = self._communicator.rpc_send(pid, StatusMessage.build())
+        future = self._communicator.rpc_send(pid, MessageBuilder.status())
         result = await asyncio.wrap_future(future)
         return result
 
@@ -219,7 +208,7 @@ async def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> 'Pr
         :param msg: optional pause message
         :return: True if paused, False otherwise
         """
-        msg = PauseMessage.build(message=msg)
+        msg = MessageBuilder.pause(text=msg)
 
         pause_future = self._communicator.rpc_send(pid, msg)
         # rpc_send return a thread future from communicator
@@ -235,7 +224,7 @@ async def play_process(self, pid: 'PID_TYPE') -> 'ProcessResult':
         :param pid: the pid of the process to play
         :return: True if played, False otherwise
         """
-        play_future = self._communicator.rpc_send(pid, PlayMessage.build())
+        play_future = self._communicator.rpc_send(pid, MessageBuilder.play())
         future = await asyncio.wrap_future(play_future)
         result = await asyncio.wrap_future(future)
         return result
@@ -249,7 +238,7 @@ async def kill_process(self, pid: 'PID_TYPE', msg: Optional[MessageType] = None)
         :return: True if killed, False otherwise
         """
         if msg is None:
-            msg = KillMessage.build()
+            msg = MessageBuilder.kill()
 
         # Wait for the communication to go through
         kill_future = self._communicator.rpc_send(pid, msg)
@@ -373,7 +362,7 @@ def get_status(self, pid: 'PID_TYPE') -> kiwipy.Future:
         :param pid: the process id
         :return: the status response from the process
         """
-        return self._communicator.rpc_send(pid, StatusMessage.build())
+        return self._communicator.rpc_send(pid, MessageBuilder.status())
 
     def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Future:
         """
@@ -384,7 +373,7 @@ def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Fu
         :return: a response future from the process to be paused
 
         """
-        msg = PauseMessage.build(message=msg)
+        msg = MessageBuilder.pause(text=msg)
 
         return self._communicator.rpc_send(pid, msg)
 
@@ -404,7 +393,7 @@ def play_process(self, pid: 'PID_TYPE') -> kiwipy.Future:
         :return: a response future from the process to be played
 
         """
-        return self._communicator.rpc_send(pid, PlayMessage.build())
+        return self._communicator.rpc_send(pid, MessageBuilder.play())
 
     def play_all(self) -> None:
         """
@@ -422,7 +411,7 @@ def kill_process(self, pid: 'PID_TYPE', msg: Optional[MessageType] = None) -> ki
 
         """
         if msg is None:
-            msg = KillMessage.build()
+            msg = MessageBuilder.kill()
 
         return self._communicator.rpc_send(pid, msg)
 
@@ -433,7 +422,7 @@ def kill_all(self, msg: Optional[MessageType]) -> None:
         :param msg: an optional pause message
         """
         if msg is None:
-            msg = KillMessage.build()
+            msg = MessageBuilder.kill()
 
         self._communicator.broadcast_send(msg, subject=Intent.KILL)
 
diff --git a/src/plumpy/process_states.py b/src/plumpy/process_states.py
index 2e311184..d369a1e9 100644
--- a/src/plumpy/process_states.py
+++ b/src/plumpy/process_states.py
@@ -10,7 +10,7 @@
 import yaml
 from yaml.loader import Loader
 
-from plumpy.process_comms import KillMessage, MessageType
+from plumpy.process_comms import MessageBuilder, MessageType
 
 try:
     import tblib
@@ -55,7 +55,7 @@ class KillInterruption(Interruption):
     def __init__(self, msg: MessageType | None):
         super().__init__()
         if msg is None:
-            msg = KillMessage.build()
+            msg = MessageBuilder.kill()
 
         self.msg: MessageType = msg
 
diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py
index f846c052..0866ee41 100644
--- a/src/plumpy/processes.py
+++ b/src/plumpy/processes.py
@@ -54,7 +54,7 @@
 from .base.state_machine import StateEntryFailed, StateMachine, TransitionFailed, event
 from .base.utils import call_with_super_check, super_check
 from .event_helper import EventHelper
-from .process_comms import MESSAGE_KEY, KillMessage, MessageType
+from .process_comms import MESSAGE_KEY, MessageBuilder, MessageType
 from .process_listener import ProcessListener
 from .process_spec import ProcessSpec
 from .utils import PID_TYPE, SAVED_STATE_TYPE, protected
@@ -344,7 +344,7 @@ def init(self) -> None:
 
             def try_killing(future: futures.Future) -> None:
                 if future.cancelled():
-                    msg = KillMessage.build(message='Killed by future being cancelled')
+                    msg = MessageBuilder.kill(text='Killed by future being cancelled')
                     if not self.kill(msg):
                         self.logger.warning(
                             'Process<%s>: Failed to kill process on future cancel',
diff --git a/tests/rmq/test_process_comms.py b/tests/rmq/test_process_comms.py
index c6826a24..1a9051d1 100644
--- a/tests/rmq/test_process_comms.py
+++ b/tests/rmq/test_process_comms.py
@@ -195,7 +195,7 @@ async def test_kill_all(self, thread_communicator, sync_controller):
         for _ in range(10):
             procs.append(utils.WaitForSignalProcess(communicator=thread_communicator))
 
-        msg = process_comms.KillMessage.build(message='bang bang, I shot you down')
+        msg = process_comms.MessageBuilder.kill(message='bang bang, I shot you down')
 
         sync_controller.kill_all(msg)
         await utils.wait_util(lambda: all([proc.killed() for proc in procs]))
diff --git a/tests/test_processes.py b/tests/test_processes.py
index eb5bf599..f15816c8 100644
--- a/tests/test_processes.py
+++ b/tests/test_processes.py
@@ -10,7 +10,7 @@
 
 import plumpy
 from plumpy import BundleKeys, Process, ProcessState
-from plumpy.process_comms import KillMessage
+from plumpy.process_comms import MessageBuilder
 from plumpy.utils import AttributesFrozendict
 from tests import utils
 
@@ -322,7 +322,7 @@ def run(self, **kwargs):
     def test_kill(self):
         proc: Process = utils.DummyProcess()
 
-        msg = KillMessage.build(message='Farewell!')
+        msg = MessageBuilder.kill(message='Farewell!')
         proc.kill(msg)
         self.assertTrue(proc.killed())
         self.assertEqual(proc.killed_msg(), msg)
@@ -428,7 +428,7 @@ class KillProcess(Process):
             after_kill = False
 
             def run(self, **kwargs):
-                msg = KillMessage.build(message='killed')
+                msg = MessageBuilder.kill(message='killed')
                 self.kill(msg)
                 # The following line should be executed because kill will not
                 # interrupt execution of a method call in the RUNNING state
diff --git a/tests/utils.py b/tests/utils.py
index 88638e01..05f5ce6f 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -8,7 +8,7 @@
 
 import plumpy
 from plumpy import persistence, process_states, processes, utils
-from plumpy.process_comms import KillMessage
+from plumpy.process_comms import  MessageBuilder
 
 Snapshot = collections.namedtuple('Snapshot', ['state', 'bundle', 'outputs'])
 
@@ -85,7 +85,7 @@ def last_step(self):
 class KillProcess(processes.Process):
     @utils.override
     def run(self):
-        msg = KillMessage.build(message='killed')
+        msg = MessageBuilder.kill(message='killed')
         return process_states.Kill(msg=msg)