diff --git a/rmw_connextdds_common/CMakeLists.txt b/rmw_connextdds_common/CMakeLists.txt index 86ba0671..c8d39703 100644 --- a/rmw_connextdds_common/CMakeLists.txt +++ b/rmw_connextdds_common/CMakeLists.txt @@ -112,6 +112,7 @@ set(RMW_CONNEXT_DEPS rcpputils rmw rmw_dds_common + tracetools fastcdr rosidl_runtime_c rosidl_runtime_cpp @@ -128,6 +129,7 @@ endforeach() ################################################################################ # Common Source Configuration ################################################################################ + set(RMW_CONNEXT_DIR ${CMAKE_CURRENT_SOURCE_DIR}) set(RMW_CONNEXT_COMMON_SOURCE_CPP @@ -172,6 +174,7 @@ set(RMW_CONNEXT_COMMON_SOURCE ${RMW_CONNEXT_COMMON_SOURCE_CPP} ${RMW_CONNEXT_COMMON_SOURCE_HPP}) + ################################################################################ # Check if additional Connext components are needed (e.g. security) ################################################################################ diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index 11b052f5..55a3a0b9 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -46,9 +46,20 @@ enum RMW_Connext_MessageType RMW_CONNEXT_MESSAGE_REPLY }; +struct RMW_Connext_WriteParams +{ + DDS_Time_t timestamp{DDS_TIME_INVALID}; + int64_t sequence_number{0}; +}; + RMW_CONNEXTDDS_PUBLIC extern const char * const RMW_CONNEXTDDS_ID; extern const char * const RMW_CONNEXTDDS_SERIALIZATION_FORMAT; +rmw_ret_t +rmw_connextdds_get_current_time( + DDS_DomainParticipant * domain_participant, + struct DDS_Time_t * current_time); + rmw_ret_t rmw_connextdds_set_log_verbosity(rmw_log_severity_t severity); @@ -132,7 +143,7 @@ rmw_ret_t rmw_connextdds_write_message( RMW_Connext_Publisher * const pub, RMW_Connext_Message * const message, - int64_t * const sn_out); + RMW_Connext_WriteParams * const params = nullptr); rmw_ret_t rmw_connextdds_take_samples( diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index 45e15717..56fffdad 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -35,6 +35,9 @@ * General helpers and utilities. ******************************************************************************/ +#define dds_time_to_u64(t_) \ + ((1000000000ULL * (uint64_t)(t_)->sec) + (uint64_t)(t_)->nanosec) + rcutils_ret_t rcutils_uint8_array_copy( rcutils_uint8_array_t * const dst, @@ -151,7 +154,7 @@ class RMW_Connext_Publisher write( const void * const ros_message, const bool serialized, - int64_t * const sn_out = nullptr); + RMW_Connext_WriteParams * const params); rmw_ret_t enable() const diff --git a/rmw_connextdds_common/package.xml b/rmw_connextdds_common/package.xml index 7813cf99..20350dd3 100644 --- a/rmw_connextdds_common/package.xml +++ b/rmw_connextdds_common/package.xml @@ -22,6 +22,7 @@ rosidl_typesupport_fastrtps_cpp rosidl_typesupport_introspection_c rosidl_typesupport_introspection_cpp + tracetools ament_lint_auto ament_lint_common diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index a42032be..bb56a933 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -21,6 +21,8 @@ #include "rcpputils/scope_exit.hpp" +#include "tracetools/tracetools.h" + #include "rmw_dds_common/time_utils.hpp" #include "rmw_dds_common/qos.hpp" @@ -919,7 +921,7 @@ rmw_ret_t RMW_Connext_Publisher::write( const void * const ros_message, const bool serialized, - int64_t * const sn_out) + RMW_Connext_WriteParams * const params) { RMW_Connext_Message user_msg; if (RMW_RET_OK != RMW_Connext_Message_initialize(&user_msg, this->type_support, 0)) { @@ -928,7 +930,7 @@ RMW_Connext_Publisher::write( user_msg.user_data = ros_message; user_msg.serialized = serialized; - return rmw_connextdds_write_message(this, &user_msg, sn_out); + return rmw_connextdds_write_message(this, &user_msg, params); } @@ -1108,6 +1110,7 @@ rmw_connextdds_create_publisher( } } + TRACETOOLS_TRACEPOINT(rmw_publisher_init, rmw_publisher, rmw_pub_impl->gid()->data); scope_exit_rmw_writer_impl_delete.cancel(); scope_exit_rmw_writer_delete.cancel(); @@ -1393,6 +1396,7 @@ RMW_Connext_Subscriber::create( RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber") return nullptr; } + scope_exit_dds_reader_delete.cancel(); scope_exit_topic_delete.cancel(); scope_exit_type_unregister.cancel(); @@ -1924,6 +1928,8 @@ rmw_connextdds_create_subscriber( } } + TRACETOOLS_TRACEPOINT(rmw_subscription_init, rmw_subscriber, rmw_sub_impl->gid()->data); + #if RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO scope_exit_enable_participant_on_error.cancel(); #endif // RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO @@ -1960,12 +1966,6 @@ rmw_connextdds_destroy_subscriber( return RMW_RET_OK; } -static -constexpr uint64_t C_NANOSECONDS_PER_SEC = 1000000000ULL; - -#define dds_time_to_u64(t_) \ - ((C_NANOSECONDS_PER_SEC * (uint64_t)(t_)->sec) + (uint64_t)(t_)->nanosec) - void rmw_connextdds_message_info_from_dds( rmw_message_info_t * const to, @@ -2723,7 +2723,21 @@ RMW_Connext_Client::send_request( reinterpret_cast(rr_msg.gid.data)[3], rr_msg.sn) - rmw_ret_t rc = this->request_pub->write(&rr_msg, false /* serialized */, sequence_id); + + RMW_Connext_WriteParams write_params; + + if (DDS_RETCODE_OK != + rmw_connextdds_get_current_time( + this->request_pub->dds_participant(), + &write_params.timestamp)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to get current time") + return RMW_RET_ERROR; + } + + rmw_ret_t rc = this->request_pub->write(&rr_msg, false /* serialized */, &write_params); + + *sequence_id = write_params.sequence_number; RMW_CONNEXT_LOG_DEBUG_A( "[%s] SENT REQUEST: " @@ -2998,6 +3012,17 @@ RMW_Connext_Service::send_response( rr_msg.gid.implementation_identifier = RMW_CONNEXTDDS_ID; rr_msg.payload = const_cast(ros_response); + RMW_Connext_WriteParams write_params; + + if (DDS_RETCODE_OK != + rmw_connextdds_get_current_time( + this->reply_pub->dds_participant(), + &write_params.timestamp)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to get current time") + return RMW_RET_ERROR; + } + RMW_CONNEXT_LOG_DEBUG_A( "[%s] send RESPONSE: " "gid=%08X.%08X.%08X.%08X, " @@ -3009,7 +3034,7 @@ RMW_Connext_Service::send_response( reinterpret_cast(rr_msg.gid.data)[3], rr_msg.sn) - return this->reply_pub->write(&rr_msg, false /* serialized */); + return this->reply_pub->write(&rr_msg, false /* serialized */, &write_params); } rmw_ret_t diff --git a/rmw_connextdds_common/src/common/rmw_publication.cpp b/rmw_connextdds_common/src/common/rmw_publication.cpp index 99b977d8..f3a2f56c 100644 --- a/rmw_connextdds_common/src/common/rmw_publication.cpp +++ b/rmw_connextdds_common/src/common/rmw_publication.cpp @@ -19,6 +19,8 @@ #include "rmw/validate_full_topic_name.h" +#include "tracetools/tracetools.h" + /****************************************************************************** * Publication functions ******************************************************************************/ @@ -47,7 +49,21 @@ rmw_api_connextdds_publish( auto pub_impl = static_cast(publisher->data); RMW_CHECK_ARGUMENT_FOR_NULL(pub_impl, RMW_RET_INVALID_ARGUMENT); - return pub_impl->write(ros_message, false /* serialized */); + RMW_Connext_WriteParams write_params; + + if (DDS_RETCODE_OK != + rmw_connextdds_get_current_time( + pub_impl->dds_participant(), + &write_params.timestamp)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to get current time") + return RMW_RET_ERROR; + } + + TRACETOOLS_TRACEPOINT( + rmw_publish, publisher, ros_message, dds_time_to_u64(&write_params.timestamp)); + + return pub_impl->write(ros_message, false /* serialized */, &write_params); } @@ -74,7 +90,20 @@ rmw_api_connextdds_publish_serialized_message( auto pub_impl = static_cast(publisher->data); RMW_CHECK_ARGUMENT_FOR_NULL(pub_impl, RMW_RET_INVALID_ARGUMENT); - return pub_impl->write(serialized_message, true /* serialized */); + RMW_Connext_WriteParams write_params; + + if (DDS_RETCODE_OK != + rmw_connextdds_get_current_time( + pub_impl->dds_participant(), + &write_params.timestamp)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to get current time") + return RMW_RET_ERROR; + } + + TRACETOOLS_TRACEPOINT( + rmw_publish, publisher, serialized_message, dds_time_to_u64(&write_params.timestamp)); + return pub_impl->write(serialized_message, true /* serialized */, &write_params); } diff --git a/rmw_connextdds_common/src/common/rmw_subscription.cpp b/rmw_connextdds_common/src/common/rmw_subscription.cpp index 3ad3625d..8c667ee8 100644 --- a/rmw_connextdds_common/src/common/rmw_subscription.cpp +++ b/rmw_connextdds_common/src/common/rmw_subscription.cpp @@ -19,6 +19,8 @@ #include "rmw/validate_full_topic_name.h" +#include "tracetools/tracetools.h" + /****************************************************************************** * Subscription functions ******************************************************************************/ @@ -282,6 +284,7 @@ rmw_api_connextdds_take( rmw_ret_t rc = sub_impl->take_message(ros_message, nullptr, taken); + TRACETOOLS_TRACEPOINT(rmw_take, subscription, ros_message, 0LL, *taken); return rc; } @@ -311,6 +314,13 @@ rmw_api_connextdds_take_with_info( rmw_ret_t rc = sub_impl->take_message(ros_message, message_info, taken); + TRACETOOLS_TRACEPOINT( + rmw_take, + subscription, + ros_message, + (message_info ? message_info->source_timestamp : 0LL), + *taken); + return rc; } diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index be88c16e..658a1a06 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -36,6 +36,22 @@ struct rmw_connextdds_api_pro rmw_connextdds_api_pro * RMW_Connext_fv_FactoryContext = nullptr; +rmw_ret_t +rmw_connextdds_get_current_time( + DDS_DomainParticipant * domain_participant, + struct DDS_Time_t * current_time) +{ + // Use DDS_DomainParticipant_get_current_time only with Micro since Pro's + // implementation is pretty slow. See #120 for details. + UNUSED_ARG(domain_participant); + RTINtpTime now; + if (!RTIOsapiUtility_getTime(&now)) { + return DDS_RETCODE_ERROR; + } + RTINtpTime_unpackToNanosec(current_time->sec, current_time->nanosec, now); + return DDS_RETCODE_OK; +} + rmw_ret_t rmw_connextdds_set_log_verbosity(rmw_log_severity_t severity) { @@ -716,15 +732,19 @@ rmw_ret_t rmw_connextdds_write_message( RMW_Connext_Publisher * const pub, RMW_Connext_Message * const message, - int64_t * const sn_out) + RMW_Connext_WriteParams * const params) { + DDS_WriteParams_t write_params = DDS_WRITEPARAMS_DEFAULT; + if (nullptr != params && !DDS_Time_is_invalid(¶ms->timestamp)) { + write_params.source_timestamp = params->timestamp; + } + if (pub->message_type_support()->type_requestreply() && pub->message_type_support()->ctx()->request_reply_mapping == RMW_Connext_RequestReplyMapping::Extended) { const RMW_Connext_RequestReplyMessage * const rr_msg = reinterpret_cast(message->user_data); - DDS_WriteParams_t write_params = DDS_WRITEPARAMS_DEFAULT; if (!rr_msg->request) { /* If this is a reply, propagate the request's sample identity @@ -745,36 +765,22 @@ rmw_connextdds_write_message( // enable WriteParams::replace_auto to retrieve SN of published message write_params.replace_auto = DDS_BOOLEAN_TRUE; } - if (DDS_RETCODE_OK != - DDS_DataWriter_write_w_params_untypedI( - pub->writer(), message, &write_params)) - { - RMW_CONNEXT_LOG_ERROR_SET( - "failed to write request/reply message to DDS") - return RMW_RET_ERROR; - } - - if (rr_msg->request) { - int64_t sn = 0; - - // Read assigned sn from write_params - rmw_connextdds_sn_dds_to_ros( - write_params.identity.sequence_number, sn); - - *sn_out = sn; - } - - return RMW_RET_OK; } if (DDS_RETCODE_OK != - DDS_DataWriter_write_untypedI( - pub->writer(), message, &DDS_HANDLE_NIL)) + DDS_DataWriter_write_w_params_untypedI( + pub->writer(), message, &write_params)) { RMW_CONNEXT_LOG_ERROR_SET("failed to write message to DDS") return RMW_RET_ERROR; } + if (nullptr != params && write_params.replace_auto) { + // Read assigned sn from write_params + rmw_connextdds_sn_dds_to_ros( + write_params.identity.sequence_number, params->sequence_number); + } + return RMW_RET_OK; } diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index b5ba5ca3..d0372f9a 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -378,6 +378,16 @@ struct rmw_connextdds_api_micro rmw_connextdds_api_micro * RMW_Connext_fv_FactoryContext = nullptr; +rmw_ret_t +rmw_connextdds_get_current_time( + DDS_DomainParticipant * domain_participant, + struct DDS_Time_t * current_time) +{ + // Use DDS_DomainParticipant_get_current_time only with Micro since Pro's + // implementation is pretty slow. See #120 for details. + return DDS_DomainParticipant_get_current_time(domain_participant, current_time); +} + const char * const RMW_CONNEXTDDS_ID = "rmw_connextddsmicro"; const char * const RMW_CONNEXTDDS_SERIALIZATION_FORMAT = "cdr"; @@ -1153,12 +1163,14 @@ rmw_ret_t rmw_connextdds_write_message( RMW_Connext_Publisher * const pub, RMW_Connext_Message * const message, - int64_t * const sn_out) + RMW_Connext_WriteParams * const params) { - UNUSED_ARG(sn_out); - + DDS_Time_t timestamp = DDS_TIME_INVALID; + if (nullptr != params && !DDS_Time_is_invalid(¶ms->timestamp)) { + timestamp = params->timestamp; + } if (DDS_RETCODE_OK != - DDS_DataWriter_write(pub->writer(), message, &DDS_HANDLE_NIL)) + DDS_DataWriter_write_w_timestamp(pub->writer(), message, &DDS_HANDLE_NIL, ×tamp)) { RMW_CONNEXT_LOG_ERROR_SET("failed to write message to DDS") return RMW_RET_ERROR;