diff --git a/rmw_zenoh_cpp/src/detail/buffer_pool.hpp b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp new file mode 100644 index 00000000..5cf38b32 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp @@ -0,0 +1,74 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__BUFFER_POOL_HPP_ +#define DETAIL__BUFFER_POOL_HPP_ + +#include +#include +#include + +#include "rcutils/allocator.h" + +// FIXME(fuzzypixelz): indeed, we leak all allocated buffers ;) +class BufferPool +{ +public: + struct Buffer + { + uint8_t *data; + size_t size; + }; + + BufferPool() = default; + + Buffer allocate(rcutils_allocator_t *allocator, size_t size) + { + std::lock_guard guard(mutex_); + + if (buffers_.empty()) { + uint8_t *data = static_cast(allocator->allocate(size, allocator->state)); + if (data == nullptr) { + return {}; + } + return Buffer {data, size}; + } else { + Buffer buffer = buffers_.back(); + buffers_.pop_back(); + if (buffer.size < size) { + uint8_t *data = static_cast(allocator->reallocate( + buffer.data, size, allocator->state)); + if (data == nullptr) { + return {}; + } + buffer.data = data; + buffer.size = size; + } + return buffer; + } + } + + void + deallocate(Buffer buffer) + { + std::lock_guard guard(mutex_); + buffers_.push_back(buffer); + } + +private: + std::vector buffers_; + std::mutex mutex_; +}; + +#endif // DETAIL__BUFFER_POOL_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index a2fdaf5e..17932867 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -24,6 +24,7 @@ #include "graph_cache.hpp" #include "rmw_node_data.hpp" +#include "buffer_pool.hpp" #include "rmw/ret_types.h" #include "rmw/types.h" @@ -92,6 +93,9 @@ struct rmw_context_impl_s final // Forward declaration class Data; + // Pool of serialization buffers. + BufferPool serialization_buffer_pool; + private: std::shared_ptr data_{nullptr}; }; diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 7d83eec6..c9e74c4c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -42,72 +42,72 @@ namespace rmw_zenoh_cpp // TODO(yuyuan): SHM, make this configurable #define SHM_BUF_OK_SIZE 2621440 -///============================================================================= + ///============================================================================= std::shared_ptr PublisherData::make( std::shared_ptr session, - const rmw_node_t * const node, + const rmw_node_t *const node, liveliness::NodeInfo node_info, std::size_t node_id, std::size_t publisher_id, const std::string & topic_name, - const rosidl_message_type_support_t * type_support, - const rmw_qos_profile_t * qos_profile) + const rosidl_message_type_support_t *type_support, + const rmw_qos_profile_t *qos_profile) { rmw_qos_profile_t adapted_qos_profile = *qos_profile; rmw_ret_t ret = QoS::get().best_available_qos( - node, topic_name.c_str(), &adapted_qos_profile, rmw_get_subscriptions_info_by_topic); + node, topic_name.c_str(), &adapted_qos_profile, rmw_get_subscriptions_info_by_topic); if (RMW_RET_OK != ret) { return nullptr; } - rcutils_allocator_t * allocator = &node->context->options.allocator; + rcutils_allocator_t *allocator = &node->context->options.allocator; - const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support); + const rosidl_type_hash_t *type_hash = type_support->get_type_hash_func(type_support); auto callbacks = static_cast(type_support->data); auto message_type_support = std::make_unique(callbacks); - // Convert the type hash to a string so that it can be included in - // the keyexpr. - char * type_hash_c_str = nullptr; + // Convert the type hash to a string so that it can be included in + // the keyexpr. + char *type_hash_c_str = nullptr; rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( - type_hash, - *allocator, - &type_hash_c_str); + type_hash, + *allocator, + &type_hash_c_str); if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); return nullptr; } auto always_free_type_hash_c_str = rcpputils::make_scope_exit( - [&allocator, &type_hash_c_str]() { + [&allocator, &type_hash_c_str]() + { allocator->deallocate(type_hash_c_str, allocator->state); - }); + }); std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - session->get_zid(), - std::to_string(node_id), - std::to_string(publisher_id), - liveliness::EntityType::Publisher, - std::move(node_info), - liveliness::TopicInfo{ + session->get_zid(), + std::to_string(node_id), + std::to_string(publisher_id), + liveliness::EntityType::Publisher, + std::move(node_info), + liveliness::TopicInfo{ std::move(domain_id), topic_name, message_type_support->get_name(), type_hash_c_str, - adapted_qos_profile} - ); + adapted_qos_profile}); if (entity == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the publisher %s.", - topic_name.c_str()); + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the publisher %s.", + topic_name.c_str()); return nullptr; } zenoh::ZResult result; std::optional pub_cache; zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_); - // Create a Publication Cache if durability is transient_local. + // Create a Publication Cache if durability is transient_local. if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { zenoh::ext::SessionExt::PublicationCacheOptions pub_cache_opts = zenoh::ext::SessionExt::PublicationCacheOptions::create_default(); @@ -127,7 +127,7 @@ std::shared_ptr PublisherData::make( } } - // Set congestion_control to BLOCK if appropriate. + // Set congestion_control to BLOCK if appropriate. zenoh::Session::PublisherOptions opts = zenoh::Session::PublisherOptions::create_default(); opts.congestion_control = Z_CONGESTION_CONTROL_DROP; if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { @@ -146,18 +146,18 @@ std::shared_ptr PublisherData::make( std::string liveliness_keyexpr = entity->liveliness_keyexpr(); auto token = session->liveliness_declare_token( - zenoh::KeyExpr(liveliness_keyexpr), - zenoh::Session::LivelinessDeclarationOptions::create_default(), - &result); + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &result); if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the publisher."); + "rmw_zenoh_cpp", + "Unable to create liveliness token for the publisher."); return nullptr; } return std::shared_ptr( - new PublisherData{ + new PublisherData{ node, std::move(entity), std::move(session), @@ -165,19 +165,18 @@ std::shared_ptr PublisherData::make( std::move(pub_cache), std::move(token), type_support->data, - std::move(message_type_support) - }); + std::move(message_type_support)}); } -///============================================================================= + ///============================================================================= PublisherData::PublisherData( - const rmw_node_t * rmw_node, + const rmw_node_t *rmw_node, std::shared_ptr entity, std::shared_ptr sess, zenoh::Publisher pub, std::optional pub_cache, zenoh::LivelinessToken token, - const void * type_support_impl, + const void *type_support_impl, std::unique_ptr type_support) : rmw_node_(rmw_node), entity_(std::move(entity)), @@ -193,9 +192,9 @@ PublisherData::PublisherData( events_mgr_ = std::make_shared(); } -///============================================================================= + ///============================================================================= rmw_ret_t PublisherData::publish( - const void * ros_message, + const void *ros_message, std::optional & /*shm_provider*/) { std::lock_guard lock(mutex_); @@ -204,37 +203,34 @@ rmw_ret_t PublisherData::publish( return RMW_RET_ERROR; } - // Serialize data. + // Serialize data. size_t max_data_length = type_support_->get_estimated_serialized_size( - ros_message, - type_support_impl_); + ros_message, + type_support_impl_); - // To store serialized message byte array. - char * msg_bytes = nullptr; + // To store serialized message byte array. + uint8_t *msg_bytes = nullptr; - rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; + rcutils_allocator_t *allocator = &rmw_node_->context->options.allocator; - auto always_free_msg_bytes = rcpputils::make_scope_exit( - [&msg_bytes, allocator]() { - if (msg_bytes) { - allocator->deallocate(msg_bytes, allocator->state); - } - }); + rmw_context_impl_s *context_impl = static_cast(rmw_node_->data); - // Get memory from the allocator. - msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); + // Get memory from the serialization buffer pool. + BufferPool::Buffer serialization_buffer = + context_impl->serialization_buffer_pool.allocate(allocator, max_data_length); + msg_bytes = serialization_buffer.data; RMW_CHECK_FOR_NULL_WITH_MSG( - msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); + msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(msg_bytes), max_data_length); - // Object that serializes the data + // Object that serializes the data rmw_zenoh_cpp::Cdr ser(fastbuffer); if (!type_support_->serialize_ros_message( - ros_message, - ser.get_cdr(), - type_support_impl_)) + ros_message, + ser.get_cdr(), + type_support_impl_)) { RMW_SET_ERROR_MSG("could not serialize ROS message"); return RMW_RET_ERROR; @@ -242,27 +238,30 @@ rmw_ret_t PublisherData::publish( const size_t data_length = ser.get_serialized_data_length(); - // The encoding is simply forwarded and is useful when key expressions in the - // session use different encoding formats. In our case, all key expressions - // will be encoded with CDR so it does not really matter. + // The encoding is simply forwarded and is useful when key expressions in the + // session use different encoding formats. In our case, all key expressions + // will be encoded with CDR so it does not really matter. zenoh::ZResult result; auto options = zenoh::Publisher::PutOptions::create_default(); options.attachment = create_map_and_set_sequence_num( - sequence_number_++, - entity_->copy_gid()); + sequence_number_++, + entity_->copy_gid()); + + auto delete_bytes = [buffer_pool = &context_impl->serialization_buffer_pool, + buffer = serialization_buffer](uint8_t * data){ + assert(buffer.data == data); + buffer_pool->deallocate(buffer); + }; // TODO(ahcorde): shmbuf - std::vector raw_data( - reinterpret_cast(msg_bytes), - reinterpret_cast(msg_bytes) + data_length); - zenoh::Bytes payload(std::move(raw_data)); + zenoh::Bytes payload(msg_bytes, data_length, delete_bytes); pub_.put(std::move(payload), std::move(options), &result); if (result != Z_OK) { if (result == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "unable to publish message since the zenoh session is closed"); + "rmw_zenoh_cpp", + "unable to publish message since the zenoh session is closed"); } else { RMW_SET_ERROR_MSG("unable to publish message"); return RMW_RET_ERROR; @@ -272,9 +271,9 @@ rmw_ret_t PublisherData::publish( return RMW_RET_OK; } -///============================================================================= + ///============================================================================= rmw_ret_t PublisherData::publish_serialized_message( - const rmw_serialized_message_t * serialized_message, + const rmw_serialized_message_t *serialized_message, std::optional & /*shm_provider*/) { eprosima::fastcdr::FastBuffer buffer( @@ -288,9 +287,9 @@ rmw_ret_t PublisherData::publish_serialized_message( std::lock_guard lock(mutex_); const size_t data_length = ser.get_serialized_data_length(); - // The encoding is simply forwarded and is useful when key expressions in the - // session use different encoding formats. In our case, all key expressions - // will be encoded with CDR so it does not really matter. + // The encoding is simply forwarded and is useful when key expressions in the + // session use different encoding formats. In our case, all key expressions + // will be encoded with CDR so it does not really matter. zenoh::ZResult result; auto options = zenoh::Publisher::PutOptions::create_default(); options.attachment = create_map_and_set_sequence_num(sequence_number_++, entity_->copy_gid()); @@ -304,8 +303,8 @@ rmw_ret_t PublisherData::publish_serialized_message( if (result != Z_OK) { if (result == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "unable to publish message since the zenoh session is closed"); + "rmw_zenoh_cpp", + "unable to publish message since the zenoh session is closed"); } else { RMW_SET_ERROR_MSG("unable to publish message"); return RMW_RET_ERROR; @@ -314,14 +313,14 @@ rmw_ret_t PublisherData::publish_serialized_message( return RMW_RET_OK; } -///============================================================================= + ///============================================================================= std::size_t PublisherData::keyexpr_hash() const { std::lock_guard lock(mutex_); return entity_->keyexpr_hash(); } -///============================================================================= + ///============================================================================= liveliness::TopicInfo PublisherData::topic_info() const { std::lock_guard lock(mutex_); @@ -334,37 +333,36 @@ std::array PublisherData::copy_gid() const return entity_->copy_gid(); } -///============================================================================= + ///============================================================================= bool PublisherData::liveliness_is_valid() const { std::lock_guard lock(mutex_); - // The z_check function is now internal in zenoh-1.0.0 so we assume - // the liveliness token is still initialized as long as this entity has - // not been shutdown. + // The z_check function is now internal in zenoh-1.0.0 so we assume + // the liveliness token is still initialized as long as this entity has + // not been shutdown. return !is_shutdown_; } -///============================================================================= + ///============================================================================= std::shared_ptr PublisherData::events_mgr() const { std::lock_guard lock(mutex_); return events_mgr_; } -///============================================================================= + ///============================================================================= PublisherData::~PublisherData() { const rmw_ret_t ret = this->shutdown(); if (ret != RMW_RET_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Error destructing publisher /%s.", - entity_->topic_info().value().name_.c_str() - ); + "rmw_zenoh_cpp", + "Error destructing publisher /%s.", + entity_->topic_info().value().name_.c_str()); } } -///============================================================================= + ///============================================================================= rmw_ret_t PublisherData::shutdown() { std::lock_guard lock(mutex_); @@ -372,20 +370,20 @@ rmw_ret_t PublisherData::shutdown() return RMW_RET_OK; } - // Unregister this publisher from the ROS graph. + // Unregister this publisher from the ROS graph. zenoh::ZResult result; std::move(token_).value().undeclare(&result); if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to undeclare liveliness token"); + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); return RMW_RET_ERROR; } std::move(pub_).undeclare(&result); if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to undeclare publisher"); + "rmw_zenoh_cpp", + "Unable to undeclare publisher"); return RMW_RET_ERROR; } @@ -394,7 +392,7 @@ rmw_ret_t PublisherData::shutdown() return RMW_RET_OK; } -///============================================================================= + ///============================================================================= bool PublisherData::is_shutdown() const { std::lock_guard lock(mutex_); diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 5acbd909..9a49f11d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -42,7 +42,7 @@ namespace rmw_zenoh_cpp { -///============================================================================= + ///============================================================================= SubscriptionData::Message::Message( std::vector && p, uint64_t recv_ts, @@ -51,100 +51,99 @@ SubscriptionData::Message::Message( { } -///============================================================================= + ///============================================================================= SubscriptionData::Message::~Message() { } -///============================================================================= + ///============================================================================= std::shared_ptr SubscriptionData::make( std::shared_ptr session, std::shared_ptr graph_cache, - const rmw_node_t * const node, + const rmw_node_t *const node, liveliness::NodeInfo node_info, std::size_t node_id, std::size_t subscription_id, const std::string & topic_name, - const rosidl_message_type_support_t * type_support, - const rmw_qos_profile_t * qos_profile) + const rosidl_message_type_support_t *type_support, + const rmw_qos_profile_t *qos_profile) { rmw_qos_profile_t adapted_qos_profile = *qos_profile; rmw_ret_t ret = QoS::get().best_available_qos( - node, topic_name.c_str(), &adapted_qos_profile, rmw_get_publishers_info_by_topic); + node, topic_name.c_str(), &adapted_qos_profile, rmw_get_publishers_info_by_topic); if (RMW_RET_OK != ret) { return nullptr; } - rcutils_allocator_t * allocator = &node->context->options.allocator; + rcutils_allocator_t *allocator = &node->context->options.allocator; - const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support); + const rosidl_type_hash_t *type_hash = type_support->get_type_hash_func(type_support); auto callbacks = static_cast(type_support->data); auto message_type_support = std::make_unique(callbacks); - // Convert the type hash to a string so that it can be included in the keyexpr. - char * type_hash_c_str = nullptr; + // Convert the type hash to a string so that it can be included in the keyexpr. + char *type_hash_c_str = nullptr; rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( - type_hash, - *allocator, - &type_hash_c_str); + type_hash, + *allocator, + &type_hash_c_str); if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); return nullptr; } auto free_type_hash_c_str = rcpputils::make_scope_exit( - [&allocator, &type_hash_c_str]() { + [&allocator, &type_hash_c_str]() + { allocator->deallocate(type_hash_c_str, allocator->state); - }); + }); - // Everything above succeeded and is setup properly. Now declare a subscriber - // with Zenoh; after this, callbacks may come in at any time. + // Everything above succeeded and is setup properly. Now declare a subscriber + // with Zenoh; after this, callbacks may come in at any time. std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - session->get_zid(), - std::to_string(node_id), - std::to_string(subscription_id), - liveliness::EntityType::Subscription, - std::move(node_info), - liveliness::TopicInfo{ + session->get_zid(), + std::to_string(node_id), + std::to_string(subscription_id), + liveliness::EntityType::Subscription, + std::move(node_info), + liveliness::TopicInfo{ std::move(domain_id), topic_name, message_type_support->get_name(), type_hash_c_str, - adapted_qos_profile} - ); + adapted_qos_profile}); if (entity == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the subscription %s.", - topic_name.c_str()); + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the subscription %s.", + topic_name.c_str()); return nullptr; } auto sub_data = std::shared_ptr( - new SubscriptionData{ + new SubscriptionData{ node, graph_cache, std::move(entity), std::move(session), type_support->data, - std::move(message_type_support) - }); + std::move(message_type_support)}); if (!sub_data->init()) { - // init() already set the error + // init() already set the error return nullptr; } return sub_data; } -///============================================================================= + ///============================================================================= SubscriptionData::SubscriptionData( - const rmw_node_t * rmw_node, + const rmw_node_t *rmw_node, std::shared_ptr graph_cache, std::shared_ptr entity, std::shared_ptr session, - const void * type_support_impl, + const void *type_support_impl, std::unique_ptr type_support) : rmw_node_(rmw_node), graph_cache_(std::move(graph_cache)), @@ -161,8 +160,8 @@ SubscriptionData::SubscriptionData( events_mgr_ = std::make_shared(); } -// We have to use an "init" function here, rather than do this in the constructor, because we use -// enable_shared_from_this, which is not available in constructors. + // We have to use an "init" function here, rather than do this in the constructor, because we use + // enable_shared_from_this, which is not available in constructors. bool SubscriptionData::init() { zenoh::ZResult result; @@ -172,30 +171,30 @@ bool SubscriptionData::init() return false; } - rmw_context_impl_t * context_impl = static_cast(rmw_node_->context->impl); + rmw_context_impl_t *context_impl = static_cast(rmw_node_->context->impl); sess_ = context_impl->session(); - // Instantiate the subscription with suitable options depending on the - // adapted_qos_profile. - // TODO(Yadunund): Rely on a separate function to return the sub - // as we start supporting more qos settings. + // Instantiate the subscription with suitable options depending on the + // adapted_qos_profile. + // TODO(Yadunund): Rely on a separate function to return the sub + // as we start supporting more qos settings. if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { zenoh::ext::SessionExt::QueryingSubscriberOptions sub_options = zenoh::ext::SessionExt::QueryingSubscriberOptions::create_default(); const std::string selector = "*/" + entity_->topic_info()->topic_keyexpr_; zenoh::KeyExpr selector_ke(selector); sub_options.query_keyexpr = std::move(selector_ke); - // Tell the PublicationCache's Queryable that the query accepts any key expression as a reply. - // By default a query accepts only replies that matches its query selector. - // This allows us to selectively query certain PublicationCaches when defining the - // set_querying_subscriber_callback below. + // Tell the PublicationCache's Queryable that the query accepts any key expression as a reply. + // By default a query accepts only replies that matches its query selector. + // This allows us to selectively query certain PublicationCaches when defining the + // set_querying_subscriber_callback below. sub_options.query_accept_replies = ZC_REPLY_KEYEXPR_ANY; - // As this initial query is now using a "*", the query target is not COMPLETE. + // As this initial query is now using a "*", the query target is not COMPLETE. sub_options.query_target = Z_QUERY_TARGET_ALL; - // We set consolidation to none as we need to receive transient local messages - // from a number of publishers. Eg: To receive TF data published over /tf_static - // by various publishers. + // We set consolidation to none as we need to receive transient local messages + // from a number of publishers. Eg: To receive TF data published over /tf_static + // by various publishers. sub_options.query_consolidation = zenoh::QueryConsolidation(zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE); @@ -206,17 +205,17 @@ bool SubscriptionData::init() auto sub_data = data_wp.lock(); if (sub_data == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain SubscriptionData from data for %s.", - sample.get_keyexpr().as_string_view()); + "rmw_zenoh_cpp", + "Unable to obtain SubscriptionData from data for %s.", + sample.get_keyexpr().as_string_view()); return; } auto attachment = sample.get_attachment(); if (!attachment.has_value()) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain attachment") + "rmw_zenoh_cpp", + "Unable to obtain attachment") return; } @@ -224,34 +223,33 @@ bool SubscriptionData::init() AttachmentData attachment_data(attachment_value); sub_data->add_new_message( - std::make_unique( - sample.get_payload().as_vector(), - std::chrono::system_clock::now().time_since_epoch().count(), - std::move(attachment_data)), - std::string(sample.get_keyexpr().as_string_view())); - }, - zenoh::closures::none, - std::move(sub_options), - &result); + std::make_unique( + sample.get_payload().as_vector(), + std::chrono::system_clock::now().time_since_epoch().count(), + std::move(attachment_data)), + std::string(sample.get_keyexpr().as_string_view())); + }, + zenoh::closures::none, + std::move(sub_options), + &result); if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; } sub_ = std::move(sub); - // Register the querying subscriber with the graph cache to get latest - // messages from publishers that were discovered after their first publication. + // Register the querying subscriber with the graph cache to get latest + // messages from publishers that were discovered after their first publication. graph_cache_->set_querying_subscriber_callback( - entity_->topic_info().value().topic_keyexpr_, - entity_->keyexpr_hash(), + entity_->topic_info().value().topic_keyexpr_, + entity_->keyexpr_hash(), [data_wp](const std::string & queryable_prefix) -> void { auto sub_data = data_wp.lock(); if (sub_data == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to lock weak_ptr within querying subscription callback." - ); + "rmw_zenoh_cpp", + "Unable to lock weak_ptr within querying subscription callback."); return; } std::lock_guard lock(sub_data->mutex_); @@ -260,47 +258,43 @@ bool SubscriptionData::init() "/" + sub_data->entity_->topic_info().value().topic_keyexpr_; RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "QueryingSubscriberCallback triggered over %s.", - selector.c_str() - ); + "rmw_zenoh_cpp", + "QueryingSubscriberCallback triggered over %s.", + selector.c_str()); zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default(); opts.timeout_ms = std::numeric_limits::max(); opts.consolidation = zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE; opts.accept_replies = ZC_REPLY_KEYEXPR_ANY; zenoh::ZResult result; - std::get>(sub_data->sub_.value()).get( - zenoh::KeyExpr(selector), - std::move(opts), - &result); + std::get>(sub_data->sub_.value()).get(zenoh::KeyExpr( + selector), std::move(opts), &result); if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to get querying subscriber."); return; } - } - ); + }); } else { zenoh::Session::SubscriberOptions sub_options = zenoh::Session::SubscriberOptions::create_default(); std::weak_ptr data_wp = shared_from_this(); zenoh::Subscriber sub = context_impl->session()->declare_subscriber( - sub_ke, - [data_wp](const zenoh::Sample & sample) { + sub_ke, + [data_wp](const zenoh::Sample & sample) + { auto sub_data = data_wp.lock(); if (sub_data == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to lock weak_ptr within querying subscription callback." - ); + "rmw_zenoh_cpp", + "Unable to lock weak_ptr within querying subscription callback."); return; } auto attachment = sample.get_attachment(); if (!attachment.has_value()) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain attachment") + "rmw_zenoh_cpp", + "Unable to obtain attachment") return; } auto payload = sample.get_payload().clone(); @@ -308,15 +302,15 @@ bool SubscriptionData::init() AttachmentData attachment_data(attachment_value); sub_data->add_new_message( - std::make_unique( - payload.as_vector(), - std::chrono::system_clock::now().time_since_epoch().count(), - std::move(attachment_data)), - std::string(sample.get_keyexpr().as_string_view())); - }, - zenoh::closures::none, - std::move(sub_options), - &result); + std::make_unique( + payload.as_vector(), + std::chrono::system_clock::now().time_since_epoch().count(), + std::move(attachment_data)), + std::string(sample.get_keyexpr().as_string_view())); + }, + zenoh::closures::none, + std::move(sub_options), + &result); if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; @@ -324,16 +318,16 @@ bool SubscriptionData::init() sub_ = std::move(sub); } - // Publish to the graph that a new subscription is in town. + // Publish to the graph that a new subscription is in town. std::string liveliness_keyexpr = entity_->liveliness_keyexpr(); token_ = context_impl->session()->liveliness_declare_token( - zenoh::KeyExpr(liveliness_keyexpr), - zenoh::Session::LivelinessDeclarationOptions::create_default(), - &result); + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &result); if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the subscription."); + "rmw_zenoh_cpp", + "Unable to create liveliness token for the subscription."); return false; } @@ -342,51 +336,50 @@ bool SubscriptionData::init() return true; } -///============================================================================= + ///============================================================================= std::size_t SubscriptionData::keyexpr_hash() const { std::lock_guard lock(mutex_); return entity_->keyexpr_hash(); } -///============================================================================= + ///============================================================================= liveliness::TopicInfo SubscriptionData::topic_info() const { std::lock_guard lock(mutex_); return entity_->topic_info().value(); } -///============================================================================= + ///============================================================================= bool SubscriptionData::liveliness_is_valid() const { std::lock_guard lock(mutex_); - // The z_check function is now internal in zenoh-1.0.0 so we assume - // the liveliness token is still initialized as long as this entity has - // not been shutdown. + // The z_check function is now internal in zenoh-1.0.0 so we assume + // the liveliness token is still initialized as long as this entity has + // not been shutdown. return !is_shutdown_; } -///============================================================================= + ///============================================================================= std::shared_ptr SubscriptionData::events_mgr() const { std::lock_guard lock(mutex_); return events_mgr_; } -///============================================================================= + ///============================================================================= SubscriptionData::~SubscriptionData() { const rmw_ret_t ret = this->shutdown(); if (ret != RMW_RET_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Error destructing publisher /%s.", - entity_->topic_info().value().name_.c_str() - ); + "rmw_zenoh_cpp", + "Error destructing publisher /%s.", + entity_->topic_info().value().name_.c_str()); } } -///============================================================================= + ///============================================================================= rmw_ret_t SubscriptionData::shutdown() { rmw_ret_t ret = RMW_RET_OK; @@ -395,43 +388,42 @@ rmw_ret_t SubscriptionData::shutdown() return ret; } - // Remove the registered callback from the GraphCache if any. + // Remove the registered callback from the GraphCache if any. graph_cache_->remove_querying_subscriber_callback( - entity_->topic_info().value().topic_keyexpr_, - entity_->keyexpr_hash() - ); - // Remove any event callbacks registered to this subscription. + entity_->topic_info().value().topic_keyexpr_, + entity_->keyexpr_hash()); + // Remove any event callbacks registered to this subscription. graph_cache_->remove_qos_event_callbacks(entity_->keyexpr_hash()); - // Unregister this subscription from the ROS graph. + // Unregister this subscription from the ROS graph. zenoh::ZResult result; std::move(token_).value().undeclare(&result); if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to undeclare liveliness token"); + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); return RMW_RET_ERROR; } if (sub_.has_value()) { - zenoh::Subscriber * sub = std::get_if>(&sub_.value()); + zenoh::Subscriber *sub = std::get_if>(&sub_.value()); if (sub != nullptr) { std::move(*sub).undeclare(&result); if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "failed to undeclare sub."); + "rmw_zenoh_cpp", + "failed to undeclare sub."); return RMW_RET_ERROR; } } else { - zenoh::ext::QueryingSubscriber * sub = + zenoh::ext::QueryingSubscriber *sub = std::get_if>(&sub_.value()); if (sub != nullptr) { std::move(*sub).undeclare(&result); if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "failed to undeclare querying sub."); + "rmw_zenoh_cpp", + "failed to undeclare querying sub."); return RMW_RET_ERROR; } } @@ -444,16 +436,16 @@ rmw_ret_t SubscriptionData::shutdown() return ret; } -///============================================================================= + ///============================================================================= bool SubscriptionData::is_shutdown() const { std::lock_guard lock(mutex_); return is_shutdown_; } -///============================================================================= + ///============================================================================= bool SubscriptionData::queue_has_data_and_attach_condition_if_not( - rmw_wait_set_data_t * wait_set_data) + rmw_wait_set_data_t *wait_set_data) { std::lock_guard lock(mutex_); if (!message_queue_.empty()) { @@ -465,7 +457,7 @@ bool SubscriptionData::queue_has_data_and_attach_condition_if_not( return false; } -///============================================================================= + ///============================================================================= bool SubscriptionData::detach_condition_and_queue_is_empty() { std::lock_guard lock(mutex_); @@ -474,17 +466,17 @@ bool SubscriptionData::detach_condition_and_queue_is_empty() return message_queue_.empty(); } -///============================================================================= + ///============================================================================= rmw_ret_t SubscriptionData::take_one_message( - void * ros_message, - rmw_message_info_t * message_info, - bool * taken) + void *ros_message, + rmw_message_info_t *message_info, + bool *taken) { *taken = false; std::lock_guard lock(mutex_); if (is_shutdown_ || message_queue_.empty()) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. + // This tells rcl that the check for a new message was done, but no messages have come in yet. return RMW_RET_OK; } std::unique_ptr msg_data = std::move(message_queue_.front()); @@ -494,21 +486,21 @@ rmw_ret_t SubscriptionData::take_one_message( if (payload_data.empty()) { RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "SubscriptionData not able to get slice data"); + "rmw_zenoh_cpp", + "SubscriptionData not able to get slice data"); return RMW_RET_ERROR; } - // Object that manages the raw buffer + // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( reinterpret_cast(const_cast(payload_data.data())), payload_data.size()); - // Object that serializes the data + // Object that serializes the data rmw_zenoh_cpp::Cdr deser(fastbuffer); if (!type_support_->deserialize_ros_message( - deser.get_cdr(), - ros_message, - type_support_impl_)) + deser.get_cdr(), + ros_message, + type_support_impl_)) { RMW_SET_ERROR_MSG("could not deserialize ROS message"); return RMW_RET_ERROR; @@ -518,13 +510,13 @@ rmw_ret_t SubscriptionData::take_one_message( message_info->source_timestamp = msg_data->attachment.source_timestamp(); message_info->received_timestamp = msg_data->recv_timestamp; message_info->publication_sequence_number = msg_data->attachment.sequence_number(); - // TODO(clalancette): fill in reception_sequence_number + // TODO(clalancette): fill in reception_sequence_number message_info->reception_sequence_number = 0; message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; memcpy( - message_info->publisher_gid.data, - msg_data->attachment.copy_gid().data(), - RMW_GID_STORAGE_SIZE); + message_info->publisher_gid.data, + msg_data->attachment.copy_gid().data(), + RMW_GID_STORAGE_SIZE); message_info->from_intra_process = false; } *taken = true; @@ -532,17 +524,17 @@ rmw_ret_t SubscriptionData::take_one_message( return RMW_RET_OK; } -///============================================================================= + ///============================================================================= rmw_ret_t SubscriptionData::take_serialized_message( - rmw_serialized_message_t * serialized_message, - bool * taken, - rmw_message_info_t * message_info) + rmw_serialized_message_t *serialized_message, + bool *taken, + rmw_message_info_t *message_info) { *taken = false; std::lock_guard lock(mutex_); if (is_shutdown_ || message_queue_.empty()) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. + // This tells rcl that the check for a new message was done, but no messages have come in yet. return RMW_RET_OK; } std::unique_ptr msg_data = std::move(message_queue_.front()); @@ -552,22 +544,22 @@ rmw_ret_t SubscriptionData::take_serialized_message( if (payload_data.empty()) { RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "SubscriptionData not able to get slice data"); + "rmw_zenoh_cpp", + "SubscriptionData not able to get slice data"); return RMW_RET_ERROR; } if (serialized_message->buffer_capacity < payload_data.size()) { rmw_ret_t ret = rmw_serialized_message_resize(serialized_message, payload_data.size()); if (ret != RMW_RET_OK) { - return ret; // Error message already set + return ret; // Error message already set } } serialized_message->buffer_length = payload_data.size(); memcpy( - serialized_message->buffer, - reinterpret_cast(const_cast(payload_data.data())), - payload_data.size()); + serialized_message->buffer, + reinterpret_cast(const_cast(payload_data.data())), + payload_data.size()); *taken = true; @@ -575,20 +567,20 @@ rmw_ret_t SubscriptionData::take_serialized_message( message_info->source_timestamp = msg_data->attachment.source_timestamp(); message_info->received_timestamp = msg_data->recv_timestamp; message_info->publication_sequence_number = msg_data->attachment.sequence_number(); - // TODO(clalancette): fill in reception_sequence_number + // TODO(clalancette): fill in reception_sequence_number message_info->reception_sequence_number = 0; message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; memcpy( - message_info->publisher_gid.data, - msg_data->attachment.copy_gid().data(), - RMW_GID_STORAGE_SIZE); + message_info->publisher_gid.data, + msg_data->attachment.copy_gid().data(), + RMW_GID_STORAGE_SIZE); message_info->from_intra_process = false; } return RMW_RET_OK; } -///============================================================================= + ///============================================================================= void SubscriptionData::add_new_message( std::unique_ptr msg, const std::string & topic_name) { @@ -600,31 +592,31 @@ void SubscriptionData::add_new_message( if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && message_queue_.size() >= adapted_qos_profile.depth) { - // Log warning if message is discarded due to hitting the queue depth + // Log warning if message is discarded due to hitting the queue depth RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "Message queue depth of %ld reached, discarding oldest message " - "for subscription for %s", - adapted_qos_profile.depth, - topic_name.c_str()); - - // If the adapted_qos_profile.depth is 0, the std::move command below will result - // in UB and the z_drop will segfault. We explicitly set the depth to a minimum of 1 - // in rmw_create_subscription() but to be safe, we only attempt to discard from the - // queue if it is non-empty. + "rmw_zenoh_cpp", + "Message queue depth of %ld reached, discarding oldest message " + "for subscription for %s", + adapted_qos_profile.depth, + topic_name.c_str()); + + // If the adapted_qos_profile.depth is 0, the std::move command below will result + // in UB and the z_drop will segfault. We explicitly set the depth to a minimum of 1 + // in rmw_create_subscription() but to be safe, we only attempt to discard from the + // queue if it is non-empty. if (!message_queue_.empty()) { std::unique_ptr old = std::move(message_queue_.front()); message_queue_.pop_front(); } } - // Check for messages lost if the new sequence number is not monotonically increasing. + // Check for messages lost if the new sequence number is not monotonically increasing. const size_t gid_hash = hash_gid(msg->attachment.copy_gid()); auto last_known_pub_it = last_known_published_msg_.find(gid_hash); if (last_known_pub_it != last_known_published_msg_.end()) { const int64_t seq_increment = std::abs( - msg->attachment.sequence_number() - - last_known_pub_it->second); + msg->attachment.sequence_number() - + last_known_pub_it->second); if (seq_increment > 1) { const size_t num_msg_lost = seq_increment - 1; total_messages_lost_ += num_msg_lost; @@ -632,16 +624,16 @@ void SubscriptionData::add_new_message( event_status->total_count_change = num_msg_lost; event_status->total_count = total_messages_lost_; events_mgr_->add_new_event( - ZENOH_EVENT_MESSAGE_LOST, - std::move(event_status)); + ZENOH_EVENT_MESSAGE_LOST, + std::move(event_status)); } } - // Always update the last known sequence number for the publisher. + // Always update the last known sequence number for the publisher. last_known_published_msg_[gid_hash] = msg->attachment.sequence_number(); message_queue_.emplace_back(std::move(msg)); - // Since we added new data, trigger user callback and guard condition if they are available + // Since we added new data, trigger user callback and guard condition if they are available data_callback_mgr_.trigger_callback(); if (wait_set_data_ != nullptr) { wait_set_data_->triggered = true; @@ -649,20 +641,20 @@ void SubscriptionData::add_new_message( } } -//============================================================================== + //============================================================================== void SubscriptionData::set_on_new_message_callback( rmw_event_callback_t callback, - const void * user_data) + const void *user_data) { std::lock_guard lock(mutex_); data_callback_mgr_.set_callback(user_data, std::move(callback)); } -//============================================================================== + //============================================================================== std::shared_ptr SubscriptionData::graph_cache() const { std::lock_guard lock(mutex_); return graph_cache_; } -} // namespace rmw_zenoh_cpp +} // namespace rmw_zenoh_cpp