-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Message passing with more information #291
base: master
Are you sure you want to change the base?
Changes from 14 commits
1117eeb
b82791d
d4c0489
c5a195c
8db6675
74d048d
667af7a
4be6931
88259d6
c3c9db4
d0e4e73
e3c2ae8
e5c74ad
17a541a
e82824b
80b3458
08e2c02
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,8 +1,9 @@ | ||||||
# -*- coding: utf-8 -*- | ||||||
"""Module for process level communication functions and classes""" | ||||||
|
||||||
from __future__ import annotations | ||||||
|
||||||
import asyncio | ||||||
import copy | ||||||
import logging | ||||||
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union, cast | ||||||
|
||||||
|
@@ -12,13 +13,13 @@ | |||||
from .utils import PID_TYPE | ||||||
|
||||||
__all__ = [ | ||||||
'KILL_MSG', | ||||||
'PAUSE_MSG', | ||||||
'PLAY_MSG', | ||||||
'STATUS_MSG', | ||||||
'KillMessage', | ||||||
'PauseMessage', | ||||||
'PlayMessage', | ||||||
'ProcessLauncher', | ||||||
'RemoteProcessController', | ||||||
'RemoteProcessThreadController', | ||||||
'StatusMessage', | ||||||
'create_continue_body', | ||||||
'create_launch_body', | ||||||
] | ||||||
|
@@ -31,6 +32,7 @@ | |||||
|
||||||
INTENT_KEY = 'intent' | ||||||
MESSAGE_KEY = 'message' | ||||||
FORCE_KILL_KEY = 'force_kill' | ||||||
|
||||||
|
||||||
class Intent: | ||||||
|
@@ -42,10 +44,45 @@ class Intent: | |||||
STATUS: str = 'status' | ||||||
|
||||||
|
||||||
PAUSE_MSG = {INTENT_KEY: Intent.PAUSE} | ||||||
PLAY_MSG = {INTENT_KEY: Intent.PLAY} | ||||||
KILL_MSG = {INTENT_KEY: Intent.KILL} | ||||||
STATUS_MSG = {INTENT_KEY: Intent.STATUS} | ||||||
MessageType = Dict[str, Any] | ||||||
|
||||||
|
||||||
class PlayMessage: | ||||||
@classmethod | ||||||
def build(cls, message: str | None = None) -> MessageType: | ||||||
return { | ||||||
INTENT_KEY: Intent.PLAY, | ||||||
MESSAGE_KEY: message, | ||||||
} | ||||||
|
||||||
|
||||||
class PauseMessage: | ||||||
@classmethod | ||||||
def build(cls, message: str | None = None) -> MessageType: | ||||||
return { | ||||||
INTENT_KEY: Intent.PAUSE, | ||||||
MESSAGE_KEY: message, | ||||||
} | ||||||
|
||||||
|
||||||
class KillMessage: | ||||||
@classmethod | ||||||
def build(cls, message: str | None = None, force: bool = False) -> MessageType: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd still call it
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @unkcpz , maybe you forgot to update this 😅 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. message = 'Killed through `verdi process kill`'
control.kill_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message) For this one, the message should be build first in to a
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the one in _perform_actions(processes, controller.kill_process, 'kill', 'killing', timeout, wait, msg=message) change it to message = KillMessage.build(message=message, force=force)
_perform_actions(processes, functool.partial(controller.kill_process, msg=message), 'kill', 'killing', timeout, wait) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but now message = {'msg' : 'Killed through `verdi process kill`',
'options' : {
'force_kill':True
}}
control.kill_processes(processes, controller.kill_process, .., msg=message) and then There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Why it is a problem for importing
It is a dictionary returned by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. well this is the twisted things that we were always complaining about, import from plumpy to only make a dictionary only for an input of a plumpy function itself... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My 2¢'s: I think it is rarely preferable to have a free-form dictionary to specify inputs to an API, especially if it is nested, because you will always have to start reading the docs/code to know how it is structured. Using the other approach, (data) classes, explicit function/method arguments etc. these can be auto inspected by an IDE and are much easier to use. The fact that you have to import something is in my eyes not really a problem. Anyway to use a library you have to import something to do anything. Now in this case, ideally the from dataclass import dataclass
@dataclass
class KillMessage:
message: str | None = None
force_kill: bool = False
@classmethod
def build(...):
.... Actually, if using a dataclass, why not just use the constructor to build the instance instead of using the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I won't say "only", encapsulate it in a function as a contract also serve as an input validator. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry @sphuber, I just saw your comment, I had my browser open for during the weekend and I directly reply to Ali's comment without refresh. Yes, thought about dataclass, and I think it can work. I use I think msgpack will take care of the dataclass for sure, but since the en/decoder used was the yaml one, I am not so sure. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same here 😄 I had the page open without refreshing, so I didn't notice Sebastiaan's comment.
I see and agree with this suggestion, ✔️ |
||||||
return { | ||||||
INTENT_KEY: Intent.KILL, | ||||||
MESSAGE_KEY: message, | ||||||
FORCE_KILL_KEY: force, | ||||||
} | ||||||
|
||||||
|
||||||
class StatusMessage: | ||||||
@classmethod | ||||||
def build(cls, message: str | None = None) -> MessageType: | ||||||
return { | ||||||
INTENT_KEY: Intent.STATUS, | ||||||
MESSAGE_KEY: message, | ||||||
} | ||||||
|
||||||
unkcpz marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
TASK_KEY = 'task' | ||||||
TASK_ARGS = 'args' | ||||||
|
@@ -162,7 +199,7 @@ async def get_status(self, pid: 'PID_TYPE') -> 'ProcessStatus': | |||||
:param pid: the process id | ||||||
:return: the status response from the process | ||||||
""" | ||||||
future = self._communicator.rpc_send(pid, STATUS_MSG) | ||||||
future = self._communicator.rpc_send(pid, StatusMessage.build()) | ||||||
result = await asyncio.wrap_future(future) | ||||||
return result | ||||||
|
||||||
|
@@ -174,11 +211,9 @@ async def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> 'Pr | |||||
:param msg: optional pause message | ||||||
:return: True if paused, False otherwise | ||||||
""" | ||||||
message = copy.copy(PAUSE_MSG) | ||||||
if msg is not None: | ||||||
message[MESSAGE_KEY] = msg | ||||||
msg = PauseMessage.build(message=msg) | ||||||
|
||||||
pause_future = self._communicator.rpc_send(pid, message) | ||||||
pause_future = self._communicator.rpc_send(pid, msg) | ||||||
# rpc_send return a thread future from communicator | ||||||
future = await asyncio.wrap_future(pause_future) | ||||||
# future is just returned from rpc call which return a kiwipy future | ||||||
|
@@ -192,25 +227,24 @@ async def play_process(self, pid: 'PID_TYPE') -> 'ProcessResult': | |||||
:param pid: the pid of the process to play | ||||||
:return: True if played, False otherwise | ||||||
""" | ||||||
play_future = self._communicator.rpc_send(pid, PLAY_MSG) | ||||||
play_future = self._communicator.rpc_send(pid, PlayMessage.build()) | ||||||
future = await asyncio.wrap_future(play_future) | ||||||
result = await asyncio.wrap_future(future) | ||||||
return result | ||||||
|
||||||
async def kill_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> 'ProcessResult': | ||||||
async def kill_process(self, pid: 'PID_TYPE', msg: Optional[MessageType] = None) -> 'ProcessResult': | ||||||
""" | ||||||
Kill the process | ||||||
|
||||||
:param pid: the pid of the process to kill | ||||||
:param msg: optional kill message | ||||||
:return: True if killed, False otherwise | ||||||
""" | ||||||
message = copy.copy(KILL_MSG) | ||||||
if msg is not None: | ||||||
message[MESSAGE_KEY] = msg | ||||||
if msg is None: | ||||||
msg = KillMessage.build() | ||||||
|
||||||
# Wait for the communication to go through | ||||||
kill_future = self._communicator.rpc_send(pid, message) | ||||||
kill_future = self._communicator.rpc_send(pid, msg) | ||||||
future = await asyncio.wrap_future(kill_future) | ||||||
# Now wait for the kill to be enacted | ||||||
result = await asyncio.wrap_future(future) | ||||||
|
@@ -331,7 +365,7 @@ def get_status(self, pid: 'PID_TYPE') -> kiwipy.Future: | |||||
:param pid: the process id | ||||||
:return: the status response from the process | ||||||
""" | ||||||
return self._communicator.rpc_send(pid, STATUS_MSG) | ||||||
return self._communicator.rpc_send(pid, StatusMessage.build()) | ||||||
|
||||||
def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Future: | ||||||
""" | ||||||
|
@@ -342,11 +376,9 @@ def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Fu | |||||
:return: a response future from the process to be paused | ||||||
|
||||||
""" | ||||||
message = copy.copy(PAUSE_MSG) | ||||||
if msg is not None: | ||||||
message[MESSAGE_KEY] = msg | ||||||
msg = PauseMessage.build(message=msg) | ||||||
|
||||||
return self._communicator.rpc_send(pid, message) | ||||||
return self._communicator.rpc_send(pid, msg) | ||||||
|
||||||
def pause_all(self, msg: Any) -> None: | ||||||
""" | ||||||
|
@@ -364,15 +396,15 @@ def play_process(self, pid: 'PID_TYPE') -> kiwipy.Future: | |||||
:return: a response future from the process to be played | ||||||
|
||||||
""" | ||||||
return self._communicator.rpc_send(pid, PLAY_MSG) | ||||||
return self._communicator.rpc_send(pid, PlayMessage.build()) | ||||||
|
||||||
def play_all(self) -> None: | ||||||
""" | ||||||
Play all processes that are subscribed to the same communicator | ||||||
""" | ||||||
self._communicator.broadcast_send(None, subject=Intent.PLAY) | ||||||
|
||||||
def kill_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Future: | ||||||
def kill_process(self, pid: 'PID_TYPE', msg: Optional[MessageType] = None) -> kiwipy.Future: | ||||||
""" | ||||||
Kill the process | ||||||
|
||||||
|
@@ -381,18 +413,20 @@ def kill_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Fut | |||||
:return: a response future from the process to be killed | ||||||
|
||||||
""" | ||||||
message = copy.copy(KILL_MSG) | ||||||
if msg is not None: | ||||||
message[MESSAGE_KEY] = msg | ||||||
if msg is None: | ||||||
msg = KillMessage.build() | ||||||
|
||||||
return self._communicator.rpc_send(pid, message) | ||||||
return self._communicator.rpc_send(pid, msg) | ||||||
|
||||||
def kill_all(self, msg: Optional[Any]) -> None: | ||||||
def kill_all(self, msg: Optional[MessageType]) -> None: | ||||||
""" | ||||||
Kill all processes that are subscribed to the same communicator | ||||||
|
||||||
:param msg: an optional pause message | ||||||
""" | ||||||
if msg is None: | ||||||
msg = KillMessage.build() | ||||||
|
||||||
self._communicator.broadcast_send(msg, subject=Intent.KILL) | ||||||
|
||||||
def continue_process( | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this change and what does it mean?
also please note the default value
None
is taken away, is this backward compatible?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function inside plumpy is only called:
I did the same to create the state first and then pass to the function for consistency as
transition_to
.It depends, I think we don't have a clear public API list for plumpy. Since aiida-core didn't use this directly, it does not break backward compatibility of
aiida-core
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, agreed, I'm also not so strict in this case, since so far
aiida-core
is the only known dependent of thisplumpy