Skip to content

Commit

Permalink
Add ros2_tracing tracepoints (#120)
Browse files Browse the repository at this point in the history
* Add ros2_tracing tracepoints

Signed-off-by: Christopher Wecht <[email protected]>

* Resolving rebasing issues, several small fixes

Signed-off-by: Christopher Wecht <[email protected]>

* Added rmw_connextdds_get_current_time for faster timestamps

Signed-off-by: Christopher Wecht <[email protected]>

---------

Signed-off-by: Christopher Wecht <[email protected]>
  • Loading branch information
cwecht authored Jan 23, 2024
1 parent a6053be commit 8d34e7e
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 42 deletions.
3 changes: 3 additions & 0 deletions rmw_connextdds_common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ set(RMW_CONNEXT_DEPS
rcpputils
rmw
rmw_dds_common
tracetools
fastcdr
rosidl_runtime_c
rosidl_runtime_cpp
Expand All @@ -128,6 +129,7 @@ endforeach()
################################################################################
# Common Source Configuration
################################################################################

set(RMW_CONNEXT_DIR ${CMAKE_CURRENT_SOURCE_DIR})

set(RMW_CONNEXT_COMMON_SOURCE_CPP
Expand Down Expand Up @@ -172,6 +174,7 @@ set(RMW_CONNEXT_COMMON_SOURCE
${RMW_CONNEXT_COMMON_SOURCE_CPP}
${RMW_CONNEXT_COMMON_SOURCE_HPP})


################################################################################
# Check if additional Connext components are needed (e.g. security)
################################################################################
Expand Down
13 changes: 12 additions & 1 deletion rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,20 @@ enum RMW_Connext_MessageType
RMW_CONNEXT_MESSAGE_REPLY
};

struct RMW_Connext_WriteParams
{
DDS_Time_t timestamp{DDS_TIME_INVALID};
int64_t sequence_number{0};
};

RMW_CONNEXTDDS_PUBLIC extern const char * const RMW_CONNEXTDDS_ID;
extern const char * const RMW_CONNEXTDDS_SERIALIZATION_FORMAT;

rmw_ret_t
rmw_connextdds_get_current_time(
DDS_DomainParticipant * domain_participant,
struct DDS_Time_t * current_time);

rmw_ret_t
rmw_connextdds_set_log_verbosity(rmw_log_severity_t severity);

Expand Down Expand Up @@ -132,7 +143,7 @@ rmw_ret_t
rmw_connextdds_write_message(
RMW_Connext_Publisher * const pub,
RMW_Connext_Message * const message,
int64_t * const sn_out);
RMW_Connext_WriteParams * const params = nullptr);

rmw_ret_t
rmw_connextdds_take_samples(
Expand Down
5 changes: 4 additions & 1 deletion rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
* General helpers and utilities.
******************************************************************************/

#define dds_time_to_u64(t_) \
((1000000000ULL * (uint64_t)(t_)->sec) + (uint64_t)(t_)->nanosec)

rcutils_ret_t
rcutils_uint8_array_copy(
rcutils_uint8_array_t * const dst,
Expand Down Expand Up @@ -151,7 +154,7 @@ class RMW_Connext_Publisher
write(
const void * const ros_message,
const bool serialized,
int64_t * const sn_out = nullptr);
RMW_Connext_WriteParams * const params);

rmw_ret_t
enable() const
Expand Down
1 change: 1 addition & 0 deletions rmw_connextdds_common/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<depend>rosidl_typesupport_fastrtps_cpp</depend>
<depend>rosidl_typesupport_introspection_c</depend>
<depend>rosidl_typesupport_introspection_cpp</depend>
<depend>tracetools</depend>

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
Expand Down
45 changes: 35 additions & 10 deletions rmw_connextdds_common/src/common/rmw_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include "rcpputils/scope_exit.hpp"

#include "tracetools/tracetools.h"

#include "rmw_dds_common/time_utils.hpp"
#include "rmw_dds_common/qos.hpp"

Expand Down Expand Up @@ -919,7 +921,7 @@ rmw_ret_t
RMW_Connext_Publisher::write(
const void * const ros_message,
const bool serialized,
int64_t * const sn_out)
RMW_Connext_WriteParams * const params)
{
RMW_Connext_Message user_msg;
if (RMW_RET_OK != RMW_Connext_Message_initialize(&user_msg, this->type_support, 0)) {
Expand All @@ -928,7 +930,7 @@ RMW_Connext_Publisher::write(
user_msg.user_data = ros_message;
user_msg.serialized = serialized;

return rmw_connextdds_write_message(this, &user_msg, sn_out);
return rmw_connextdds_write_message(this, &user_msg, params);
}


Expand Down Expand Up @@ -1108,6 +1110,7 @@ rmw_connextdds_create_publisher(
}
}

TRACETOOLS_TRACEPOINT(rmw_publisher_init, rmw_publisher, rmw_pub_impl->gid()->data);

scope_exit_rmw_writer_impl_delete.cancel();
scope_exit_rmw_writer_delete.cancel();
Expand Down Expand Up @@ -1393,6 +1396,7 @@ RMW_Connext_Subscriber::create(
RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber")
return nullptr;
}

scope_exit_dds_reader_delete.cancel();
scope_exit_topic_delete.cancel();
scope_exit_type_unregister.cancel();
Expand Down Expand Up @@ -1924,6 +1928,8 @@ rmw_connextdds_create_subscriber(
}
}

TRACETOOLS_TRACEPOINT(rmw_subscription_init, rmw_subscriber, rmw_sub_impl->gid()->data);

#if RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO
scope_exit_enable_participant_on_error.cancel();
#endif // RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO
Expand Down Expand Up @@ -1960,12 +1966,6 @@ rmw_connextdds_destroy_subscriber(
return RMW_RET_OK;
}

static
constexpr uint64_t C_NANOSECONDS_PER_SEC = 1000000000ULL;

#define dds_time_to_u64(t_) \
((C_NANOSECONDS_PER_SEC * (uint64_t)(t_)->sec) + (uint64_t)(t_)->nanosec)

void
rmw_connextdds_message_info_from_dds(
rmw_message_info_t * const to,
Expand Down Expand Up @@ -2723,7 +2723,21 @@ RMW_Connext_Client::send_request(
reinterpret_cast<const uint32_t *>(rr_msg.gid.data)[3],
rr_msg.sn)

rmw_ret_t rc = this->request_pub->write(&rr_msg, false /* serialized */, sequence_id);

RMW_Connext_WriteParams write_params;

if (DDS_RETCODE_OK !=
rmw_connextdds_get_current_time(
this->request_pub->dds_participant(),
&write_params.timestamp))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to get current time")
return RMW_RET_ERROR;
}

rmw_ret_t rc = this->request_pub->write(&rr_msg, false /* serialized */, &write_params);

*sequence_id = write_params.sequence_number;

RMW_CONNEXT_LOG_DEBUG_A(
"[%s] SENT REQUEST: "
Expand Down Expand Up @@ -2998,6 +3012,17 @@ RMW_Connext_Service::send_response(
rr_msg.gid.implementation_identifier = RMW_CONNEXTDDS_ID;
rr_msg.payload = const_cast<void *>(ros_response);

RMW_Connext_WriteParams write_params;

if (DDS_RETCODE_OK !=
rmw_connextdds_get_current_time(
this->reply_pub->dds_participant(),
&write_params.timestamp))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to get current time")
return RMW_RET_ERROR;
}

RMW_CONNEXT_LOG_DEBUG_A(
"[%s] send RESPONSE: "
"gid=%08X.%08X.%08X.%08X, "
Expand All @@ -3009,7 +3034,7 @@ RMW_Connext_Service::send_response(
reinterpret_cast<const uint32_t *>(rr_msg.gid.data)[3],
rr_msg.sn)

return this->reply_pub->write(&rr_msg, false /* serialized */);
return this->reply_pub->write(&rr_msg, false /* serialized */, &write_params);
}

rmw_ret_t
Expand Down
33 changes: 31 additions & 2 deletions rmw_connextdds_common/src/common/rmw_publication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include "rmw/validate_full_topic_name.h"

#include "tracetools/tracetools.h"

/******************************************************************************
* Publication functions
******************************************************************************/
Expand Down Expand Up @@ -47,7 +49,21 @@ rmw_api_connextdds_publish(
auto pub_impl = static_cast<RMW_Connext_Publisher *>(publisher->data);
RMW_CHECK_ARGUMENT_FOR_NULL(pub_impl, RMW_RET_INVALID_ARGUMENT);

return pub_impl->write(ros_message, false /* serialized */);
RMW_Connext_WriteParams write_params;

if (DDS_RETCODE_OK !=
rmw_connextdds_get_current_time(
pub_impl->dds_participant(),
&write_params.timestamp))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to get current time")
return RMW_RET_ERROR;
}

TRACETOOLS_TRACEPOINT(
rmw_publish, publisher, ros_message, dds_time_to_u64(&write_params.timestamp));

return pub_impl->write(ros_message, false /* serialized */, &write_params);
}


Expand All @@ -74,7 +90,20 @@ rmw_api_connextdds_publish_serialized_message(
auto pub_impl = static_cast<RMW_Connext_Publisher *>(publisher->data);
RMW_CHECK_ARGUMENT_FOR_NULL(pub_impl, RMW_RET_INVALID_ARGUMENT);

return pub_impl->write(serialized_message, true /* serialized */);
RMW_Connext_WriteParams write_params;

if (DDS_RETCODE_OK !=
rmw_connextdds_get_current_time(
pub_impl->dds_participant(),
&write_params.timestamp))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to get current time")
return RMW_RET_ERROR;
}

TRACETOOLS_TRACEPOINT(
rmw_publish, publisher, serialized_message, dds_time_to_u64(&write_params.timestamp));
return pub_impl->write(serialized_message, true /* serialized */, &write_params);
}


