Skip to content

Commit

Permalink
Add support for interruptible ROS processes (#129)
Browse files Browse the repository at this point in the history
Signed-off-by: Michel Hidalgo <[email protected]>
  • Loading branch information
mhidalgo-bdai authored Nov 19, 2024
1 parent 5258a5a commit 02ca14e
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 18 deletions.
1 change: 1 addition & 0 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import weakref

import rclpy.executors
import rclpy.node

from bdai_ros2_wrappers.futures import FutureLike
from bdai_ros2_wrappers.utilities import bind_to_thread, fqn
Expand Down
87 changes: 76 additions & 11 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import functools
import inspect
import os
import signal
import sys
import threading
import typing
Expand All @@ -12,6 +13,7 @@
import rclpy.executors
import rclpy.logging
import rclpy.node
import rclpy.utilities
from rclpy.exceptions import InvalidNamespaceException, InvalidNodeNameException
from rclpy.validate_namespace import validate_namespace
from rclpy.validate_node_name import validate_node_name
Expand Down Expand Up @@ -50,6 +52,7 @@ def __init__(
func: MainCallable,
*,
uses_tf: bool = False,
interruptible: bool = False,
prebaked: typing.Union[bool, str] = True,
autospin: typing.Optional[bool] = None,
forward_logging: typing.Optional[bool] = None,
Expand All @@ -71,6 +74,9 @@ def __init__(
autospin: whether to automatically equip the underlying scope with a background executor
or not. Defaults to True for prebaked processes and to False for bare processes.
uses_tf: whether to instantiate a tf listener bound to the process main node. Defaults to False.
interruptible: whether the process allows graceful interruptions i.e. SIGINT (Ctrl+C) or SIGTERM
signaling. An interruptible process will not shutdown any context or trigger any guard condition
on either but simply raise KeyboardInterrupt and SystemExit exceptions instead.
forward_logging: whether to forward `logging` logs to the ROS 2 logging system or not.
Defaults to True for prebaked processes and to False for bare processes (though it requires
a process node to be set to function).
Expand All @@ -91,8 +97,11 @@ def __init__(
namespace = name
if forward_logging is None:
forward_logging = bool(prebaked)
self._lock = threading.Lock()
self._scope: typing.Optional[ROSAwareScope] = None
self._func = func
self._cli = cli
self._interruptible = interruptible
self._scope_kwargs = dict(
prebaked=prebaked,
autospin=autospin,
Expand All @@ -101,8 +110,6 @@ def __init__(
forward_logging=forward_logging,
**init_arguments,
)
self._scope: typing.Optional[ROSAwareScope] = None
self._lock = threading.Lock()
functools.update_wrapper(self, self._func)

@property
Expand All @@ -128,6 +135,8 @@ def __setattr__(self, name: str, value: typing.Any) -> None:
name: name of the attribute to be set
value: attribute value to be set.
"""
if threading.current_thread() is not threading.main_thread():
raise RuntimeError("process attributes can only be set from the main thread")
if not name.startswith("_") and hasattr(self._scope, name):
setattr(self._scope, name, value)
return
Expand Down Expand Up @@ -174,9 +183,19 @@ def __call__(self, argv: typing.Optional[typing.Sequence[str]] = None) -> typing
warnings.warn(f"'{namespace}' cannot be used as namespace, using scope default", stacklevel=1)
namespace = True
scope_kwargs["namespace"] = namespace
with scope.top(argv, global_=True, **scope_kwargs) as self._scope:
with scope.top(argv, global_=True, interruptible=self._interruptible, **scope_kwargs) as self._scope:
ROSAwareProcess.current = self
self._lock.release()
orig_sigterm_handler: typing.Optional[typing.Any] = None
if self._interruptible:

def handle_sigterm(*args: typing.Any) -> None:
nonlocal orig_sigterm_handler
if orig_sigterm_handler is not None and orig_sigterm_handler != signal.SIG_DFL:
orig_sigterm_handler(*args)
raise SystemExit("due to SIGTERM")

orig_sigterm_handler = signal.signal(signal.SIGTERM, handle_sigterm)
try:
sig = inspect.signature(self._func)
if not sig.parameters:
Expand All @@ -188,6 +207,8 @@ def __call__(self, argv: typing.Optional[typing.Sequence[str]] = None) -> typing
func_taking_argv = typing.cast(MainCallableTakingArgv, self._func)
return func_taking_argv(rclpy.utilities.remove_ros_args(argv))
finally:
if orig_sigterm_handler is not None:
signal.signal(signal.SIGTERM, orig_sigterm_handler)
self._lock.acquire()
ROSAwareProcess.current = None
self._scope = None
Expand All @@ -197,6 +218,41 @@ def __call__(self, argv: typing.Optional[typing.Sequence[str]] = None) -> typing
self._lock.release()
ROSAwareProcess.lock.release()

def wait_for_interrupt(self, *, timeout_sec: typing.Optional[float] = None) -> None:
"""Wait for process interruption i.e. a KeyboardInterrupt or a SystemExit.
This can only be done from the main thread. Also, note that timeouts are
implemented using POSIX timers and alarms.
Args:
timeout_sec: optional timeout for wait, wait indefinitely by default.
"""
if not self._interruptible:
raise RuntimeError("process is not interruptible")
if threading.current_thread() is not threading.main_thread():
raise RuntimeError("interrupts can only be awaited from the main thread")
if self._scope is None:
raise RuntimeError("process is not executing")
interrupted = False
orig_sigalrm_handler = None
orig_itimer_setup = None
if timeout_sec is not None:

def handle_sigalarm(*args: typing.Any) -> None:
nonlocal interrupted
interrupted = True

orig_sigalrm_handler = signal.signal(signal.SIGALRM, handle_sigalarm)
orig_itimer_setup = signal.setitimer(signal.ITIMER_REAL, timeout_sec)
try:
while not interrupted:
signal.pause()
finally:
if orig_itimer_setup is not None:
signal.setitimer(signal.ITIMER_REAL, *orig_itimer_setup)
if orig_sigalrm_handler is not None:
signal.signal(signal.SIGALRM, orig_sigalrm_handler)

def wait_for_shutdown(self, *, timeout_sec: typing.Optional[float] = None) -> bool:
"""Wait for shutdown of the underlying scope context.
Expand All @@ -206,17 +262,11 @@ def wait_for_shutdown(self, *, timeout_sec: typing.Optional[float] = None) -> bo
Returns:
True if shutdown, False on timeout.
"""
with self._lock:
if self._scope is None:
raise RuntimeError("process is not executing")
return context.wait_for_shutdown(timeout_sec=timeout_sec, context=self._scope.context)
return context.wait_for_shutdown(timeout_sec=timeout_sec, context=self.context)

def try_shutdown(self) -> None:
"""Atempts to shutdown the underlying scope context."""
with self._lock:
if self._scope is None:
raise RuntimeError("process is not executing")
rclpy.try_shutdown(context=self._scope.context)
rclpy.utilities.try_shutdown(context=self.context)


def current() -> typing.Optional[ROSAwareProcess]:
Expand Down Expand Up @@ -417,3 +467,18 @@ def wait_for_shutdown(*, timeout_sec: typing.Optional[float] = None) -> bool:
if process is None:
raise RuntimeError("no process is executing")
return process.wait_for_shutdown(timeout_sec=timeout_sec)


def wait_for_interrupt(*, timeout_sec: typing.Optional[float] = None) -> None:
"""Wait for current ROS 2 aware process interruption.
See `ROSAwareProcess.wait_for_interrupt` documentation for further reference
on positional and keyword arguments taken by this function.
Raises:
RuntimeError: if no process is executing.
"""
process = current()
if process is None:
raise RuntimeError("no process is executing")
return process.wait_for_interrupt(timeout_sec=timeout_sec)
34 changes: 28 additions & 6 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import rclpy.executors
import rclpy.logging
import rclpy.node
import rclpy.signals
import rclpy.utilities

from bdai_ros2_wrappers.executors import AutoScalingMultiThreadedExecutor, background, foreground
Expand Down Expand Up @@ -486,6 +487,7 @@ def top(
*,
context: typing.Optional[rclpy.context.Context] = None,
global_: bool = False,
interruptible: bool = False,
domain_id: typing.Optional[int] = None,
**kwargs: typing.Any,
) -> typing.Iterator[ROSAwareScope]:
Expand All @@ -495,8 +497,11 @@ def top(
args: optional command-line arguments for context initialization.
context: optional context to manage. If none is provided, one will
be created. For global scopes, the default context will be used.
global_: Whether to use the global context or a locally constructed one if one is not provided.
domain_id: A domain id used for initializing rclpy.
global_: whether to use the global context or a locally constructed one.
interruptible: global interruptible scopes will skip installing ROS 2
signal handlers and let the user deal with SIGINT and SIGTERM
interruptions instead.
domain_id: a domain id used for initializing rclpy.
kwargs: keyword arguments to pass to `ROSAwareScope`.
See `ROSAwareScope` documentation for further reference on positional
Expand All @@ -505,14 +510,31 @@ def top(
Returns:
a context manager.
"""
if context is None and not global_:
context = rclpy.context.Context()
rclpy.init(args=args, context=context, domain_id=domain_id)
if global_:
if context is not None:
raise ValueError("Cannot use a local context as global context")
default_context = rclpy.utilities.get_default_context()
default_context.init(args, domain_id=domain_id)
if not interruptible:
rclpy.signals.install_signal_handlers(
rclpy.signals.SignalHandlerOptions.ALL,
)
else:
if context is None:
context = rclpy.context.Context()
context.init(args, domain_id=domain_id)
try:
with ROSAwareScope(global_=global_, context=context, **kwargs) as scope:
yield scope
finally:
rclpy.try_shutdown(context=context)
if global_:
default_context = rclpy.utilities.get_default_context(shutting_down=True)
default_context.try_shutdown()
if not interruptible:
rclpy.signals.uninstall_signal_handlers()
else:
assert context is not None
context.try_shutdown()


def current() -> typing.Optional[ROSAwareScope]:
Expand Down
81 changes: 81 additions & 0 deletions bdai_ros2_wrappers/examples/interruptible_talker_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright (c) 2024 Boston Dynamics AI Institute Inc. All rights reserved.

"""An example of an interruptible ROS 2 single node process using process-wide machinery.
Run with:
```sh
python3 examples/interruptible_talker_example.py chit chat
```
You can speed up publication with:
```sh
python3 examples/interruptible_talker_example.py chit chat --ros-args -p rate:=5.0 # Hz
```
On Ctrl + C, the node will be interrupted. Another Ctrl + C will terminate the example
and one last "goodbye" message will be published by the talker.
"""

import itertools
import typing

import std_msgs.msg
from rclpy.executors import SingleThreadedExecutor

import bdai_ros2_wrappers.process as ros_process
from bdai_ros2_wrappers.executors import foreground
from bdai_ros2_wrappers.node import Node


class InterruptibleTalkerNode(Node):
"""A node that logs the same phrase periodically."""

def __init__(self, phrase: str, **kwargs: typing.Any) -> None:
super().__init__("interruptible_talker", **kwargs)
self.phrase = phrase
self.counter = itertools.count(start=1)
rate = self.declare_parameter("rate", 1.0).value
self.pub = self.create_publisher(std_msgs.msg.String, "chat", 1)
self.timer = self.create_timer(1 / rate, self.callback)

def callback(self) -> None:
message = f"{self.phrase} (#{next(self.counter)})"
self.pub.publish(std_msgs.msg.String(data=message))
self.get_logger().info(message)

def destroy_node(self) -> None:
message = "Goodbye!"
self.pub.publish(std_msgs.msg.String(data=message))
self.get_logger().info(message)
super().destroy_node()


@ros_process.main(prebaked=False, interruptible=True)
def main(args: typing.Sequence[str]) -> None:
"""Example entrypoint, taking command-line arguments.
It is configured as ROS 2 aware process. That is, no process-wide node,
no background autoscaling multi-threaded executor, no log forwarding to the ROS 2
logging system, and no implicit node namespacing. In other words, a run-off-the-mill
executable that uses ROS 2. Well, almost.
When executed, a single `InterruptibleTalkerNode` is instantiated and spinned in the
foreground. This will continue indefinitely until the executable is interrupted
(e.g. by a SIGINT on Ctrl + C). Another Ctrl + C will confirm the interruption.
Any other key will resume execution.
"""
with foreground(SingleThreadedExecutor()) as main.executor: # noqa: SIM117
with main.managed(InterruptibleTalkerNode, " ".join(args[1:])) as main.node:
while True:
try:
main.executor.spin()
except KeyboardInterrupt:
input(
"Press Ctrl + C again to confirm, or press any other key to continue",
)


if __name__ == "__main__":
main()
7 changes: 6 additions & 1 deletion bdai_ros2_wrappers/examples/talker_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import itertools
import typing

import std_msgs.msg

import bdai_ros2_wrappers.process as ros_process
from bdai_ros2_wrappers.node import Node

Expand All @@ -30,10 +32,13 @@ def __init__(self, phrase: str, **kwargs: typing.Any) -> None:
self.phrase = phrase
self.counter = itertools.count(start=1)
rate = self.declare_parameter("rate", 1.0).value
self.pub = self.create_publisher(std_msgs.msg.String, "chat", 1)
self.timer = self.create_timer(1 / rate, self.callback)

def callback(self) -> None:
self.get_logger().info(f"{self.phrase} (#{next(self.counter)})")
message = f"{self.phrase} (#{next(self.counter)})"
self.pub.publish(std_msgs.msg.String(data=message))
self.get_logger().info(message)


@ros_process.main(prebaked=False)
Expand Down

0 comments on commit 02ca14e

Please sign in to comment.