diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 2c8bf723a..3df54bdb7 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -26,6 +26,7 @@ from ros2bag.api import print_warn from ros2bag.verb import VerbExtension from ros2cli.node import NODE_NAME_PREFIX +from rosbag2_py import MessageOrder from rosbag2_py import Player from rosbag2_py import PlayOptions from rosbag2_py import ServiceRequestsSource @@ -166,6 +167,15 @@ def add_arguments(self, parser, cli_name): # noqa: D102 help='Determine the source of the service requests to be replayed. This option only ' 'makes sense if the "--publish-service-requests" option is set. By default,' ' the service requests replaying from recorded service introspection message.') + parser.add_argument( + '--message-order', default='recv', + choices=['recv', 'send'], + help='The reference to use for bag message chronological ordering. Choices: reception ' + 'timestamp, publication timestamp. Default: reception timestamp. ' + 'If messages are significantly disordered (within a single bag or across ' + 'multiple bags), replayed messages may not be correctly ordered. A possible ' + 'solution could be to increase the read_ahead_queue_size value to buffer (and ' + 'order) more messages.') parser.add_argument( '--log-level', type=str, default='info', choices=['debug', 'info', 'warn', 'error', 'fatal'], @@ -275,6 +285,11 @@ def main(self, *, args): # noqa: D102 play_options.service_requests_source = ServiceRequestsSource.SERVICE_INTROSPECTION else: play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION + # argparse makes sure that we get a valid arg value + play_options.message_order = { + 'recv': MessageOrder.RECV_TIMESTAMP, + 'send': MessageOrder.SEND_TIMESTAMP, + }.get(args.message_order) player = Player(args.log_level) try: diff --git a/rosbag2_py/rosbag2_py/__init__.py b/rosbag2_py/rosbag2_py/__init__.py index f68250997..46354dbc7 100644 --- a/rosbag2_py/rosbag2_py/__init__.py +++ b/rosbag2_py/rosbag2_py/__init__.py @@ -59,6 +59,7 @@ Info, ) from rosbag2_py._transport import ( + MessageOrder, Player, PlayOptions, ServiceRequestsSource, @@ -98,6 +99,7 @@ 'TopicInformation', 'BagMetadata', 'MessageDefinition', + 'MessageOrder', 'MetadataIo', 'Info', 'Player', diff --git a/rosbag2_py/rosbag2_py/__init__.pyi b/rosbag2_py/rosbag2_py/__init__.pyi index c38eaa338..7fdbfe63d 100644 --- a/rosbag2_py/rosbag2_py/__init__.pyi +++ b/rosbag2_py/rosbag2_py/__init__.pyi @@ -4,7 +4,7 @@ from rosbag2_py._message_definitions import LocalMessageDefinitionSource as Loca from rosbag2_py._reader import SequentialCompressionReader as SequentialCompressionReader, SequentialReader as SequentialReader, get_registered_readers as get_registered_readers from rosbag2_py._reindexer import Reindexer as Reindexer from rosbag2_py._storage import BagMetadata as BagMetadata, ConverterOptions as ConverterOptions, FileInformation as FileInformation, MessageDefinition as MessageDefinition, MetadataIo as MetadataIo, ReadOrder as ReadOrder, ReadOrderSortBy as ReadOrderSortBy, StorageFilter as StorageFilter, StorageOptions as StorageOptions, TopicInformation as TopicInformation, TopicMetadata as TopicMetadata, convert_rclcpp_qos_to_rclpy_qos as convert_rclcpp_qos_to_rclpy_qos, get_default_storage_id as get_default_storage_id, to_rclcpp_qos_vector as to_rclcpp_qos_vector -from rosbag2_py._transport import PlayOptions as PlayOptions, Player as Player, RecordOptions as RecordOptions, Recorder as Recorder, ServiceRequestsSource as ServiceRequestsSource, bag_rewrite as bag_rewrite +from rosbag2_py._transport import MessageOrder as MessageOrder, PlayOptions as PlayOptions, Player as Player, RecordOptions as RecordOptions, Recorder as Recorder, ServiceRequestsSource as ServiceRequestsSource, bag_rewrite as bag_rewrite from rosbag2_py._writer import SequentialCompressionWriter as SequentialCompressionWriter, SequentialWriter as SequentialWriter, get_registered_compressors as get_registered_compressors, get_registered_serializers as get_registered_serializers, get_registered_writers as get_registered_writers -__all__ = ['bag_rewrite', 'convert_rclcpp_qos_to_rclpy_qos', 'CompressionMode', 'CompressionOptions', 'compression_mode_from_string', 'compression_mode_to_string', 'ConverterOptions', 'FileInformation', 'get_default_storage_id', 'get_registered_readers', 'get_registered_writers', 'get_registered_compressors', 'get_registered_serializers', 'to_rclcpp_qos_vector', 'ReadOrder', 'ReadOrderSortBy', 'Reindexer', 'SequentialCompressionReader', 'SequentialCompressionWriter', 'SequentialReader', 'SequentialWriter', 'StorageFilter', 'StorageOptions', 'TopicMetadata', 'TopicInformation', 'BagMetadata', 'MessageDefinition', 'MetadataIo', 'Info', 'Player', 'PlayOptions', 'ServiceRequestsSource', 'Recorder', 'RecordOptions', 'LocalMessageDefinitionSource'] +__all__ = ['bag_rewrite', 'convert_rclcpp_qos_to_rclpy_qos', 'CompressionMode', 'CompressionOptions', 'compression_mode_from_string', 'compression_mode_to_string', 'ConverterOptions', 'FileInformation', 'get_default_storage_id', 'get_registered_readers', 'get_registered_writers', 'get_registered_compressors', 'get_registered_serializers', 'to_rclcpp_qos_vector', 'ReadOrder', 'ReadOrderSortBy', 'Reindexer', 'SequentialCompressionReader', 'SequentialCompressionWriter', 'SequentialReader', 'SequentialWriter', 'StorageFilter', 'StorageOptions', 'TopicMetadata', 'TopicInformation', 'BagMetadata', 'MessageDefinition', 'MessageOrder', 'MetadataIo', 'Info', 'Player', 'PlayOptions', 'ServiceRequestsSource', 'Recorder', 'RecordOptions', 'LocalMessageDefinitionSource'] diff --git a/rosbag2_py/rosbag2_py/_transport.pyi b/rosbag2_py/rosbag2_py/_transport.pyi index c2512f483..7864292e4 100644 --- a/rosbag2_py/rosbag2_py/_transport.pyi +++ b/rosbag2_py/rosbag2_py/_transport.pyi @@ -98,4 +98,20 @@ class ServiceRequestsSource: @property def value(self) -> int: ... +class MessageOrder: + __members__: ClassVar[dict] = ... # read-only + RECV_TIMESTAMP: ClassVar[MessageOrder] = ... + SEND_TIMESTAMP: ClassVar[MessageOrder] = ... + __entries: ClassVar[dict] = ... + def __init__(self, value: int) -> None: ... + def __eq__(self, other: object) -> bool: ... + def __hash__(self) -> int: ... + def __index__(self) -> int: ... + def __int__(self) -> int: ... + def __ne__(self, other: object) -> bool: ... + @property + def name(self) -> str: ... + @property + def value(self) -> int: ... + def bag_rewrite(arg0: List[rosbag2_py._storage.StorageOptions], arg1: str) -> None: ... diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index afe813b2f..09023f872 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -560,6 +560,7 @@ PYBIND11_MODULE(_transport, m) { .def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message) .def_readwrite("publish_service_requests", &PlayOptions::publish_service_requests) .def_readwrite("service_requests_source", &PlayOptions::service_requests_source) + .def_readwrite("message_order", &PlayOptions::message_order) ; py::enum_(m, "ServiceRequestsSource") @@ -567,6 +568,11 @@ PYBIND11_MODULE(_transport, m) { .value("CLIENT_INTROSPECTION", rosbag2_transport::ServiceRequestsSource::CLIENT_INTROSPECTION) ; + py::enum_(m, "MessageOrder") + .value("RECV_TIMESTAMP", rosbag2_transport::MessageOrder::RECV_TIMESTAMP) + .value("SEND_TIMESTAMP", rosbag2_transport::MessageOrder::SEND_TIMESTAMP) + ; + py::class_(m, "RecordOptions") .def(py::init<>()) .def_readwrite("all_topics", &RecordOptions::all_topics) diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index e5029c923..7489c9825 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -34,6 +34,14 @@ enum class ServiceRequestsSource : int8_t CLIENT_INTROSPECTION = 1 }; +enum class MessageOrder : std::uint8_t +{ + // Order chronologically by message reception timestamp + RECV_TIMESTAMP = 0, + // Order chronologically by message publication timestamp + SEND_TIMESTAMP = 1 +}; + struct PlayOptions { public: @@ -123,6 +131,12 @@ struct PlayOptions // The source of the service request ServiceRequestsSource service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION; + + // The reference to use for bag message chronological ordering. + // If messages are significantly disordered (within a single bag or across multiple bags), + // replayed messages may not be correctly ordered. A possible solution could be to increase the + // read_ahead_queue_size value to buffer (and order) more messages. + MessageOrder message_order = MessageOrder::RECV_TIMESTAMP; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index d3b322ee5..4b60f0f0e 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -320,7 +320,7 @@ class PlayerImpl rosbag2_transport::PlayOptions play_options_; rcutils_time_point_value_t play_until_timestamp_ = -1; using BagMessageComparator = std::function< - int( + bool( const rosbag2_storage::SerializedBagMessageSharedPtr &, const rosbag2_storage::SerializedBagMessageSharedPtr &)>; LockedPriorityQueue< @@ -372,6 +372,8 @@ class PlayerImpl std::shared_ptr player_service_client_manager_; + static BagMessageComparator get_bag_message_comparator(const MessageOrder & order); + /// Comparator for SerializedBagMessageSharedPtr to order chronologically by recv_timestamp. struct { @@ -381,9 +383,39 @@ class PlayerImpl { return l->recv_timestamp > r->recv_timestamp; } - } bag_message_chronological_recv_timestamp_comparator; + } static bag_message_chronological_recv_timestamp_comparator; + + /// Comparator for SerializedBagMessageSharedPtr to order chronologically by send_timestamp. + struct + { + bool operator()( + const rosbag2_storage::SerializedBagMessageSharedPtr & l, + const rosbag2_storage::SerializedBagMessageSharedPtr & r) const + { + return l->send_timestamp > r->send_timestamp; + } + } static bag_message_chronological_send_timestamp_comparator; }; +decltype(PlayerImpl::bag_message_chronological_recv_timestamp_comparator) +PlayerImpl::bag_message_chronological_recv_timestamp_comparator; +decltype(PlayerImpl::bag_message_chronological_send_timestamp_comparator) +PlayerImpl::bag_message_chronological_send_timestamp_comparator; + +PlayerImpl::BagMessageComparator PlayerImpl::get_bag_message_comparator(const MessageOrder & order) +{ + switch (order) { + case MessageOrder::RECV_TIMESTAMP: + return bag_message_chronological_recv_timestamp_comparator; + case MessageOrder::SEND_TIMESTAMP: + return bag_message_chronological_send_timestamp_comparator; + default: + throw std::runtime_error( + "unknown MessageOrder: " + + std::to_string(static_cast>(order))); + } +} + PlayerImpl::PlayerImpl( Player * owner, std::vector && readers_with_options, @@ -392,7 +424,7 @@ PlayerImpl::PlayerImpl( : readers_with_options_(std::move(readers_with_options)), owner_(owner), play_options_(play_options), - message_queue_(bag_message_chronological_recv_timestamp_comparator), + message_queue_(get_bag_message_comparator(play_options_.message_order)), keyboard_handler_(std::move(keyboard_handler)), player_service_client_manager_(std::make_shared()) { diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp index 594c1a92a..0de0100f7 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp @@ -57,10 +57,22 @@ class Rosbag2TransportTestFixture : public Test const std::string & topic, int64_t milliseconds, std::shared_ptr message) + { + return serialize_test_message(topic, milliseconds, 0, message); + } + + template + std::shared_ptr + serialize_test_message( + const std::string & topic, + int64_t milliseconds_recv, + int64_t milliseconds_send, + std::shared_ptr message) { auto bag_msg = std::make_shared(); bag_msg->serialized_data = memory_management_.serialize_message(message); - bag_msg->recv_timestamp = milliseconds * 1000000; + bag_msg->recv_timestamp = milliseconds_recv * 1000000; + bag_msg->send_timestamp = milliseconds_send * 1000000; bag_msg->topic_name = topic; return bag_msg; diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 64952fe39..f976310af 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -109,6 +109,10 @@ void spin_thread_and_wait_for_sent_service_requests_to_finish( } } // namespace +class RosBag2PlayTestFixtureMessageOrder + : public RosBag2PlayTestFixture, public WithParamInterface +{}; + TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) { auto primitive_message1 = get_messages_basic_types()[0]; @@ -174,7 +178,8 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) ElementsAre(40.0f, 2.0f, 0.0f))))); } -TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_three_bags) +TEST_P( + RosBag2PlayTestFixtureMessageOrder, recorded_messages_are_played_for_all_topics_from_three_bags) { auto msg = get_messages_basic_types()[0]; msg->int32_value = 42; @@ -184,30 +189,30 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_ {2u, "topic2", "test_msgs/msg/BasicTypes", "", {}, ""}, }; - // Make sure each reader's/bag's messages are ordered by time - // However, do interlace messages across bags + // Make sure each reader's/bag's messages are ordered by recv_timestamp + // However, do interlace messages based on send_timestamp within a bag and across bags std::vector>> messages_list{}; messages_list.emplace_back(std::vector>{ - serialize_test_message("topic1", 1, msg), - serialize_test_message("topic2", 5, msg), - serialize_test_message("topic1", 8, msg), - serialize_test_message("topic2", 10, msg), - serialize_test_message("topic1", 13, msg), - serialize_test_message("topic2", 14, msg)}); + serialize_test_message("topic1", 1, 1, msg), + serialize_test_message("topic2", 5, 2, msg), + serialize_test_message("topic1", 8, 4, msg), + serialize_test_message("topic2", 10, 8, msg), + serialize_test_message("topic1", 13, 7, msg), + serialize_test_message("topic2", 14, 15, msg)}); messages_list.emplace_back(std::vector>{ - serialize_test_message("topic1", 2, msg), - serialize_test_message("topic2", 3, msg), - serialize_test_message("topic1", 6, msg), - serialize_test_message("topic2", 10, msg), - serialize_test_message("topic1", 12, msg), - serialize_test_message("topic2", 16, msg)}); + serialize_test_message("topic1", 2, 1, msg), + serialize_test_message("topic2", 3, 2, msg), + serialize_test_message("topic1", 6, 5, msg), + serialize_test_message("topic2", 10, 8, msg), + serialize_test_message("topic1", 12, 7, msg), + serialize_test_message("topic2", 16, 14, msg)}); messages_list.emplace_back(std::vector>{ - serialize_test_message("topic1", 1, msg), - serialize_test_message("topic2", 4, msg), - serialize_test_message("topic1", 7, msg), - serialize_test_message("topic2", 9, msg), - serialize_test_message("topic1", 11, msg), - serialize_test_message("topic2", 15, msg)}); + serialize_test_message("topic1", 1, 1, msg), + serialize_test_message("topic2", 4, 3, msg), + serialize_test_message("topic1", 7, 2, msg), + serialize_test_message("topic2", 9, 9, msg), + serialize_test_message("topic1", 11, 8, msg), + serialize_test_message("topic2", 15, 7, msg)}); std::vector bags{}; std::size_t total_messages = 0u; for (std::size_t i = 0u; i < messages_list.size(); i++) { @@ -219,13 +224,27 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_ } ASSERT_GT(total_messages, 0u); + const rosbag2_transport::MessageOrder message_order = GetParam(); + play_options_.message_order = message_order; auto player = std::make_shared(std::move(bags), play_options_); std::size_t num_played_messages = 0u; rcutils_time_point_value_t last_timetamp = 0; + const auto get_timestamp = + [message_order](std::shared_ptr msg) { + switch (message_order) { + case rosbag2_transport::MessageOrder::RECV_TIMESTAMP: + return msg->recv_timestamp; + case rosbag2_transport::MessageOrder::SEND_TIMESTAMP: + return msg->send_timestamp; + default: + throw std::runtime_error("unknown rosbag2_transport::MessageOrder value"); + } + }; const auto callback = [&](std::shared_ptr msg) { // Make sure messages are played in order - EXPECT_LE(last_timetamp, msg->recv_timestamp); - last_timetamp = msg->recv_timestamp; + const auto timestamp = get_timestamp(msg); + EXPECT_LE(last_timetamp, timestamp); + last_timetamp = timestamp; num_played_messages++; }; player->add_on_play_message_pre_callback(callback); @@ -234,6 +253,25 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_ EXPECT_EQ(total_messages, num_played_messages); } +INSTANTIATE_TEST_SUITE_P( + ParametrizedPlayTests, + RosBag2PlayTestFixtureMessageOrder, + Values( + rosbag2_transport::MessageOrder::RECV_TIMESTAMP, + rosbag2_transport::MessageOrder::SEND_TIMESTAMP + ), + [](const TestParamInfo & info) { + switch (info.param) { + case MessageOrder::RECV_TIMESTAMP: + return "recv_timestamp"; + case MessageOrder::SEND_TIMESTAMP: + return "send_timestamp"; + default: + throw std::runtime_error("unknown value"); + } + } +); + TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_services) { const std::string service_name1 = "/test_service1";