Expand Down
10 changes: 10 additions & 0 deletions rmw_connextdds_common/src/common/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include "rmw/validate_full_topic_name.h"

#include "tracetools/tracetools.h"

/******************************************************************************
* Subscription functions
******************************************************************************/
Expand Down Expand Up @@ -282,6 +284,7 @@ rmw_api_connextdds_take(

rmw_ret_t rc = sub_impl->take_message(ros_message, nullptr, taken);

TRACETOOLS_TRACEPOINT(rmw_take, subscription, ros_message, 0LL, *taken);
return rc;
}

Expand Down Expand Up @@ -311,6 +314,13 @@ rmw_api_connextdds_take_with_info(

rmw_ret_t rc = sub_impl->take_message(ros_message, message_info, taken);

TRACETOOLS_TRACEPOINT(
rmw_take,
subscription,
ros_message,
(message_info ? message_info->source_timestamp : 0LL),
*taken);

return rc;
}

Expand Down
54 changes: 30 additions & 24 deletions rmw_connextdds_common/src/ndds/dds_api_ndds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ struct rmw_connextdds_api_pro

rmw_connextdds_api_pro * RMW_Connext_fv_FactoryContext = nullptr;

rmw_ret_t
rmw_connextdds_get_current_time(
DDS_DomainParticipant * domain_participant,
struct DDS_Time_t * current_time)
{
// Use DDS_DomainParticipant_get_current_time only with Micro since Pro's
// implementation is pretty slow. See #120 for details.
UNUSED_ARG(domain_participant);
RTINtpTime now;
if (!RTIOsapiUtility_getTime(&now)) {
return DDS_RETCODE_ERROR;
}
RTINtpTime_unpackToNanosec(current_time->sec, current_time->nanosec, now);
return DDS_RETCODE_OK;
}

