Skip to content

Commit

Permalink
Support service 2/2 --- rosbag2 service play (#1481)
Browse files Browse the repository at this point in the history
* Implement service play

Signed-off-by: Barry Xu <[email protected]>

* Maintain the future queue of the request by timeout

Signed-off-by: Barry Xu <[email protected]>

* Use get_message_typesupport_handle() instead of deprecated get_typesupport_handle()

Signed-off-by: Barry Xu <[email protected]>

* Resolve the conflicts caused by the rebase

Signed-off-by: Barry Xu <[email protected]>

* Address review comments from Fujita-san

Signed-off-by: Barry Xu <[email protected]>

* Move codes to new function remove_complete_request_future

Signed-off-by: Barry Xu <[email protected]>

* Move codes to new function remove_all_timeout_request_future()

Signed-off-by: Barry Xu <[email protected]>

* Add warning log and lock protection

Signed-off-by: Barry Xu <[email protected]>

* Optimize code in PlayerImpl::publish_message()

Signed-off-by: Barry Xu <[email protected]>

* Changed the code logic for determining whether to use request data of client or service

Signed-off-by: Barry Xu <[email protected]>

* Add warning log and tests

Signed-off-by: Barry Xu <[email protected]>

* Correct the parameter descriptions and remove unnecessary code

Signed-off-by: Barry Xu <[email protected]>

* Extend the parameters for the "play" command

Add new parameters '--exclude-topics' and '--exclude-services'.

Signed-off-by: Barry Xu <[email protected]>

* Move implmentation of struct client_id_hash to cpp

Signed-off-by: Barry Xu <[email protected]>

* Address some minor review comments

Signed-off-by: Barry Xu <[email protected]>

* Replace std::variant<SharedPlayerPublisher, SharedPlayerClient>>

Signed-off-by: Barry Xu <[email protected]>

* Added code for cleaning up pending requests

Signed-off-by: Barry Xu <[email protected]>

* Avoid creating publisher or client for filtered topic

Signed-off-by: Barry Xu <[email protected]>

* Update code on filtering message for mcap

Signed-off-by: Barry Xu <[email protected]>

* Use explicit namespace for topic name and update tests

Signed-off-by: Barry Xu <[email protected]>

* Update for the rebase

Signed-off-by: Barry Xu <[email protected]>

* Simplify variable names

Signed-off-by: Barry Xu <[email protected]>

* Adjust the default timeout and queue length for request future

Signed-off-by: Barry Xu <[email protected]>

* Cleanup and optimization in mcap_storage.cpp

- Rewrite topic_filter to make clean and concise implementation.
- Also addressed multiple performance related issues in topic_filter.
- Delete
MCAP_COMPILE_DEFS ROSBAG2_STORAGE_MCAP_HAS_STORAGE_FILTER_TOPIC_REGEX
compilation flag since it is become absolute on Rolling, and we are not
using anymore the same version of the mcap storage plugin for older
ROS 2 distros.

Signed-off-by: Michael Orlov <[email protected]>

* Move mcap topic_filter tests to a separate file

Signed-off-by: Michael Orlov <[email protected]>

* Add TestResetFilter and CanSelectWithTopicsListOnly to the mcap storage

Signed-off-by: Michael Orlov <[email protected]>

* Add CanSelectWithServiceEventsListOnly test to the mcap storage

Signed-off-by: Michael Orlov <[email protected]>

* Cleanups and renames in the sqlite_storage.cpp

Signed-off-by: Michael Orlov <[email protected]>

* Move SQLite3 topic_filter tests to a separate file

Signed-off-by: Michael Orlov <[email protected]>

* Bugfix in sqlite topic filters when service_events or topics lists empty

- Also add new tests `CanSelectWithTopicsListOnly` and
`CanSelectWithServiceEventsListOnly` to cover changes.

Signed-off-by: Michael Orlov <[email protected]>

* Rename test_sqlite3_topic_filter.cpp for consistency

Signed-off-by: Michael Orlov <[email protected]>

* Change API to use rcl_serialized_message_t to avoid 2 extra message copy

- Changed API for `async_send_request(..)`,
`is_include_request_message(..)` and `get_msg_event_type(..)`` to use
`rcl_serialized_message_t` instead of the `rclcpp::SerializedMessage`
to avoid 2 extra message copy.

- Renamed `introspection_include_metadata_and_contents(..)` to the
`service_event_include_metadata_and_contents(..)`.

- Altered `check_service_event_include_metadata_and_contents` unit test
to use real serialized service event for verification and comparison.

Signed-off-by: Michael Orlov <[email protected]>

* Misc minor fixes from previous rounds of review and new findings (1)

- Mostly renames, cleanup in style wording and optimization in
constructors

Signed-off-by: Michael Orlov <[email protected]>

* Address ament_flake8 warning in regards whitespaces in the blank lines

Signed-off-by: Michael Orlov <[email protected]>

* Simplify logic in is_topic_selected_by_white_list_or_regex(..)

Signed-off-by: Michael Orlov <[email protected]>
Co-authored with: Barry Xu <[email protected]>

Co-authored-by: Barry Xu <[email protected]>

* Check is_topic_in_black_list_or_exclude_regex(..) first

Co-authored-by: Barry Xu <[email protected]>
Signed-off-by: Michael Orlov <[email protected]>

* Add test coverage for the cases when exclude lists overlap with include

Signed-off-by: Michael Orlov <[email protected]>

* Bugfix for incorrectly including all services when regex is not empty

- Also add test coverage with new tests
FilterTopicsAndServicesWithRegexAndNonexistentTopicsList and
FilterTopicsAndServicesWithRegexAndNonexistentServicesList

Signed-off-by: Michael Orlov <[email protected]>

* Rename test_sqlite3_topic_filter in CMakeList.txt

Signed-off-by: Michael Orlov <[email protected]>

* Update spin & termination and add service ready check before play

Signed-off-by: Barry Xu <[email protected]>

* Fix the issue of published_messages_from_multiple_services_are_recorded failing randomly

Increase the waiting time for the record to start spinning.

Signed-off-by: Barry Xu <[email protected]>

* Make service_event_ts_lib as private member again

- Motivation: The shared pointer to the service event type support
library shall be a member variable to make sure that library loaded
during the liveliness of the instance of this class, since we have a
raw pointers to its inner members.

Signed-off-by: Michael Orlov <[email protected]>

* Cleanup in PlayerServiceClient::async_send_request(ser_message)

- Rewrite raw pointers arithmetic. Assumption that size_t represent size
of the void* may not necessarily be true on all platforms.
- Use shared pointer with custom deleter for deserialized message.
- Assumption that we can take first element from bounded sequence by
dereferencing raw pointer to the bounded sequence may not be necessarily
be true and up to the underlying rmw and transport layer implementation.
Use dedicated request_member.get_function(request_sequence_ptr, 0)
function instead.
- Add sanity checks `for service_event_members_` in PlayerServiceClient
constructor.

Signed-off-by: Michael Orlov <[email protected]>

* Refactoring. Do full deserialization and only once

- Rationale: We can't rely on assumption that we can safely partially
deserialize service event to the ServiceEventInfo structure.

Signed-off-by: Michael Orlov <[email protected]>

* Specify service request from which introspection message and fix uncrustify errors

Signed-off-by: Barry Xu <[email protected]>

* Revert uncrustify changes from previous commit.

- Rationale: We are moving to the new version of the uncrustify in
rolling and still haven't done it yet fully for the baseline.
The discrepancy in style for other untouched files like logging.hpp
shall be addressed in a separate PR.

Signed-off-by: Michael Orlov <[email protected]>

* Rename service_request_from to the service_requests_source

- Also rename enum class ServiceRequestFrom to the ServiceRequestsSource

Signed-off-by: Michael Orlov <[email protected]>

* Add Player::wait_for_sent_service_requests_to_finish() API

- We need this API to be able to write deterministic and non-flaky tests

Signed-off-by: Michael Orlov <[email protected]>

* Mitigate potential issues related to the operations reordering on ARM

Signed-off-by: Michael Orlov <[email protected]>

* Make tests play_service_requests_from_service(client) deterministic

- Get rid of timeout inside tests and use newly added
player->wait_for_sent_service_requests_to_finish(service_name) API.

Signed-off-by: Michael Orlov <[email protected]>

* Misc findings and improvements 1

Signed-off-by: Michael Orlov <[email protected]>

* Rename get_services_clients() to the get_service_clients()

Signed-off-by: Michael Orlov <[email protected]>

* Add a new CLI parameter "--publish-service-requests" for Player

Also added a new option publish_service_requests to the PlayOptions.
Note: By default rosbag2 player will publish service events only.

Signed-off-by: Barry Xu <[email protected]>
Signed-off-by: Michael Orlov <[email protected]>

* Fix an issue on filtering topic when prepare publishers

Co-authored-by: Barry Xu <[email protected]>
Signed-off-by: Michael Orlov <[email protected]>

* Cleanup in play_without_publish_service_requests

- Long story short: Make it deterministic and run fast.

Signed-off-by: Michael Orlov <[email protected]>

* Wrap code which can throw with try-catch in the publish_message(..)

Signed-off-by: Michael Orlov <[email protected]>

* Delete some part of the code which became absolute and shall not be used

Signed-off-by: Michael Orlov <[email protected]>

* Update test codes

Signed-off-by: Barry Xu <[email protected]>

* Remove code on meaningless waiting for published_messages_from_multiple_services_are_recorded

Signed-off-by: Barry Xu <[email protected]>

* Update the code following the rebase

Signed-off-by: Barry Xu <[email protected]>

* Remove a unnecessary check and simplify the code

Signed-off-by: Barry Xu <[email protected]>

* Cleanup in service replay related tests

Signed-off-by: Michael Orlov <[email protected]>

* Regenerate Python stub files (.pyi) after altering python API in PR

Signed-off-by: Michael Orlov <[email protected]>

* Increase the timeout of waiting for the service to be ready

Signed-off-by: Barry Xu <[email protected]>

* Update the code for waiting on all futures of one service client

Signed-off-by: Barry Xu <[email protected]>

* Cleanup API for wait_for_sent_requests_to_finish(..)

Signed-off-by: Michael Orlov <[email protected]>

* Fixes for Windows CI build failure

- Add storage factory object as member variable to the test fixture
class. The storage factory object shall persist while returned storage
object persist.

Signed-off-by: Michael Orlov <[email protected]>

* Fix a typo

Signed-off-by: Barry Xu <[email protected]>

* Increase timeout value to stabilize a test

Signed-off-by: Barry Xu <[email protected]>

* Fix a bug in PlayerImpl::wait_for_sent_service_requests_to_finish

Signed-off-by: Barry Xu <[email protected]>

* Disable test_burst for RTI DDS due to the failure with missing requests

- The test `burst_bursting_only_filtered_services` fails only with
rmw_connext_dds for unknown reasons and tends to be flaky. Sometimes
we receive only one service request instead of 2.

Signed-off-by: Michael Orlov <[email protected]>

* Revert "Disable test_burst for RTI DDS due to the failure with missing requests"

This reverts commit aaaac74.

Signed-off-by: Michael Orlov <[email protected]>

* Disable burst_bursting_only_filtered_services for rmw_connextdds

- The test `burst_bursting_only_filtered_services` fails only with
rmw_connext_dds for unknown reasons and tends to be flaky. Sometimes
we receive only one service request instead of 2.

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Barry Xu <[email protected]>
Signed-off-by: Michael Orlov <[email protected]>
Co-authored-by: Michael Orlov <[email protected]>
  • Loading branch information
Barry-Xu-2018 and MichaelOrlov authored Apr 13, 2024
1 parent bee10b4 commit b02142a
Show file tree
Hide file tree
Showing 45 changed files with 3,110 additions and 455 deletions.
11 changes: 9 additions & 2 deletions ros2bag/ros2bag/verb/burst.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ros2bag.api import add_standard_reader_args
from ros2bag.api import check_not_negative_int
from ros2bag.api import check_positive_float
from ros2bag.api import convert_service_to_service_event_topic
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import print_error
from ros2bag.verb import VerbExtension
Expand All @@ -41,8 +42,12 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'delay of message playback.')
parser.add_argument(
'--topics', type=str, default=[], nargs='+',
help='topics to replay, separated by space. If none specified, all topics will be '
'replayed.')
help='topics to replay, separated by space. At least one topic needs to be '
"specified. If this parameter isn\'t specified, all topics will be replayed.")
parser.add_argument(
'--services', type=str, default=[], nargs='+',
help='services to replay, separated by space. At least one service needs to be '
"specified. If this parameter isn\'t specified, all services will be replayed.")
parser.add_argument(
'--qos-profile-overrides-path', type=FileType('r'),
help='Path to a yaml file defining overrides of the QoS profile for specific topics.')
Expand Down Expand Up @@ -90,6 +95,8 @@ def main(self, *, args): # noqa: D102
play_options.node_prefix = NODE_NAME_PREFIX
play_options.rate = 1.0
play_options.topics_to_filter = args.topics
# Convert service name to service event topic name
play_options.services_to_filter = convert_service_to_service_event_topic(args.services)
play_options.topic_qos_profile_overrides = qos_profile_overrides
play_options.loop = False
play_options.topic_remapping_options = topic_remapping
Expand Down
51 changes: 43 additions & 8 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
from ros2bag.api import add_standard_reader_args
from ros2bag.api import check_not_negative_int
from ros2bag.api import check_positive_float
from ros2bag.api import convert_service_to_service_event_topic
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import print_error
from ros2bag.verb import VerbExtension
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import Player
from rosbag2_py import PlayOptions
from rosbag2_py import ServiceRequestsSource
from rosbag2_py import StorageOptions
import yaml

Expand All @@ -49,16 +51,23 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'-r', '--rate', type=check_positive_float, default=1.0,
help='rate at which to play back messages. Valid range > 0.0.')
parser.add_argument(
'--topics', type=str, default=[], nargs='+',
'--topics', type=str, default=[], metavar='topic', nargs='+',
help='Space-delimited list of topics to play.')
parser.add_argument(
'--services', type=str, default=[], metavar='service', nargs='+',
help='Space-delimited list of services to play.')
parser.add_argument(
'-e', '--regex', default='',
help='filter topics by regular expression to replay, separated by space. If none '
'specified, all topics will be replayed.')
help='Play only topics and services matches with regular expression.')
parser.add_argument(
'-x', '--exclude-regex', default='',
help='regular expressions to exclude topics and services from replay.')
parser.add_argument(
'--exclude-topics', type=str, default=[], metavar='topic', nargs='+',
help='Space-delimited list of topics not to play.')
parser.add_argument(
'-x', '--exclude', default='',
help='regular expressions to exclude topics from replay, separated by space. If none '
'specified, all topics will be replayed.')
'--exclude-services', type=str, default=[], metavar='service', nargs='+',
help='Space-delimited list of services not to play.')
parser.add_argument(
'--qos-profile-overrides-path', type=FileType('r'),
help='Path to a yaml file defining overrides of the QoS profile for specific topics.')
Expand Down Expand Up @@ -144,6 +153,15 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'By default, if loaned message can be used, messages are published as loaned '
'message. It can help to reduce the number of data copies, so there is a greater '
'benefit for sending big data.')
parser.add_argument(
'--publish-service-requests', action='store_true', default=False,
help='Publish recorded service requests instead of recorded service events')
parser.add_argument(
'--service-requests-source', default='service_introspection',
choices=['service_introspection', 'client_introspection'],
help='Determine the source of the service requests to be replayed. This option only '
'makes sense if the "--publish-service-requests" option is set. By default,'
' the service requests replaying from recorded service introspection message.')

def get_playback_until_from_arg_group(self, playback_until_sec, playback_until_nsec) -> int:
nano_scale = 1000 * 1000 * 1000
Expand Down Expand Up @@ -182,8 +200,19 @@ def main(self, *, args): # noqa: D102
play_options.node_prefix = NODE_NAME_PREFIX
play_options.rate = args.rate
play_options.topics_to_filter = args.topics
play_options.topics_regex_to_filter = args.regex
play_options.topics_regex_to_exclude = args.exclude

# Convert service name to service event topic name
play_options.services_to_filter = convert_service_to_service_event_topic(args.services)

play_options.regex_to_filter = args.regex

play_options.exclude_regex_to_filter = args.exclude_regex

play_options.exclude_topics_to_filter = args.exclude_topics if args.exclude_topics else []

play_options.exclude_service_events_to_filter = \
convert_service_to_service_event_topic(args.exclude_services)

play_options.topic_qos_profile_overrides = qos_profile_overrides
play_options.loop = args.loop
play_options.topic_remapping_options = topic_remapping
Expand All @@ -200,6 +229,12 @@ def main(self, *, args): # noqa: D102
play_options.start_offset = args.start_offset
play_options.wait_acked_timeout = args.wait_for_all_acked
play_options.disable_loan_message = args.disable_loan_message
play_options.publish_service_requests = args.publish_service_requests
if not args.service_requests_source or \
args.service_requests_source == 'service_introspection':
play_options.service_requests_source = ServiceRequestsSource.SERVICE_INTROSPECTION
else:
play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION

player = Player()
try:
Expand Down
6 changes: 5 additions & 1 deletion rosbag2_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,11 @@ if(BUILD_TESTING)
ament_add_gmock(test_service_utils
test/rosbag2_cpp/test_service_utils.cpp)
if(TARGET test_service_utils)
target_link_libraries(test_service_utils ${PROJECT_NAME})
target_link_libraries(test_service_utils
${PROJECT_NAME}
rosbag2_test_common::rosbag2_test_common
${test_msgs_TARGETS}
)
endif()
endif()

Expand Down
24 changes: 12 additions & 12 deletions rosbag2_cpp/include/rosbag2_cpp/logging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,36 @@
RCUTILS_LOG_INFO_NAMED(ROSBAG2_CPP_PACKAGE_NAME, __VA_ARGS__)

#define ROSBAG2_CPP_LOG_INFO_STREAM(args) do { \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_INFO_NAMED(ROSBAG2_CPP_PACKAGE_NAME, "%s", __ss.str().c_str()); \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_INFO_NAMED(ROSBAG2_CPP_PACKAGE_NAME, "%s", __ss.str().c_str()); \
} while (0)

#define ROSBAG2_CPP_LOG_ERROR(...) \
RCUTILS_LOG_ERROR_NAMED(ROSBAG2_CPP_PACKAGE_NAME, __VA_ARGS__)

#define ROSBAG2_CPP_LOG_ERROR_STREAM(args) do { \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_ERROR_NAMED(ROSBAG2_CPP_PACKAGE_NAME, "%s", __ss.str().c_str()); \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_ERROR_NAMED(ROSBAG2_CPP_PACKAGE_NAME, "%s", __ss.str().c_str()); \
} while (0)

#define ROSBAG2_CPP_LOG_WARN(...) \
RCUTILS_LOG_WARN_NAMED(ROSBAG2_CPP_PACKAGE_NAME, __VA_ARGS__)

#define ROSBAG2_CPP_LOG_WARN_STREAM(args) do { \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_WARN_NAMED(ROSBAG2_CPP_PACKAGE_NAME, "%s", __ss.str().c_str()); \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_WARN_NAMED(ROSBAG2_CPP_PACKAGE_NAME, "%s", __ss.str().c_str()); \
} while (0)

#define ROSBAG2_CPP_LOG_DEBUG(...) \
RCUTILS_LOG_DEBUG_NAMED(ROSBAG2_CPP_PACKAGE_NAME, __VA_ARGS__)

#define ROSBAG2_CPP_LOG_DEBUG_STREAM(args) do { \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_DEBUG_NAMED(ROSBAG2_CPP_PACKAGE_NAME, "%s", __ss.str().c_str()); \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_DEBUG_NAMED(ROSBAG2_CPP_PACKAGE_NAME, "%s", __ss.str().c_str()); \
} while (0)

// *INDENT-ON*
Expand Down
19 changes: 15 additions & 4 deletions rosbag2_cpp/include/rosbag2_cpp/service_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@
#ifndef ROSBAG2_CPP__SERVICE_UTILS_HPP_
#define ROSBAG2_CPP__SERVICE_UTILS_HPP_

#include <array>
#include <string>

#include "rosbag2_cpp/visibility_control.hpp"

#include "service_msgs/msg/service_event_info.hpp"

namespace rosbag2_cpp
{
ROSBAG2_CPP_PUBLIC
bool
is_service_event_topic(const std::string & topic, const std::string & topic_type);
is_service_event_topic(const std::string & topic_name, const std::string & topic_type);

// Call this function after is_service_event_topic() return true
ROSBAG2_CPP_PUBLIC
Expand All @@ -36,12 +39,20 @@ std::string
service_event_topic_type_to_service_type(const std::string & topic_type);

ROSBAG2_CPP_PUBLIC
size_t
get_serialization_size_for_service_metadata_event();
std::string
service_name_to_service_event_topic_name(const std::string & service_name);

ROSBAG2_CPP_PUBLIC
std::string
service_name_to_service_event_topic_name(const std::string & service_name);
client_id_to_string(std::array<uint8_t, 16> & client_id);

struct client_id_hash
{
static_assert(
std::is_same<std::array<uint8_t, 16>,
service_msgs::msg::ServiceEventInfo::_client_gid_type>::value);
std::size_t operator()(const std::array<uint8_t, 16> & client_id) const;
};
} // namespace rosbag2_cpp

#endif // ROSBAG2_CPP__SERVICE_UTILS_HPP_
17 changes: 0 additions & 17 deletions rosbag2_cpp/src/rosbag2_cpp/info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,6 @@ rosbag2_storage::BagMetadata Info::read_metadata(

namespace
{
struct client_id_hash
{
static_assert(
std::is_same<std::array<uint8_t, 16>,
service_msgs::msg::ServiceEventInfo::_client_gid_type>::value);
std::size_t operator()(const std::array<uint8_t, 16> & client_id) const
{
std::hash<uint8_t> hasher;
std::size_t seed = 0;
for (const auto & value : client_id) {
// 0x9e3779b9 is from https://cryptography.fandom.com/wiki/Tiny_Encryption_Algorithm
seed ^= hasher(value) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}
return seed;
}
};

using client_id = service_msgs::msg::ServiceEventInfo::_client_gid_type;
using sequence_set = std::unordered_set<int64_t>;
struct service_req_resp_info
Expand Down
Loading

0 comments on commit b02142a

Please sign in to comment.