Skip to content

Commit

Permalink
Pause/Play/Status all using message builder
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 2, 2024
1 parent 4a099b0 commit 1b2864c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 21 deletions.
2 changes: 1 addition & 1 deletion docs/source/tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,7 @@
"\n",
"process = SimpleProcess(communicator=communicator)\n",
"\n",
"pprint(communicator.rpc_send(str(process.pid), plumpy.STATUS_MSG).result())"
"pprint(communicator.rpc_send(str(process.pid), plumpy.StatusMessage.build()).result())"
]
},
{
Expand Down
61 changes: 41 additions & 20 deletions src/plumpy/process_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"""Module for process level communication functions and classes"""

import asyncio
import copy
import logging
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union, cast

Expand All @@ -12,13 +11,13 @@
from .utils import PID_TYPE

__all__ = [
'PAUSE_MSG',
'PLAY_MSG',
'STATUS_MSG',
'KillMessage',
'PauseMessage',
'PlayMessage',
'ProcessLauncher',
'RemoteProcessController',
'RemoteProcessThreadController',
'StatusMessage',
'create_continue_body',
'create_launch_body',
]
Expand All @@ -45,10 +44,27 @@ class Intent:

MessageType = dict[str, Any]

PAUSE_MSG: MessageType = {INTENT_KEY: Intent.PAUSE, MESSAGE_KEY: None}
PLAY_MSG: MessageType = {INTENT_KEY: Intent.PLAY, MESSAGE_KEY: None}
# KILL_MSG: MessageType = {INTENT_KEY: Intent.KILL, MESSAGE_KEY: None, FORCE_KILL_KEY: False}
STATUS_MSG: MessageType = {INTENT_KEY: Intent.STATUS, MESSAGE_KEY: None}
# PAUSE_MSG: MessageType = {INTENT_KEY: Intent.PAUSE, MESSAGE_KEY: None}
# PLAY_MSG: MessageType = {INTENT_KEY: Intent.PLAY, MESSAGE_KEY: None}
# STATUS_MSG: MessageType = {INTENT_KEY: Intent.STATUS, MESSAGE_KEY: None}


class PlayMessage:
@classmethod
def build(cls, message: str | None = None) -> MessageType:
return {
INTENT_KEY: Intent.PLAY,
MESSAGE_KEY: message,
}


class PauseMessage:
@classmethod
def build(cls, message: str | None = None) -> MessageType:
return {
INTENT_KEY: Intent.PAUSE,
MESSAGE_KEY: message,
}


class KillMessage:
Expand All @@ -61,6 +77,15 @@ def build(cls, message: str | None = None, force: bool = False) -> MessageType:
}


class StatusMessage:
@classmethod
def build(cls, message: str | None = None) -> MessageType:
return {
INTENT_KEY: Intent.STATUS,
MESSAGE_KEY: message,
}


TASK_KEY = 'task'
TASK_ARGS = 'args'
PERSIST_KEY = 'persist'
Expand Down Expand Up @@ -176,7 +201,7 @@ async def get_status(self, pid: 'PID_TYPE') -> 'ProcessStatus':
:param pid: the process id
:return: the status response from the process
"""
future = self._communicator.rpc_send(pid, STATUS_MSG)
future = self._communicator.rpc_send(pid, StatusMessage.build())
result = await asyncio.wrap_future(future)
return result

Expand All @@ -188,11 +213,9 @@ async def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> 'Pr
:param msg: optional pause message
:return: True if paused, False otherwise
"""
message = copy.copy(PAUSE_MSG)
if msg is not None:
message[MESSAGE_KEY] = msg
msg = PauseMessage.build(message=msg)

pause_future = self._communicator.rpc_send(pid, message)
pause_future = self._communicator.rpc_send(pid, msg)
# rpc_send return a thread future from communicator
future = await asyncio.wrap_future(pause_future)
# future is just returned from rpc call which return a kiwipy future
Expand All @@ -206,7 +229,7 @@ async def play_process(self, pid: 'PID_TYPE') -> 'ProcessResult':
:param pid: the pid of the process to play
:return: True if played, False otherwise
"""
play_future = self._communicator.rpc_send(pid, PLAY_MSG)
play_future = self._communicator.rpc_send(pid, PlayMessage.build())
future = await asyncio.wrap_future(play_future)
result = await asyncio.wrap_future(future)
return result
Expand Down Expand Up @@ -344,7 +367,7 @@ def get_status(self, pid: 'PID_TYPE') -> kiwipy.Future:
:param pid: the process id
:return: the status response from the process
"""
return self._communicator.rpc_send(pid, STATUS_MSG)
return self._communicator.rpc_send(pid, StatusMessage.build())

def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Future:
"""
Expand All @@ -355,11 +378,9 @@ def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Fu
:return: a response future from the process to be paused
"""
message = copy.copy(PAUSE_MSG)
if msg is not None:
message[MESSAGE_KEY] = msg
msg = PauseMessage.build(message=msg)

return self._communicator.rpc_send(pid, message)
return self._communicator.rpc_send(pid, msg)

def pause_all(self, msg: Any) -> None:
"""
Expand All @@ -377,7 +398,7 @@ def play_process(self, pid: 'PID_TYPE') -> kiwipy.Future:
:return: a response future from the process to be played
"""
return self._communicator.rpc_send(pid, PLAY_MSG)
return self._communicator.rpc_send(pid, PlayMessage.build())

def play_all(self) -> None:
"""
Expand Down

0 comments on commit 1b2864c

Please sign in to comment.