rmw_ret_t
rmw_connextdds_set_log_verbosity(rmw_log_severity_t severity)
{
Expand Down Expand Up @@ -716,15 +732,19 @@ rmw_ret_t
rmw_connextdds_write_message(
RMW_Connext_Publisher * const pub,
RMW_Connext_Message * const message,
int64_t * const sn_out)
RMW_Connext_WriteParams * const params)
{
DDS_WriteParams_t write_params = DDS_WRITEPARAMS_DEFAULT;
if (nullptr != params && !DDS_Time_is_invalid(&params->timestamp)) {
write_params.source_timestamp = params->timestamp;
}

if (pub->message_type_support()->type_requestreply() &&
pub->message_type_support()->ctx()->request_reply_mapping ==
RMW_Connext_RequestReplyMapping::Extended)
{
const RMW_Connext_RequestReplyMessage * const rr_msg =
reinterpret_cast<const RMW_Connext_RequestReplyMessage *>(message->user_data);
DDS_WriteParams_t write_params = DDS_WRITEPARAMS_DEFAULT;

if (!rr_msg->request) {
/* If this is a reply, propagate the request's sample identity
Expand All @@ -745,36 +765,22 @@ rmw_connextdds_write_message(
// enable WriteParams::replace_auto to retrieve SN of published message
write_params.replace_auto = DDS_BOOLEAN_TRUE;
}
if (DDS_RETCODE_OK !=
DDS_DataWriter_write_w_params_untypedI(
pub->writer(), message, &write_params))
{
RMW_CONNEXT_LOG_ERROR_SET(
"failed to write request/reply message to DDS")
return RMW_RET_ERROR;
}

if (rr_msg->request) {
int64_t sn = 0;

// Read assigned sn from write_params
rmw_connextdds_sn_dds_to_ros(
write_params.identity.sequence_number, sn);

*sn_out = sn;
}

return RMW_RET_OK;
}

if (DDS_RETCODE_OK !=
DDS_DataWriter_write_untypedI(
pub->writer(), message, &DDS_HANDLE_NIL))
DDS_DataWriter_write_w_params_untypedI(
pub->writer(), message, &write_params))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to write message to DDS")
return RMW_RET_ERROR;
}

if (nullptr != params && write_params.replace_auto) {
// Read assigned sn from write_params
rmw_connextdds_sn_dds_to_ros(
write_params.identity.sequence_number, params->sequence_number);
}

return RMW_RET_OK;
}

Expand Down
Loading

0 comments on commit 8d34e7e

Please sign in to comment.