From 737adf31e52685e05e9d45ac8d2aa5c44999f6a8 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Mon, 2 Dec 2024 00:56:52 +0100 Subject: [PATCH] Pause/Play/Status all using message builder --- docs/source/tutorial.ipynb | 2 +- src/plumpy/process_comms.py | 61 +++++++++++++++++++++++++------------ 2 files changed, 42 insertions(+), 21 deletions(-) diff --git a/docs/source/tutorial.ipynb b/docs/source/tutorial.ipynb index c1fdb3b2..fe25892d 100644 --- a/docs/source/tutorial.ipynb +++ b/docs/source/tutorial.ipynb @@ -1118,7 +1118,7 @@ "\n", "process = SimpleProcess(communicator=communicator)\n", "\n", - "pprint(communicator.rpc_send(str(process.pid), plumpy.STATUS_MSG).result())" + "pprint(communicator.rpc_send(str(process.pid), plumpy.StatusMessage.build()).result())" ] }, { diff --git a/src/plumpy/process_comms.py b/src/plumpy/process_comms.py index bc2fa125..39b70d4f 100644 --- a/src/plumpy/process_comms.py +++ b/src/plumpy/process_comms.py @@ -2,7 +2,6 @@ """Module for process level communication functions and classes""" import asyncio -import copy import logging from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union, cast @@ -12,13 +11,13 @@ from .utils import PID_TYPE __all__ = [ - 'PAUSE_MSG', - 'PLAY_MSG', - 'STATUS_MSG', 'KillMessage', + 'PauseMessage', + 'PlayMessage', 'ProcessLauncher', 'RemoteProcessController', 'RemoteProcessThreadController', + 'StatusMessage', 'create_continue_body', 'create_launch_body', ] @@ -45,10 +44,27 @@ class Intent: MessageType = dict[str, Any] -PAUSE_MSG: MessageType = {INTENT_KEY: Intent.PAUSE, MESSAGE_KEY: None} -PLAY_MSG: MessageType = {INTENT_KEY: Intent.PLAY, MESSAGE_KEY: None} -# KILL_MSG: MessageType = {INTENT_KEY: Intent.KILL, MESSAGE_KEY: None, FORCE_KILL_KEY: False} -STATUS_MSG: MessageType = {INTENT_KEY: Intent.STATUS, MESSAGE_KEY: None} +# PAUSE_MSG: MessageType = {INTENT_KEY: Intent.PAUSE, MESSAGE_KEY: None} +# PLAY_MSG: MessageType = {INTENT_KEY: Intent.PLAY, MESSAGE_KEY: None} +# STATUS_MSG: MessageType = {INTENT_KEY: Intent.STATUS, MESSAGE_KEY: None} + + +class PlayMessage: + @classmethod + def build(cls, message: str | None = None) -> MessageType: + return { + INTENT_KEY: Intent.PLAY, + MESSAGE_KEY: message, + } + + +class PauseMessage: + @classmethod + def build(cls, message: str | None = None) -> MessageType: + return { + INTENT_KEY: Intent.PAUSE, + MESSAGE_KEY: message, + } class KillMessage: @@ -61,6 +77,15 @@ def build(cls, message: str | None = None, force: bool = False) -> MessageType: } +class StatusMessage: + @classmethod + def build(cls, message: str | None = None) -> MessageType: + return { + INTENT_KEY: Intent.STATUS, + MESSAGE_KEY: message, + } + + TASK_KEY = 'task' TASK_ARGS = 'args' PERSIST_KEY = 'persist' @@ -176,7 +201,7 @@ async def get_status(self, pid: 'PID_TYPE') -> 'ProcessStatus': :param pid: the process id :return: the status response from the process """ - future = self._communicator.rpc_send(pid, STATUS_MSG) + future = self._communicator.rpc_send(pid, StatusMessage.build()) result = await asyncio.wrap_future(future) return result @@ -188,11 +213,9 @@ async def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> 'Pr :param msg: optional pause message :return: True if paused, False otherwise """ - message = copy.copy(PAUSE_MSG) - if msg is not None: - message[MESSAGE_KEY] = msg + msg = PauseMessage.build(message=msg) - pause_future = self._communicator.rpc_send(pid, message) + pause_future = self._communicator.rpc_send(pid, msg) # rpc_send return a thread future from communicator future = await asyncio.wrap_future(pause_future) # future is just returned from rpc call which return a kiwipy future @@ -206,7 +229,7 @@ async def play_process(self, pid: 'PID_TYPE') -> 'ProcessResult': :param pid: the pid of the process to play :return: True if played, False otherwise """ - play_future = self._communicator.rpc_send(pid, PLAY_MSG) + play_future = self._communicator.rpc_send(pid, PlayMessage.build()) future = await asyncio.wrap_future(play_future) result = await asyncio.wrap_future(future) return result @@ -344,7 +367,7 @@ def get_status(self, pid: 'PID_TYPE') -> kiwipy.Future: :param pid: the process id :return: the status response from the process """ - return self._communicator.rpc_send(pid, STATUS_MSG) + return self._communicator.rpc_send(pid, StatusMessage.build()) def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Future: """ @@ -355,11 +378,9 @@ def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Fu :return: a response future from the process to be paused """ - message = copy.copy(PAUSE_MSG) - if msg is not None: - message[MESSAGE_KEY] = msg + msg = PauseMessage.build(message=msg) - return self._communicator.rpc_send(pid, message) + return self._communicator.rpc_send(pid, msg) def pause_all(self, msg: Any) -> None: """ @@ -377,7 +398,7 @@ def play_process(self, pid: 'PID_TYPE') -> kiwipy.Future: :return: a response future from the process to be played """ - return self._communicator.rpc_send(pid, PLAY_MSG) + return self._communicator.rpc_send(pid, PlayMessage.build()) def play_all(self) -> None: """