Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make rosbag2 nodes locally unique by suffixing with PID #1432

Open
wants to merge 1 commit into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ros2bag/ros2bag/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ def add_standard_reader_args(parser: ArgumentParser) -> None:
'By default attempts to detect automatically - use this argument to override.')


def add_standard_node_args(parser: ArgumentParser, prefix: str) -> None:
"""Add arguments for for verbs that create a Node."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Add arguments for for verbs that create a Node."""
"""Add arguments for verbs that create a Node."""

Nitpick, need to remove extra "for".

node_name = f'{prefix}_{os.getpid()}'
parser.add_argument(
'--node-name', type=str, default=node_name,
help=f'Specify the recorder node name. Default is {prefix}_PID.')


def _parse_cli_storage_plugin():
plugin_choices = set(rosbag2_py.get_registered_writers())
default_storage = rosbag2_py.get_default_storage_id()
Expand Down
17 changes: 10 additions & 7 deletions ros2bag/ros2bag/verb/burst.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
from argparse import FileType

from rclpy.qos import InvalidQoSProfileException
from ros2bag.api import add_standard_reader_args
from ros2bag.api import check_not_negative_int
from ros2bag.api import check_positive_float
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import print_error
from ros2bag.api import (
add_standard_node_args,
add_standard_reader_args,
check_not_negative_int,
check_positive_float,
convert_yaml_to_qos_profile,
print_error,
)
from ros2bag.verb import VerbExtension
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import Player
Expand All @@ -33,7 +36,7 @@ class BurstVerb(VerbExtension):

def add_arguments(self, parser, cli_name): # noqa: D102
add_standard_reader_args(parser)

add_standard_node_args(parser, 'rosbag2_player')
parser.add_argument(
'--read-ahead-queue-size', type=int, default=1000,
help='size of message queue rosbag tries to hold in memory to help deterministic '
Expand Down Expand Up @@ -100,5 +103,5 @@ def main(self, *, args): # noqa: D102
play_options.start_offset = args.start_offset
play_options.wait_acked_timeout = -1

player = Player()
player = Player(args.node_name)
player.burst(storage_options, play_options, args.num_messages)
16 changes: 10 additions & 6 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
from argparse import FileType

from rclpy.qos import InvalidQoSProfileException
from ros2bag.api import add_standard_reader_args
from ros2bag.api import check_not_negative_int
from ros2bag.api import check_positive_float
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import print_error
from ros2bag.api import (
add_standard_node_args,
add_standard_reader_args,
check_not_negative_int,
check_positive_float,
convert_yaml_to_qos_profile,
print_error,
)
from ros2bag.verb import VerbExtension
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import Player
Expand All @@ -40,6 +43,7 @@ class PlayVerb(VerbExtension):

def add_arguments(self, parser, cli_name): # noqa: D102
add_standard_reader_args(parser)
add_standard_node_args(parser, 'rosbag2_player')
parser.add_argument(
'--read-ahead-queue-size', type=int, default=1000,
help='size of message queue rosbag tries to hold in memory to help deterministic '
Expand Down Expand Up @@ -202,7 +206,7 @@ def main(self, *, args): # noqa: D102
play_options.wait_acked_timeout = args.wait_for_all_acked
play_options.disable_loan_message = args.disable_loan_message

player = Player()
player = Player(args.node_name)
try:
player.play(storage_options, play_options)
except KeyboardInterrupt:
Expand Down
16 changes: 9 additions & 7 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
import os

from rclpy.qos import InvalidQoSProfileException
from ros2bag.api import add_writer_storage_plugin_extensions
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import print_error
from ros2bag.api import SplitLineFormatter
from ros2bag.api import (
add_standard_node_args,
add_writer_storage_plugin_extensions,
convert_yaml_to_qos_profile,
print_error,
SplitLineFormatter,
)
from ros2bag.verb import VerbExtension
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import get_default_storage_id
Expand Down Expand Up @@ -101,6 +104,7 @@ def add_arguments(self, parser, cli_name): # noqa: D102
help='Path to a yaml file defining overrides of the QoS profile for specific topics.')

# Core config
add_standard_node_args(parser, prefix='rosbag2_recorder')
parser.add_argument(
'-f', '--serialization-format', default='', choices=serialization_choices,
help='The rmw serialization format in which the messages are saved, defaults to the '
Expand Down Expand Up @@ -131,9 +135,7 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'--use-sim-time', action='store_true', default=False,
help='Use simulation time for message timestamps by subscribing to the /clock topic. '
'Until first /clock message is received, no messages will be written to bag.')
parser.add_argument(
'--node-name', type=str, default='rosbag2_recorder',
help='Specify the recorder node name. Default is %(default)s.')

parser.add_argument(
'--custom-data', type=str, metavar='KEY=VALUE', nargs='*',
help='Store the custom data in metadata.yaml '
Expand Down
12 changes: 8 additions & 4 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ namespace rosbag2_py
class Player
{
public:
Player()
explicit Player(const std::string & node_name)
{
node_name_ = node_name;
rclcpp::init(0, nullptr);
}

Expand All @@ -146,7 +147,7 @@ class Player
{
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
auto player = std::make_shared<rosbag2_transport::Player>(
std::move(reader), storage_options, play_options);
std::move(reader), storage_options, play_options, node_name_);

rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(player);
Expand All @@ -167,7 +168,7 @@ class Player
{
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
auto player = std::make_shared<rosbag2_transport::Player>(
std::move(reader), storage_options, play_options);
std::move(reader), storage_options, play_options, node_name_);

rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(player);
Expand All @@ -185,6 +186,9 @@ class Player
spin_thread.join();
play_thread.join();
}

private:
std::string node_name_;
};

class Recorder
Expand Down Expand Up @@ -377,7 +381,7 @@ PYBIND11_MODULE(_transport, m) {
;

py::class_<rosbag2_py::Player>(m, "Player")
.def(py::init())
.def(py::init<std::string>(), pybind11::arg("node_name"))
.def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options"))
.def("burst", &rosbag2_py::Player::burst)
;
Expand Down