Skip to content

Commit

Permalink
Add support for replaying based on publication timestamp
Browse files Browse the repository at this point in the history
Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
christophebedard committed Dec 3, 2024
1 parent c8feaea commit 1183b7a
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 29 deletions.
15 changes: 15 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from ros2bag.api import print_warn
from ros2bag.verb import VerbExtension
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import MessageOrder
from rosbag2_py import Player
from rosbag2_py import PlayOptions
from rosbag2_py import ServiceRequestsSource
Expand Down Expand Up @@ -166,6 +167,15 @@ def add_arguments(self, parser, cli_name): # noqa: D102
help='Determine the source of the service requests to be replayed. This option only '
'makes sense if the "--publish-service-requests" option is set. By default,'
' the service requests replaying from recorded service introspection message.')
parser.add_argument(
'--message-order', default='recv',
choices=['recv', 'send'],
help='The reference to use for bag message chronological ordering. Choices: reception '
'timestamp, publication timestamp. Default: reception timestamp. '
'If messages are significantly disordered (within a single bag or across '
'multiple bags), replayed messages may not be correctly ordered. A possible '
'solution could be to increase the read_ahead_queue_size value to buffer (and '
'order) more messages.')
parser.add_argument(
'--log-level', type=str, default='info',
choices=['debug', 'info', 'warn', 'error', 'fatal'],
Expand Down Expand Up @@ -275,6 +285,11 @@ def main(self, *, args): # noqa: D102
play_options.service_requests_source = ServiceRequestsSource.SERVICE_INTROSPECTION
else:
play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION
# argparse makes sure that we get a valid arg value
play_options.message_order = {
'recv': MessageOrder.RECV_TIMESTAMP,
'send': MessageOrder.SEND_TIMESTAMP,
}.get(args.message_order)

player = Player(args.log_level)
try:
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_py/rosbag2_py/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
Info,
)
from rosbag2_py._transport import (
MessageOrder,
Player,
PlayOptions,
ServiceRequestsSource,
Expand Down Expand Up @@ -98,6 +99,7 @@
'TopicInformation',
'BagMetadata',
'MessageDefinition',
'MessageOrder',
'MetadataIo',
'Info',
'Player',
Expand Down
4 changes: 2 additions & 2 deletions rosbag2_py/rosbag2_py/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ from rosbag2_py._message_definitions import LocalMessageDefinitionSource as Loca
from rosbag2_py._reader import SequentialCompressionReader as SequentialCompressionReader, SequentialReader as SequentialReader, get_registered_readers as get_registered_readers
from rosbag2_py._reindexer import Reindexer as Reindexer
from rosbag2_py._storage import BagMetadata as BagMetadata, ConverterOptions as ConverterOptions, FileInformation as FileInformation, MessageDefinition as MessageDefinition, MetadataIo as MetadataIo, ReadOrder as ReadOrder, ReadOrderSortBy as ReadOrderSortBy, StorageFilter as StorageFilter, StorageOptions as StorageOptions, TopicInformation as TopicInformation, TopicMetadata as TopicMetadata, convert_rclcpp_qos_to_rclpy_qos as convert_rclcpp_qos_to_rclpy_qos, get_default_storage_id as get_default_storage_id, to_rclcpp_qos_vector as to_rclcpp_qos_vector
from rosbag2_py._transport import PlayOptions as PlayOptions, Player as Player, RecordOptions as RecordOptions, Recorder as Recorder, ServiceRequestsSource as ServiceRequestsSource, bag_rewrite as bag_rewrite
from rosbag2_py._transport import MessageOrder as MessageOrder, PlayOptions as PlayOptions, Player as Player, RecordOptions as RecordOptions, Recorder as Recorder, ServiceRequestsSource as ServiceRequestsSource, bag_rewrite as bag_rewrite
from rosbag2_py._writer import SequentialCompressionWriter as SequentialCompressionWriter, SequentialWriter as SequentialWriter, get_registered_compressors as get_registered_compressors, get_registered_serializers as get_registered_serializers, get_registered_writers as get_registered_writers

__all__ = ['bag_rewrite', 'convert_rclcpp_qos_to_rclpy_qos', 'CompressionMode', 'CompressionOptions', 'compression_mode_from_string', 'compression_mode_to_string', 'ConverterOptions', 'FileInformation', 'get_default_storage_id', 'get_registered_readers', 'get_registered_writers', 'get_registered_compressors', 'get_registered_serializers', 'to_rclcpp_qos_vector', 'ReadOrder', 'ReadOrderSortBy', 'Reindexer', 'SequentialCompressionReader', 'SequentialCompressionWriter', 'SequentialReader', 'SequentialWriter', 'StorageFilter', 'StorageOptions', 'TopicMetadata', 'TopicInformation', 'BagMetadata', 'MessageDefinition', 'MetadataIo', 'Info', 'Player', 'PlayOptions', 'ServiceRequestsSource', 'Recorder', 'RecordOptions', 'LocalMessageDefinitionSource']
__all__ = ['bag_rewrite', 'convert_rclcpp_qos_to_rclpy_qos', 'CompressionMode', 'CompressionOptions', 'compression_mode_from_string', 'compression_mode_to_string', 'ConverterOptions', 'FileInformation', 'get_default_storage_id', 'get_registered_readers', 'get_registered_writers', 'get_registered_compressors', 'get_registered_serializers', 'to_rclcpp_qos_vector', 'ReadOrder', 'ReadOrderSortBy', 'Reindexer', 'SequentialCompressionReader', 'SequentialCompressionWriter', 'SequentialReader', 'SequentialWriter', 'StorageFilter', 'StorageOptions', 'TopicMetadata', 'TopicInformation', 'BagMetadata', 'MessageDefinition', 'MessageOrder', 'MetadataIo', 'Info', 'Player', 'PlayOptions', 'ServiceRequestsSource', 'Recorder', 'RecordOptions', 'LocalMessageDefinitionSource']
16 changes: 16 additions & 0 deletions rosbag2_py/rosbag2_py/_transport.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,20 @@ class ServiceRequestsSource:
@property
def value(self) -> int: ...

class MessageOrder:
__members__: ClassVar[dict] = ... # read-only
RECV_TIMESTAMP: ClassVar[MessageOrder] = ...
SEND_TIMESTAMP: ClassVar[MessageOrder] = ...
__entries: ClassVar[dict] = ...
def __init__(self, value: int) -> None: ...
def __eq__(self, other: object) -> bool: ...
def __hash__(self) -> int: ...
def __index__(self) -> int: ...
def __int__(self) -> int: ...
def __ne__(self, other: object) -> bool: ...
@property
def name(self) -> str: ...
@property
def value(self) -> int: ...

def bag_rewrite(arg0: List[rosbag2_py._storage.StorageOptions], arg1: str) -> None: ...
6 changes: 6 additions & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,13 +560,19 @@ PYBIND11_MODULE(_transport, m) {
.def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message)
.def_readwrite("publish_service_requests", &PlayOptions::publish_service_requests)
.def_readwrite("service_requests_source", &PlayOptions::service_requests_source)
.def_readwrite("message_order", &PlayOptions::message_order)
;

py::enum_<rosbag2_transport::ServiceRequestsSource>(m, "ServiceRequestsSource")
.value("SERVICE_INTROSPECTION", rosbag2_transport::ServiceRequestsSource::SERVICE_INTROSPECTION)
.value("CLIENT_INTROSPECTION", rosbag2_transport::ServiceRequestsSource::CLIENT_INTROSPECTION)
;

py::enum_<rosbag2_transport::MessageOrder>(m, "MessageOrder")
.value("RECV_TIMESTAMP", rosbag2_transport::MessageOrder::RECV_TIMESTAMP)
.value("SEND_TIMESTAMP", rosbag2_transport::MessageOrder::SEND_TIMESTAMP)
;

py::class_<RecordOptions>(m, "RecordOptions")
.def(py::init<>())
.def_readwrite("all_topics", &RecordOptions::all_topics)
Expand Down
14 changes: 14 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/play_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ enum class ServiceRequestsSource : int8_t
CLIENT_INTROSPECTION = 1
};

