From 224d761ecaf7fe4a69a3311d8dce9e8eb599407f Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Fri, 24 Apr 2020 11:49:15 -0500 Subject: [PATCH] Add support for taking a sequence of messages (#148) * Add support for taking a sequence of messages Signed-off-by: Michael Carroll * Reorder valid messages to front of sequence Signed-off-by: Michael Carroll * Initialize taken value Signed-off-by: Michael Carroll --- rmw_cyclonedds_cpp/src/rmw_node.cpp | 84 +++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 8c810e06..ab2d4438 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -2163,6 +2163,80 @@ static rmw_ret_t rmw_take_int( return RMW_RET_OK; } +static rmw_ret_t rmw_take_seq( + const rmw_subscription_t * subscription, + size_t count, + rmw_message_sequence_t * message_sequence, + rmw_message_info_sequence_t * message_info_sequence, + size_t * taken) +{ + RET_NULL(taken); + RET_NULL(message_sequence); + RET_NULL(message_info_sequence); + RET_WRONG_IMPLID(subscription); + + if (count > message_sequence->capacity) { + RMW_SET_ERROR_MSG("Insuffient capacity in message_sequence"); + return RMW_RET_ERROR; + } + + if (count > message_info_sequence->capacity) { + RMW_SET_ERROR_MSG("Insuffient capacity in message_info_sequence"); + return RMW_RET_ERROR; + } + + CddsSubscription * sub = static_cast(subscription->data); + RET_NULL(sub); + + std::vector infos(count); + auto ret = dds_take(sub->enth, message_sequence->data, infos.data(), count, count); + + // Returning 0 should not be an error, as it just indicates that no messages were available. + if (ret < 0) { + return RMW_RET_ERROR; + } + + // Keep track of taken/not taken to reorder sequence with valid messages at the front + std::vector taken_msg; + std::vector not_taken_msg; + *taken = 0u; + + for (int ii = 0; ii < ret; ++ii) { + const dds_sample_info_t & info = infos[ii]; + + void * message = &message_sequence->data[ii]; + rmw_message_info_t * message_info = &message_info_sequence->data[*taken]; + + if (info.valid_data) { + taken_msg.push_back(message); + (*taken)++; + if (message_info) { + message_info->publisher_gid.implementation_identifier = eclipse_cyclonedds_identifier; + memset(message_info->publisher_gid.data, 0, sizeof(message_info->publisher_gid.data)); + assert(sizeof(info.publication_handle) <= sizeof(message_info->publisher_gid.data)); + memcpy( + message_info->publisher_gid.data, &info.publication_handle, + sizeof(info.publication_handle)); + } + } else { + not_taken_msg.push_back(message); + } + } + + for (size_t ii = 0; ii < taken_msg.size(); ++ii) { + message_sequence->data[ii] = taken_msg[ii]; + } + + for (size_t ii = 0; ii < not_taken_msg.size(); ++ii) { + message_sequence->data[ii + taken_msg.size()] = not_taken_msg[ii]; + } + + message_sequence->size = *taken; + message_info_sequence->size = *taken; + + return RMW_RET_OK; +} + static rmw_ret_t rmw_take_ser_int( const rmw_subscription_t * subscription, rmw_serialized_message_t * serialized_message, bool * taken, @@ -2221,6 +2295,16 @@ extern "C" rmw_ret_t rmw_take_with_info( return rmw_take_int(subscription, ros_message, taken, message_info); } +extern "C" rmw_ret_t rmw_take_sequence( + const rmw_subscription_t * subscription, size_t count, + rmw_message_sequence_t * message_sequence, + rmw_message_info_sequence_t * message_info_sequence, + size_t * taken, rmw_subscription_allocation_t * allocation) +{ + static_cast(allocation); + return rmw_take_seq(subscription, count, message_sequence, message_info_sequence, taken); +} + extern "C" rmw_ret_t rmw_take_serialized_message( const rmw_subscription_t * subscription, rmw_serialized_message_t * serialized_message,