diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 185fa804..732cd470 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -21,6 +21,7 @@ find_package(rcutils REQUIRED) find_package(rosidl_typesupport_fastrtps_c REQUIRED) find_package(rosidl_typesupport_fastrtps_cpp REQUIRED) find_package(rmw REQUIRED) +find_package(tracetools REQUIRED) find_package(zenoh_cpp_vendor REQUIRED) add_library(rmw_zenoh_cpp SHARED @@ -67,6 +68,7 @@ target_link_libraries(rmw_zenoh_cpp rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp rmw::rmw + tracetools::tracetools zenohcxx::zenohc ) diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml index 79272665..9376b716 100644 --- a/rmw_zenoh_cpp/package.xml +++ b/rmw_zenoh_cpp/package.xml @@ -25,6 +25,7 @@ rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp rmw + tracetools ament_lint_auto ament_lint_common diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index f85dab86..736e1dbe 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -44,6 +44,8 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/impl/cpp/macros.hpp" +#include "tracetools/tracetools.h" + namespace rmw_zenoh_cpp { ///============================================================================= @@ -363,10 +365,17 @@ rmw_ret_t ClientData::send_request( size_t data_length = ser.get_serialized_data_length(); *sequence_id = sequence_number_++; + TRACETOOLS_TRACEPOINT( + rmw_send_request, + static_cast(rmw_client_), + static_cast(ros_request), + *sequence_id); + // Send request zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default(); - std::array local_gid = entity_->copy_gid(); - opts.attachment = rmw_zenoh_cpp::create_map_and_set_sequence_num(*sequence_id, local_gid); + int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns(); + opts.attachment = rmw_zenoh_cpp::AttachmentData( + *sequence_id, source_timestamp, entity_->copy_gid()).serialize_to_zbytes(); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; // The default timeout for a z_get query is 10 seconds and if a response is not received within // this window, the queryable will return an invalid reply. However, it is common for actions, diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index 44b187ba..f3e665ab 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -118,6 +118,7 @@ class ClientData final : public std::enable_shared_from_this mutable std::recursive_mutex mutex_; // The parent node. const rmw_node_t * rmw_node_; + // The rmw client. const rmw_client_t * rmw_client_; // The Entity generated for the service. std::shared_ptr entity_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index ed1b5b2b..23ca5e78 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -141,6 +141,7 @@ bool NodeData::create_pub_data( auto pub_data = PublisherData::make( session, + publisher, node_, entity_->node_info(), id_, @@ -276,6 +277,7 @@ bool NodeData::create_service_data( auto service_data = ServiceData::make( session, node_, + service, entity_->node_info(), id_, std::move(id), diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 3bf62372..f960b879 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -37,6 +37,8 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/impl/cpp/macros.hpp" +#include "tracetools/tracetools.h" + namespace rmw_zenoh_cpp { // TODO(yuyuan): SHM, make this configurable @@ -45,6 +47,7 @@ namespace rmw_zenoh_cpp ///============================================================================= std::shared_ptr PublisherData::make( std::shared_ptr session, + const rmw_publisher_t * const rmw_publisher, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -158,6 +161,7 @@ std::shared_ptr PublisherData::make( return std::shared_ptr( new PublisherData{ + rmw_publisher, node, std::move(entity), std::move(session), @@ -171,6 +175,7 @@ std::shared_ptr PublisherData::make( ///============================================================================= PublisherData::PublisherData( + const rmw_publisher_t * const rmw_publisher, const rmw_node_t * rmw_node, std::shared_ptr entity, std::shared_ptr sess, @@ -179,7 +184,8 @@ PublisherData::PublisherData( zenoh::LivelinessToken token, const void * type_support_impl, std::unique_ptr type_support) -: rmw_node_(rmw_node), +: rmw_publisher_(rmw_publisher), + rmw_node_(rmw_node), entity_(std::move(entity)), sess_(std::move(sess)), pub_(std::move(pub)), @@ -246,10 +252,10 @@ rmw_ret_t PublisherData::publish( // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. zenoh::ZResult result; + int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns(); auto options = zenoh::Publisher::PutOptions::create_default(); - options.attachment = create_map_and_set_sequence_num( - sequence_number_++, - entity_->copy_gid()); + options.attachment = rmw_zenoh_cpp::AttachmentData( + sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes(); // TODO(ahcorde): shmbuf std::vector raw_data( @@ -257,6 +263,8 @@ rmw_ret_t PublisherData::publish( reinterpret_cast(msg_bytes) + data_length); zenoh::Bytes payload(std::move(raw_data)); + TRACETOOLS_TRACEPOINT( + rmw_publish, static_cast(rmw_publisher_), ros_message, source_timestamp); pub_.put(std::move(payload), std::move(options), &result); if (result != Z_OK) { if (result == Z_ESESSION_CLOSED) { @@ -292,14 +300,18 @@ rmw_ret_t PublisherData::publish_serialized_message( // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. zenoh::ZResult result; + int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns(); auto options = zenoh::Publisher::PutOptions::create_default(); - options.attachment = create_map_and_set_sequence_num(sequence_number_++, entity_->copy_gid()); + options.attachment = rmw_zenoh_cpp::AttachmentData( + sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes(); std::vector raw_data( serialized_message->buffer, serialized_message->buffer + data_length); zenoh::Bytes payload(std::move(raw_data)); + TRACETOOLS_TRACEPOINT( + rmw_publish, static_cast(rmw_publisher_), serialized_message, source_timestamp); pub_.put(std::move(payload), std::move(options), &result); if (result != Z_OK) { if (result == Z_ESESSION_CLOSED) { diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 52c22523..7cec5406 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -45,6 +45,7 @@ class PublisherData final // Make a shared_ptr of PublisherData. static std::shared_ptr make( std::shared_ptr session, + const rmw_publisher_t * const rmw_publisher, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -90,6 +91,7 @@ class PublisherData final private: // Constructor. PublisherData( + const rmw_publisher_t * const rmw_publisher, const rmw_node_t * rmw_node, std::shared_ptr entity, std::shared_ptr session, @@ -101,6 +103,8 @@ class PublisherData final // Internal mutex. mutable std::mutex mutex_; + // The rmw publisher + const rmw_publisher_t * rmw_publisher_; // The parent node. const rmw_node_t * rmw_node_; // The Entity generated for the publisher. diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index 769ff47e..68ff9d6e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -40,12 +40,15 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/impl/cpp/macros.hpp" +#include "tracetools/tracetools.h" + namespace rmw_zenoh_cpp { ///============================================================================= std::shared_ptr ServiceData::make( std::shared_ptr session, const rmw_node_t * const node, + const rmw_service_t * rmw_service, liveliness::NodeInfo node_info, std::size_t node_id, std::size_t service_id, @@ -128,6 +131,7 @@ std::shared_ptr ServiceData::make( auto service_data = std::shared_ptr( new ServiceData{ node, + rmw_service, std::move(entity), session, request_members, @@ -192,6 +196,7 @@ std::shared_ptr ServiceData::make( ///============================================================================= ServiceData::ServiceData( const rmw_node_t * rmw_node, + const rmw_service_t * rmw_service, std::shared_ptr entity, std::shared_ptr session, const void * request_type_support_impl, @@ -199,6 +204,7 @@ ServiceData::ServiceData( std::unique_ptr request_type_support, std::unique_ptr response_type_support) : rmw_node_(rmw_node), + rmw_service_(rmw_service), entity_(std::move(entity)), sess_(std::move(session)), request_type_support_impl_(request_type_support_impl), @@ -438,9 +444,9 @@ rmw_ret_t ServiceData::send_response( zenoh::Query::ReplyOptions options = zenoh::Query::ReplyOptions::create_default(); std::array writer_gid; memcpy(writer_gid.data(), request_id->writer_guid, RMW_GID_STORAGE_SIZE); - options.attachment = create_map_and_set_sequence_num( - request_id->sequence_number, - writer_gid); + int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns(); + options.attachment = rmw_zenoh_cpp::AttachmentData( + request_id->sequence_number, source_timestamp, writer_gid).serialize_to_zbytes(); std::vector raw_bytes( reinterpret_cast(response_bytes), @@ -454,6 +460,13 @@ rmw_ret_t ServiceData::send_response( return RMW_RET_ERROR; } + TRACETOOLS_TRACEPOINT( + rmw_send_response, + static_cast(rmw_service_), + static_cast(ros_response), + request_id->writer_guid, + request_id->sequence_number, + source_timestamp); loaned_query.reply(service_ke, std::move(payload), std::move(options), &result); if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to reply"); diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp index eeeeee6c..1c557370 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp @@ -49,6 +49,7 @@ class ServiceData final : public std::enable_shared_from_this static std::shared_ptr make( std::shared_ptr session, const rmw_node_t * const node, + const rmw_service_t * rmw_service, liveliness::NodeInfo node_info, std::size_t node_id, std::size_t service_id, @@ -98,6 +99,7 @@ class ServiceData final : public std::enable_shared_from_this // Constructor. ServiceData( const rmw_node_t * rmw_node, + const rmw_service_t * rmw_service, std::shared_ptr entity, std::shared_ptr session, const void * request_type_support_impl, @@ -109,6 +111,8 @@ class ServiceData final : public std::enable_shared_from_this mutable std::mutex mutex_; // The parent node. const rmw_node_t * rmw_node_; + // The rmw service. + const rmw_service_t * rmw_service_; // The Entity generated for the service. std::shared_ptr entity_; diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 3e6f8ef9..061ceb0f 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -26,18 +26,6 @@ namespace rmw_zenoh_cpp { -///============================================================================= -zenoh::Bytes create_map_and_set_sequence_num( - int64_t sequence_number, std::array gid) -{ - auto now = std::chrono::system_clock::now().time_since_epoch(); - auto now_ns = std::chrono::duration_cast(now); - int64_t source_timestamp = now_ns.count(); - - rmw_zenoh_cpp::AttachmentData data(sequence_number, source_timestamp, gid); - return data.serialize_to_zbytes(); -} - ///============================================================================= ZenohQuery::ZenohQuery( const zenoh::Query & query, @@ -82,4 +70,11 @@ std::chrono::nanoseconds::rep ZenohReply::get_received_timestamp() const { return received_timestamp_; } + +int64_t get_system_time_in_ns() +{ + auto now = std::chrono::system_clock::now().time_since_epoch(); + return std::chrono::duration_cast(now).count(); +} + } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index 8da7aae4..a88b132e 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -26,10 +26,6 @@ namespace rmw_zenoh_cpp { -///============================================================================= -zenoh::Bytes create_map_and_set_sequence_num( - int64_t sequence_number, std::array gid); - ///============================================================================= // A class to store the replies to service requests. class ZenohReply final @@ -65,6 +61,8 @@ class ZenohQuery final zenoh::Query query_; std::chrono::nanoseconds::rep received_timestamp_; }; + +int64_t get_system_time_in_ns(); } // namespace rmw_zenoh_cpp #endif // DETAIL__ZENOH_UTILS_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 23220fdf..e7342c13 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -56,6 +56,8 @@ #include "rmw/validate_namespace.h" #include "rmw/validate_node_name.h" +#include "tracetools/tracetools.h" + namespace { //============================================================================== @@ -454,6 +456,14 @@ rmw_create_publisher( free_topic_name.cancel(); free_rmw_publisher.cancel(); + if (TRACETOOLS_TRACEPOINT_ENABLED(rmw_publisher_init)) { + rmw_gid_t gid{}; + // Trigger tracepoint even if we cannot get the GID + rmw_ret_t gid_ret = rmw_get_gid_for_publisher(rmw_publisher, &gid); + static_cast(gid_ret); + TRACETOOLS_DO_TRACEPOINT( + rmw_publisher_init, static_cast(rmw_publisher), gid.data); + } return rmw_publisher; } @@ -990,6 +1000,12 @@ rmw_create_subscription( free_topic_name.cancel(); free_rmw_subscription.cancel(); + // rmw does not require GIDs for subscriptions, and GIDs in rmw_zenoh are not based on any ID of + // the underlying zenoh objects, so there is no need to collect a GID here + rmw_gid_t gid{}; + static_cast(gid); + TRACETOOLS_TRACEPOINT( + rmw_subscription_init, static_cast(rmw_subscription), gid.data); return rmw_subscription; } @@ -1134,7 +1150,18 @@ rmw_take( static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - return sub_data->take_one_message(ros_message, nullptr, taken); + if (!TRACETOOLS_TRACEPOINT_ENABLED(rmw_take)) { + return sub_data->take_one_message(ros_message, nullptr, taken); + } + rmw_message_info_t message_info{}; + rmw_ret_t ret = sub_data->take_one_message(ros_message, &message_info, taken); + TRACETOOLS_DO_TRACEPOINT( + rmw_take, + static_cast(subscription), + static_cast(ros_message), + message_info.source_timestamp, + *taken); + return ret; } //============================================================================== @@ -1163,7 +1190,14 @@ rmw_take_with_info( static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - return sub_data->take_one_message(ros_message, message_info, taken); + rmw_ret_t ret = sub_data->take_one_message(ros_message, message_info, taken); + TRACETOOLS_TRACEPOINT( + rmw_take, + static_cast(subscription), + static_cast(ros_message), + message_info->source_timestamp, + *taken); + return ret; } //============================================================================== @@ -1269,11 +1303,18 @@ __rmw_take_serialized( static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - return sub_data->take_serialized_message( + rmw_ret_t ret = sub_data->take_serialized_message( serialized_message, taken, message_info ); + TRACETOOLS_TRACEPOINT( + rmw_take, + static_cast(subscription), + static_cast(serialized_message), + message_info->source_timestamp, + *taken); + return ret; } } // namespace @@ -1452,7 +1493,8 @@ rmw_create_client( // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased // Client handle will be returned in the rmw_clients_t in rmw_wait // from which we cannot obtain ClientData. - rmw_client->data = static_cast(node_data->get_client_data(rmw_client).get()); + rmw_zenoh_cpp::ClientDataPtr client_data = node_data->get_client_data(rmw_client); + rmw_client->data = static_cast(client_data.get()); rmw_client->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; rmw_client->service_name = rcutils_strdup(service_name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( @@ -1467,6 +1509,10 @@ rmw_create_client( free_rmw_client.cancel(); free_service_name.cancel(); + TRACETOOLS_TRACEPOINT( + rmw_client_init, + static_cast(rmw_client), + client_data->copy_gid().data()); return rmw_client; } @@ -1560,8 +1606,17 @@ rmw_take_response( RMW_CHECK_FOR_NULL_WITH_MSG( client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(request_header, RMW_RET_INVALID_ARGUMENT); - return client_data->take_response(request_header, ros_response, taken); + rmw_ret_t ret = client_data->take_response(request_header, ros_response, taken); + TRACETOOLS_TRACEPOINT( + rmw_take_response, + static_cast(client), + static_cast(ros_response), + request_header->request_id.sequence_number, + request_header->source_timestamp, + *taken); + return ret; } //============================================================================== @@ -1779,10 +1834,18 @@ rmw_take_request( RMW_CHECK_ARGUMENT_FOR_NULL(request_header, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); - return service_data->take_request( + rmw_ret_t ret = service_data->take_request( request_header, ros_request, taken); + TRACETOOLS_TRACEPOINT( + rmw_take_request, + static_cast(service), + static_cast(ros_request), + request_header->request_id.writer_guid, + request_header->request_id.sequence_number, + *taken); + return ret; } //==============================================================================