enum class MessageOrder : std::uint8_t
{
// Order chronologically by message reception timestamp
RECV_TIMESTAMP = 0,
// Order chronologically by message publication timestamp
SEND_TIMESTAMP = 1
};

struct PlayOptions
{
public:
Expand Down Expand Up @@ -123,6 +131,12 @@ struct PlayOptions

// The source of the service request
ServiceRequestsSource service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION;

// The reference to use for bag message chronological ordering.
// If messages are significantly disordered (within a single bag or across multiple bags),
// replayed messages may not be correctly ordered. A possible solution could be to increase the
// read_ahead_queue_size value to buffer (and order) more messages.
MessageOrder message_order = MessageOrder::RECV_TIMESTAMP;
};

} // namespace rosbag2_transport
Expand Down
38 changes: 35 additions & 3 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ class PlayerImpl
rosbag2_transport::PlayOptions play_options_;
rcutils_time_point_value_t play_until_timestamp_ = -1;
using BagMessageComparator = std::function<
int(
bool(
const rosbag2_storage::SerializedBagMessageSharedPtr &,
const rosbag2_storage::SerializedBagMessageSharedPtr &)>;
LockedPriorityQueue<
Expand Down Expand Up @@ -372,6 +372,8 @@ class PlayerImpl

std::shared_ptr<PlayerServiceClientManager> player_service_client_manager_;

static BagMessageComparator get_bag_message_comparator(const MessageOrder & order);

/// Comparator for SerializedBagMessageSharedPtr to order chronologically by recv_timestamp.
struct
{
Expand All @@ -381,9 +383,39 @@ class PlayerImpl
{
return l->recv_timestamp > r->recv_timestamp;
}
} bag_message_chronological_recv_timestamp_comparator;
} static bag_message_chronological_recv_timestamp_comparator;

/// Comparator for SerializedBagMessageSharedPtr to order chronologically by send_timestamp.
struct
{
bool operator()(
const rosbag2_storage::SerializedBagMessageSharedPtr & l,
const rosbag2_storage::SerializedBagMessageSharedPtr & r) const
{
return l->send_timestamp > r->send_timestamp;
}
} static bag_message_chronological_send_timestamp_comparator;
};

decltype(PlayerImpl::bag_message_chronological_recv_timestamp_comparator)
PlayerImpl::bag_message_chronological_recv_timestamp_comparator;
decltype(PlayerImpl::bag_message_chronological_send_timestamp_comparator)
PlayerImpl::bag_message_chronological_send_timestamp_comparator;

PlayerImpl::BagMessageComparator PlayerImpl::get_bag_message_comparator(const MessageOrder & order)
{
switch (order) {
case MessageOrder::RECV_TIMESTAMP:
return bag_message_chronological_recv_timestamp_comparator;
case MessageOrder::SEND_TIMESTAMP:
return bag_message_chronological_send_timestamp_comparator;
default:
throw std::runtime_error(
"unknown MessageOrder: " +
std::to_string(static_cast<std::underlying_type_t<MessageOrder>>(order)));
}
}

PlayerImpl::PlayerImpl(
Player * owner,
std::vector<reader_storage_options_pair_t> && readers_with_options,
Expand All @@ -392,7 +424,7 @@ PlayerImpl::PlayerImpl(
: readers_with_options_(std::move(readers_with_options)),
owner_(owner),
play_options_(play_options),
message_queue_(bag_message_chronological_recv_timestamp_comparator),
message_queue_(get_bag_message_comparator(play_options_.message_order)),
keyboard_handler_(std::move(keyboard_handler)),
player_service_client_manager_(std::make_shared<PlayerServiceClientManager>())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,22 @@ class Rosbag2TransportTestFixture : public Test
const std::string & topic,
int64_t milliseconds,
std::shared_ptr<MessageT> message)
{
return serialize_test_message(topic, milliseconds, 0, message);
}

template<typename MessageT>
std::shared_ptr<rosbag2_storage::SerializedBagMessage>
serialize_test_message(
const std::string & topic,
int64_t milliseconds_recv,
int64_t milliseconds_send,
std::shared_ptr<MessageT> message)
{
auto bag_msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_msg->serialized_data = memory_management_.serialize_message(message);
bag_msg->recv_timestamp = milliseconds * 1000000;
bag_msg->recv_timestamp = milliseconds_recv * 1000000;
bag_msg->send_timestamp = milliseconds_send * 1000000;
bag_msg->topic_name = topic;

return bag_msg;
Expand Down
84 changes: 61 additions & 23 deletions rosbag2_transport/test/rosbag2_transport/test_play.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ void spin_thread_and_wait_for_sent_service_requests_to_finish(
}
} // namespace

class RosBag2PlayTestFixtureMessageOrder
: public RosBag2PlayTestFixture, public WithParamInterface<rosbag2_transport::MessageOrder>
{};

TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics)
{
auto primitive_message1 = get_messages_basic_types()[0];
Expand Down Expand Up @@ -174,7 +178,8 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics)
ElementsAre(40.0f, 2.0f, 0.0f)))));
}

TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_three_bags)
TEST_P(
RosBag2PlayTestFixtureMessageOrder, recorded_messages_are_played_for_all_topics_from_three_bags)
{
auto msg = get_messages_basic_types()[0];
msg->int32_value = 42;
Expand All @@ -184,30 +189,30 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_
{2u, "topic2", "test_msgs/msg/BasicTypes", "", {}, ""},
};

// Make sure each reader's/bag's messages are ordered by time
// However, do interlace messages across bags
// Make sure each reader's/bag's messages are ordered by recv_timestamp
// However, do interlace messages based on send_timestamp within a bag and across bags
std::vector<std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>> messages_list{};
messages_list.emplace_back(std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>{
serialize_test_message("topic1", 1, msg),
serialize_test_message("topic2", 5, msg),
serialize_test_message("topic1", 8, msg),
serialize_test_message("topic2", 10, msg),
serialize_test_message("topic1", 13, msg),
serialize_test_message("topic2", 14, msg)});
serialize_test_message("topic1", 1, 1, msg),
serialize_test_message("topic2", 5, 2, msg),
serialize_test_message("topic1", 8, 4, msg),
serialize_test_message("topic2", 10, 8, msg),
serialize_test_message("topic1", 13, 7, msg),
serialize_test_message("topic2", 14, 15, msg)});
messages_list.emplace_back(std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>{
serialize_test_message("topic1", 2, msg),
serialize_test_message("topic2", 3, msg),
serialize_test_message("topic1", 6, msg),
serialize_test_message("topic2", 10, msg),
serialize_test_message("topic1", 12, msg),
serialize_test_message("topic2", 16, msg)});
serialize_test_message("topic1", 2, 1, msg),
serialize_test_message("topic2", 3, 2, msg),
serialize_test_message("topic1", 6, 5, msg),
serialize_test_message("topic2", 10, 8, msg),
serialize_test_message("topic1", 12, 7, msg),
serialize_test_message("topic2", 16, 14, msg)});
messages_list.emplace_back(std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>{
serialize_test_message("topic1", 1, msg),
serialize_test_message("topic2", 4, msg),
serialize_test_message("topic1", 7, msg),
serialize_test_message("topic2", 9, msg),
serialize_test_message("topic1", 11, msg),
serialize_test_message("topic2", 15, msg)});
serialize_test_message("topic1", 1, 1, msg),
serialize_test_message("topic2", 4, 3, msg),
serialize_test_message("topic1", 7, 2, msg),
serialize_test_message("topic2", 9, 9, msg),
serialize_test_message("topic1", 11, 8, msg),
serialize_test_message("topic2", 15, 7, msg)});
std::vector<rosbag2_transport::Player::reader_storage_options_pair_t> bags{};
std::size_t total_messages = 0u;
for (std::size_t i = 0u; i < messages_list.size(); i++) {
Expand All @@ -219,13 +224,27 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_
}
ASSERT_GT(total_messages, 0u);

const rosbag2_transport::MessageOrder message_order = GetParam();
play_options_.message_order = message_order;
auto player = std::make_shared<rosbag2_transport::Player>(std::move(bags), play_options_);
std::size_t num_played_messages = 0u;
rcutils_time_point_value_t last_timetamp = 0;
const auto get_timestamp =
[message_order](std::shared_ptr<rosbag2_storage::SerializedBagMessage> msg) {
switch (message_order) {
case rosbag2_transport::MessageOrder::RECV_TIMESTAMP:
return msg->recv_timestamp;
case rosbag2_transport::MessageOrder::SEND_TIMESTAMP:
return msg->send_timestamp;
default:
throw std::runtime_error("unknown rosbag2_transport::MessageOrder value");
}
};
const auto callback = [&](std::shared_ptr<rosbag2_storage::SerializedBagMessage> msg) {
// Make sure messages are played in order
EXPECT_LE(last_timetamp, msg->recv_timestamp);
last_timetamp = msg->recv_timestamp;
const auto timestamp = get_timestamp(msg);
EXPECT_LE(last_timetamp, timestamp);
last_timetamp = timestamp;
num_played_messages++;
};
player->add_on_play_message_pre_callback(callback);
Expand All @@ -234,6 +253,25 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_
EXPECT_EQ(total_messages, num_played_messages);
}

INSTANTIATE_TEST_SUITE_P(
ParametrizedPlayTests,
RosBag2PlayTestFixtureMessageOrder,
Values(
rosbag2_transport::MessageOrder::RECV_TIMESTAMP,
rosbag2_transport::MessageOrder::SEND_TIMESTAMP
),
[](const TestParamInfo<RosBag2PlayTestFixtureMessageOrder::ParamType> & info) {
switch (info.param) {
case MessageOrder::RECV_TIMESTAMP:
return "recv_timestamp";
case MessageOrder::SEND_TIMESTAMP:
return "send_timestamp";
default:
throw std::runtime_error("unknown value");
}
}
);

TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_services)
{
const std::string service_name1 = "/test_service1";
Expand Down

0 comments on commit 1183b7a

Please sign in to comment.