From 284bcc436dd927f84dbbffa728e048ace9cb4b62 Mon Sep 17 00:00:00 2001 From: zenoh Date: Fri, 22 Nov 2024 17:05:08 +0000 Subject: [PATCH] Wip DONE: - Fix std::string constructor call in rmw_subscription_data.cpp - Move serialization buffer pool to the context TODO: - Remove printf calls - Remove sanitizer usage - Fix formatting - Resolve FIXMEs in BufferPool impl --- rmw_zenoh_cpp/CMakeLists.txt | 4 +- ...zation_buffer_pool.hpp => buffer_pool.hpp} | 13 +- .../src/detail/rmw_context_impl_s.hpp | 4 + .../src/detail/rmw_publisher_data.cpp | 755 +++++++++--------- .../src/detail/rmw_publisher_data.hpp | 5 +- .../src/detail/rmw_subscription_data.cpp | 5 +- 6 files changed, 415 insertions(+), 371 deletions(-) rename rmw_zenoh_cpp/src/detail/{serialization_buffer_pool.hpp => buffer_pool.hpp} (90%) diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 89b5a598..5c83f13d 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -8,7 +8,9 @@ if(NOT CMAKE_CXX_STANDARD) endif() if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") - add_compile_options(-Wall -Wextra -Wpedantic) + add_compile_options(-Wall -Wextra -Wpedantic -g -fsanitize=address) + link_libraries("-fsanitize=address") + link_libraries("-lasan") endif() # find dependencies diff --git a/rmw_zenoh_cpp/src/detail/serialization_buffer_pool.hpp b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp similarity index 90% rename from rmw_zenoh_cpp/src/detail/serialization_buffer_pool.hpp rename to rmw_zenoh_cpp/src/detail/buffer_pool.hpp index e4214025..9d86aa37 100644 --- a/rmw_zenoh_cpp/src/detail/serialization_buffer_pool.hpp +++ b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef DETAIL__SERIALIZATION_BUFFER_POOL_HPP_ -#define DETAIL__SERIALIZATION_BUFFER_POOL_HPP_ +#ifndef DETAIL__BUFFER_POOL_HPP_ +#define DETAIL__BUFFER_POOL_HPP_ #include #include @@ -21,13 +21,14 @@ #include "rcutils/allocator.h" -class SerializationBufferPool +class BufferPool { public: - SerializationBufferPool() = default; + BufferPool() = default; uint8_t * allocate(rcutils_allocator_t * allocator, size_t size) { + // FIXME(fuzzypixelz): indeed, this methods leaks all allocated buffers ;) std::lock_guard guard(mutex_); if (available_buffers_.empty()) { @@ -47,7 +48,6 @@ class SerializationBufferPool assert(buffer.data); // FIXME(fuzzypixelz): handle error buffer.size = size; } - return buffer.data; } } @@ -56,6 +56,7 @@ class SerializationBufferPool deallocate(uint8_t * data) { std::lock_guard guard(mutex_); + for (size_t i = 0; i < buffers_.size(); i++) { if (buffers_.at(i).data == data) { available_buffers_.push_back(i); @@ -77,4 +78,4 @@ class SerializationBufferPool std::mutex mutex_; }; -#endif // DETAIL__SERIALIZATION_BUFFER_POOL_HPP_ +#endif // DETAIL__BUFFER_POOL_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 1e46d6af..9e656a5d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -24,6 +24,7 @@ #include "graph_cache.hpp" #include "rmw_node_data.hpp" +#include "buffer_pool.hpp" #include "rmw/ret_types.h" #include "rmw/types.h" @@ -92,6 +93,9 @@ class rmw_context_impl_s final // Forward declaration class Data; + // Pool of serialization buffers. + BufferPool serialization_buffer_pool; + private: std::shared_ptr data_{nullptr}; }; diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 08459ac1..fd2a36bd 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -26,6 +26,7 @@ #include "logging_macros.hpp" #include "qos.hpp" #include "zenoh_utils.hpp" +#include "rmw_context_impl_s.hpp" #include "rcpputils/scope_exit.hpp" @@ -37,411 +38,445 @@ namespace rmw_zenoh_cpp // TODO(yuyuan): SHM, make this configurable #define SHM_BUF_OK_SIZE 2621440 -///============================================================================= -std::shared_ptr PublisherData::make( - const z_loaned_session_t * session, - const rmw_node_t * const node, - liveliness::NodeInfo node_info, - std::size_t node_id, - std::size_t publisher_id, - const std::string & topic_name, - const rosidl_message_type_support_t * type_support, - const rmw_qos_profile_t * qos_profile) -{ - rmw_qos_profile_t adapted_qos_profile = *qos_profile; - rmw_ret_t ret = QoS::get().best_available_qos( - node, topic_name.c_str(), &adapted_qos_profile, rmw_get_subscriptions_info_by_topic); - if (RMW_RET_OK != ret) { - return nullptr; - } + ///============================================================================= + std::shared_ptr PublisherData::make( + const z_loaned_session_t *session, + const rmw_node_t *const node, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t publisher_id, + const std::string &topic_name, + const rosidl_message_type_support_t *type_support, + const rmw_qos_profile_t *qos_profile) + { + rmw_qos_profile_t adapted_qos_profile = *qos_profile; + rmw_ret_t ret = QoS::get().best_available_qos( + node, topic_name.c_str(), &adapted_qos_profile, rmw_get_subscriptions_info_by_topic); + if (RMW_RET_OK != ret) + { + return nullptr; + } - rcutils_allocator_t * allocator = &node->context->options.allocator; - - const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support); - auto callbacks = static_cast(type_support->data); - auto message_type_support = std::make_unique(callbacks); - - // Convert the type hash to a string so that it can be included in - // the keyexpr. - char * type_hash_c_str = nullptr; - rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( - type_hash, - *allocator, - &type_hash_c_str); - if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { - RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); - return nullptr; - } - auto always_free_type_hash_c_str = rcpputils::make_scope_exit( - [&allocator, &type_hash_c_str]() { - allocator->deallocate(type_hash_c_str, allocator->state); - }); - - std::size_t domain_id = node_info.domain_id_; - auto entity = liveliness::Entity::make( - z_info_zid(session), - std::to_string(node_id), - std::to_string(publisher_id), - liveliness::EntityType::Publisher, - std::move(node_info), - liveliness::TopicInfo{ - std::move(domain_id), - topic_name, - message_type_support->get_name(), - type_hash_c_str, - adapted_qos_profile} - ); - if (entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the publisher %s.", - topic_name.c_str()); - return nullptr; - } + rcutils_allocator_t *allocator = &node->context->options.allocator; - std::string topic_keyexpr = entity->topic_info()->topic_keyexpr_; - z_view_keyexpr_t pub_ke; - if (z_view_keyexpr_from_str(&pub_ke, topic_keyexpr.c_str()) != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } + const rosidl_type_hash_t *type_hash = type_support->get_type_hash_func(type_support); + auto callbacks = static_cast(type_support->data); + auto message_type_support = std::make_unique(callbacks); - // Create a Publication Cache if durability is transient_local. - std::optional pub_cache = std::nullopt; - auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( - [&pub_cache]() { - if (pub_cache.has_value()) { - z_drop(z_move(pub_cache.value())); - } - }); - if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - ze_publication_cache_options_t pub_cache_opts; - ze_publication_cache_options_default(&pub_cache_opts); - pub_cache_opts.history = adapted_qos_profile.depth; - pub_cache_opts.queryable_complete = true; - // Set the queryable_prefix to the session id so that querying subscribers can specify this - // session id to obtain latest data from this specific publication caches when querying over - // the same keyexpression. - // When such a prefix is added to the PublicationCache, it listens to queries with this extra - // prefix (allowing to be queried in a unique way), but still replies with the original - // publications' key expressions. - std::string queryable_prefix = entity->zid(); - z_view_keyexpr_t prefix_ke; - z_view_keyexpr_from_str(&prefix_ke, queryable_prefix.c_str()); - pub_cache_opts.queryable_prefix = z_loan(prefix_ke); - - ze_owned_publication_cache_t pub_cache_; - if (ze_declare_publication_cache( - session, &pub_cache_, z_loan(pub_ke), &pub_cache_opts)) + // Convert the type hash to a string so that it can be included in + // the keyexpr. + char *type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( + type_hash, + *allocator, + &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { - RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); + RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); + return nullptr; + } + auto always_free_type_hash_c_str = rcpputils::make_scope_exit( + [&allocator, &type_hash_c_str]() + { + allocator->deallocate(type_hash_c_str, allocator->state); + }); + + std::size_t domain_id = node_info.domain_id_; + auto entity = liveliness::Entity::make( + z_info_zid(session), + std::to_string(node_id), + std::to_string(publisher_id), + liveliness::EntityType::Publisher, + std::move(node_info), + liveliness::TopicInfo{ + std::move(domain_id), + topic_name, + message_type_support->get_name(), + type_hash_c_str, + adapted_qos_profile}); + if (entity == nullptr) + { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the publisher %s.", + topic_name.c_str()); return nullptr; } - pub_cache = pub_cache_; - } - // Set congestion_control to BLOCK if appropriate. - z_publisher_options_t opts; - z_publisher_options_default(&opts); - opts.congestion_control = Z_CONGESTION_CONTROL_DROP; + std::string topic_keyexpr = entity->topic_info()->topic_keyexpr_; + z_view_keyexpr_t pub_ke; + if (z_view_keyexpr_from_str(&pub_ke, topic_keyexpr.c_str()) != Z_OK) + { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return nullptr; + } - if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - opts.reliability = Z_RELIABILITY_RELIABLE; + // Create a Publication Cache if durability is transient_local. + std::optional pub_cache = std::nullopt; + auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( + [&pub_cache]() + { + if (pub_cache.has_value()) + { + z_drop(z_move(pub_cache.value())); + } + }); + if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) + { + ze_publication_cache_options_t pub_cache_opts; + ze_publication_cache_options_default(&pub_cache_opts); + pub_cache_opts.history = adapted_qos_profile.depth; + pub_cache_opts.queryable_complete = true; + // Set the queryable_prefix to the session id so that querying subscribers can specify this + // session id to obtain latest data from this specific publication caches when querying over + // the same keyexpression. + // When such a prefix is added to the PublicationCache, it listens to queries with this extra + // prefix (allowing to be queried in a unique way), but still replies with the original + // publications' key expressions. + std::string queryable_prefix = entity->zid(); + z_view_keyexpr_t prefix_ke; + z_view_keyexpr_from_str(&prefix_ke, queryable_prefix.c_str()); + pub_cache_opts.queryable_prefix = z_loan(prefix_ke); + + ze_owned_publication_cache_t pub_cache_; + if (ze_declare_publication_cache( + session, &pub_cache_, z_loan(pub_ke), &pub_cache_opts)) + { + RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); + return nullptr; + } + pub_cache = pub_cache_; + } + + // Set congestion_control to BLOCK if appropriate. + z_publisher_options_t opts; + z_publisher_options_default(&opts); + opts.congestion_control = Z_CONGESTION_CONTROL_DROP; + + if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) + { + opts.reliability = Z_RELIABILITY_RELIABLE; + + if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) + { + opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + } + } + z_owned_publisher_t pub; + // TODO(clalancette): What happens if the key name is a valid but empty string? + auto undeclare_z_publisher = rcpputils::make_scope_exit( + [&pub]() + { + z_undeclare_publisher(z_move(pub)); + }); + if (z_declare_publisher( + session, &pub, z_loan(pub_ke), &opts) != Z_OK) + { + RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); + return nullptr; + } - if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) { - opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + std::string liveliness_keyexpr = entity->liveliness_keyexpr(); + z_view_keyexpr_t liveliness_ke; + z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); + zc_owned_liveliness_token_t token; + auto free_token = rcpputils::make_scope_exit( + [&token]() + { + z_drop(z_move(token)); + }); + if (zc_liveliness_declare_token( + session, &token, z_loan(liveliness_ke), + NULL) != Z_OK) + { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the publisher."); + return nullptr; } + + free_token.cancel(); + undeclare_z_publisher_cache.cancel(); + undeclare_z_publisher.cancel(); + + return std::shared_ptr( + new PublisherData{ + node, + std::move(entity), + std::move(pub), + std::move(pub_cache), + std::move(token), + type_support->data, + std::move(message_type_support)}); } - z_owned_publisher_t pub; - // TODO(clalancette): What happens if the key name is a valid but empty string? - auto undeclare_z_publisher = rcpputils::make_scope_exit( - [&pub]() { - z_undeclare_publisher(z_move(pub)); - }); - if (z_declare_publisher( - session, &pub, z_loan(pub_ke), &opts) != Z_OK) + + ///============================================================================= + PublisherData::PublisherData( + const rmw_node_t *rmw_node, + std::shared_ptr entity, + z_owned_publisher_t pub, + std::optional pub_cache, + zc_owned_liveliness_token_t token, + const void *type_support_impl, + std::unique_ptr type_support) + : rmw_node_(rmw_node), + entity_(std::move(entity)), + pub_(std::move(pub)), + pub_cache_(std::move(pub_cache)), + token_(std::move(token)), + type_support_impl_(type_support_impl), + type_support_(std::move(type_support)), + sequence_number_(1), + is_shutdown_(false) { - RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); - return nullptr; + events_mgr_ = std::make_shared(); } - std::string liveliness_keyexpr = entity->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - zc_owned_liveliness_token_t token; - auto free_token = rcpputils::make_scope_exit( - [&token]() { - z_drop(z_move(token)); - }); - if (zc_liveliness_declare_token( - session, &token, z_loan(liveliness_ke), - NULL) != Z_OK) + void delete_z_bytes(void *data, void *context) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the publisher."); - return nullptr; + printf("@fp/delete_z_bytes: data=%p, context=%p\n", data, context); + BufferPool *pool = reinterpret_cast(context); + pool->deallocate(static_cast(data)); } - free_token.cancel(); - undeclare_z_publisher_cache.cancel(); - undeclare_z_publisher.cancel(); - - return std::shared_ptr( - new PublisherData{ - node, - std::move(entity), - std::move(pub), - std::move(pub_cache), - std::move(token), - type_support->data, - std::move(message_type_support) - }); -} - -///============================================================================= -PublisherData::PublisherData( - const rmw_node_t * rmw_node, - std::shared_ptr entity, - z_owned_publisher_t pub, - std::optional pub_cache, - zc_owned_liveliness_token_t token, - const void * type_support_impl, - std::unique_ptr type_support) -: rmw_node_(rmw_node), - entity_(std::move(entity)), - pub_(std::move(pub)), - pub_cache_(std::move(pub_cache)), - token_(std::move(token)), - type_support_impl_(type_support_impl), - type_support_(std::move(type_support)), - sequence_number_(1), - is_shutdown_(false) -{ - events_mgr_ = std::make_shared(); -} - -void delete_z_bytes(void * data, void * context) -{ - SerializationBufferPool * buffer = reinterpret_cast(context); - buffer->deallocate(static_cast(data)); -} - -///============================================================================= -rmw_ret_t PublisherData::publish( - const void * ros_message, - std::optional & shm_provider) -{ - std::lock_guard lock(mutex_); - if (is_shutdown_) { - RMW_SET_ERROR_MSG("Unable to publish as the publisher has been shutdown."); - return RMW_RET_ERROR; - } + ///============================================================================= + rmw_ret_t PublisherData::publish( + const void *ros_message, + std::optional &shm_provider) + { + std::lock_guard lock(mutex_); + if (is_shutdown_) + { + RMW_SET_ERROR_MSG("Unable to publish as the publisher has been shutdown."); + return RMW_RET_ERROR; + } - // Serialize data. - size_t max_data_length = type_support_->get_estimated_serialized_size( - ros_message, - type_support_impl_); - - // To store serialized message byte array. - uint8_t * msg_bytes = nullptr; - std::optional shmbuf = std::nullopt; - auto always_free_shmbuf = rcpputils::make_scope_exit( - [&shmbuf]() { - if (shmbuf.has_value()) { - z_drop(z_move(shmbuf.value())); + // Serialize data. + size_t max_data_length = type_support_->get_estimated_serialized_size( + ros_message, + type_support_impl_); + + // To store serialized message byte array. + uint8_t *msg_bytes = nullptr; + std::optional shmbuf = std::nullopt; + auto always_free_shmbuf = rcpputils::make_scope_exit( + [&shmbuf]() + { + if (shmbuf.has_value()) + { + z_drop(z_move(shmbuf.value())); + } + }); + + rcutils_allocator_t *allocator = &rmw_node_->context->options.allocator; + rmw_context_impl_s * context_impl = static_cast(rmw_node_->data); + + // Get memory from SHM buffer if available. + if (shm_provider.has_value()) + { + RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); + + auto provider = shm_provider.value(); + z_buf_layout_alloc_result_t alloc; + // TODO(yuyuan): SHM, configure this + z_alloc_alignment_t alignment = {5}; + z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment); + + if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) + { + shmbuf = std::make_optional(alloc.buf); + msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); } - }); - - rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; - - // Get memory from SHM buffer if available. - if (shm_provider.has_value()) { - RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); - - auto provider = shm_provider.value(); - z_buf_layout_alloc_result_t alloc; - // TODO(yuyuan): SHM, configure this - z_alloc_alignment_t alignment = {5}; - z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment); - - if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { - shmbuf = std::make_optional(alloc.buf); - msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); - } else { - // TODO(Yadunund): Should we revert to regular allocation and not return an error? - RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing."); + else + { + // TODO(Yadunund): Should we revert to regular allocation and not return an error? + RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing."); + return RMW_RET_ERROR; + } + } + else + { + msg_bytes = context_impl->serialization_buffer_pool.allocate(allocator, max_data_length); + RMW_CHECK_FOR_NULL_WITH_MSG( + msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); + } + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(msg_bytes), max_data_length); + + // Object that serializes the data + rmw_zenoh_cpp::Cdr ser(fastbuffer); + if (!type_support_->serialize_ros_message( + ros_message, + ser.get_cdr(), + type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not serialize ROS message"); return RMW_RET_ERROR; } - } else { - msg_bytes = this->serialization_buffer_pool.allocate(allocator, max_data_length); - RMW_CHECK_FOR_NULL_WITH_MSG( - msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); - } - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(msg_bytes), max_data_length); + const size_t data_length = ser.get_serialized_data_length(); + + // The encoding is simply forwarded and is useful when key expressions in the + // session use different encoding formats. In our case, all key expressions + // will be encoded with CDR so it does not really matter. + z_publisher_put_options_t options; + z_publisher_put_options_default(&options); + z_owned_bytes_t attachment; + uint8_t local_gid[RMW_GID_STORAGE_SIZE]; + entity_->copy_gid(local_gid); + create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid); + options.attachment = z_move(attachment); + + z_owned_bytes_t payload; + if (shmbuf.has_value()) + { + z_bytes_from_shm_mut(&payload, z_move(shmbuf.value())); + } + else + { + z_bytes_from_buf( + &payload, msg_bytes, data_length, delete_z_bytes, + reinterpret_cast(&context_impl->serialization_buffer_pool)); + } - // Object that serializes the data - rmw_zenoh_cpp::Cdr ser(fastbuffer); - if (!type_support_->serialize_ros_message( - ros_message, - ser.get_cdr(), - type_support_impl_)) - { - RMW_SET_ERROR_MSG("could not serialize ROS message"); - return RMW_RET_ERROR; - } + z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); + if (res != Z_OK) + { + if (res == Z_ESESSION_CLOSED) + { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "unable to publish message since the zenoh session is closed"); + } + else + { + RMW_SET_ERROR_MSG("unable to publish message"); + return RMW_RET_ERROR; + } + } - const size_t data_length = ser.get_serialized_data_length(); - - // The encoding is simply forwarded and is useful when key expressions in the - // session use different encoding formats. In our case, all key expressions - // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options; - z_publisher_put_options_default(&options); - z_owned_bytes_t attachment; - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - entity_->copy_gid(local_gid); - create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid); - options.attachment = z_move(attachment); - - z_owned_bytes_t payload; - if (shmbuf.has_value()) { - z_bytes_from_shm_mut(&payload, z_move(shmbuf.value())); - } else { - z_bytes_from_buf( - &payload, msg_bytes, data_length, delete_z_bytes, - reinterpret_cast(&this->serialization_buffer_pool)); + return RMW_RET_OK; } - z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); - if (res != Z_OK) { - if (res == Z_ESESSION_CLOSED) { - RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "unable to publish message since the zenoh session is closed"); - } else { - RMW_SET_ERROR_MSG("unable to publish message"); + ///============================================================================= + rmw_ret_t PublisherData::publish_serialized_message( + const rmw_serialized_message_t *serialized_message, + std::optional & /*shm_provider*/) + { + eprosima::fastcdr::FastBuffer buffer( + reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); + rmw_zenoh_cpp::Cdr ser(buffer); + if (!ser.get_cdr().jump(serialized_message->buffer_length)) + { + RMW_SET_ERROR_MSG("cannot correctly set serialized buffer"); return RMW_RET_ERROR; } - } - return RMW_RET_OK; -} + std::lock_guard lock(mutex_); -///============================================================================= -rmw_ret_t PublisherData::publish_serialized_message( - const rmw_serialized_message_t * serialized_message, - std::optional & /*shm_provider*/) -{ - eprosima::fastcdr::FastBuffer buffer( - reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); - rmw_zenoh_cpp::Cdr ser(buffer); - if (!ser.get_cdr().jump(serialized_message->buffer_length)) { - RMW_SET_ERROR_MSG("cannot correctly set serialized buffer"); - return RMW_RET_ERROR; - } + const size_t data_length = ser.get_serialized_data_length(); - std::lock_guard lock(mutex_); + // The encoding is simply forwarded and is useful when key expressions in the + // session use different encoding formats. In our case, all key expressions + // will be encoded with CDR so it does not really matter. + z_publisher_put_options_t options; + z_publisher_put_options_default(&options); + uint8_t local_gid[RMW_GID_STORAGE_SIZE]; + entity_->copy_gid(local_gid); + z_owned_bytes_t attachment; + create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid); + options.attachment = z_move(attachment); - const size_t data_length = ser.get_serialized_data_length(); + z_owned_bytes_t payload; + z_bytes_copy_from_buf(&payload, serialized_message->buffer, data_length); - // The encoding is simply forwarded and is useful when key expressions in the - // session use different encoding formats. In our case, all key expressions - // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options; - z_publisher_put_options_default(&options); - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - entity_->copy_gid(local_gid); - z_owned_bytes_t attachment; - create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid); - - options.attachment = z_move(attachment); + z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); + if (res != Z_OK) + { + if (res == Z_ESESSION_CLOSED) + { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "unable to publish message since the zenoh session is closed"); + } + else + { + RMW_SET_ERROR_MSG("unable to publish message"); + return RMW_RET_ERROR; + } + } - z_owned_bytes_t payload; - z_bytes_copy_from_buf(&payload, serialized_message->buffer, data_length); + return RMW_RET_OK; + } - z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); - if (res != Z_OK) { - if (res == Z_ESESSION_CLOSED) { - RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "unable to publish message since the zenoh session is closed"); - } else { - RMW_SET_ERROR_MSG("unable to publish message"); - return RMW_RET_ERROR; - } + ///============================================================================= + std::size_t PublisherData::keyexpr_hash() const + { + std::lock_guard lock(mutex_); + return entity_->keyexpr_hash(); } - return RMW_RET_OK; -} + ///============================================================================= + liveliness::TopicInfo PublisherData::topic_info() const + { + std::lock_guard lock(mutex_); + return entity_->topic_info().value(); + } -///============================================================================= -std::size_t PublisherData::keyexpr_hash() const -{ - std::lock_guard lock(mutex_); - return entity_->keyexpr_hash(); -} + ///============================================================================= + void PublisherData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const + { + std::lock_guard lock(mutex_); + entity_->copy_gid(out_gid); + } -///============================================================================= -liveliness::TopicInfo PublisherData::topic_info() const -{ - std::lock_guard lock(mutex_); - return entity_->topic_info().value(); -} + ///============================================================================= + std::shared_ptr PublisherData::events_mgr() const + { + std::lock_guard lock(mutex_); + return events_mgr_; + } -///============================================================================= -void PublisherData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const -{ - std::lock_guard lock(mutex_); - entity_->copy_gid(out_gid); -} + ///============================================================================= + PublisherData::~PublisherData() + { + const rmw_ret_t ret = this->shutdown(); + if (ret != RMW_RET_OK) + { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Error destructing publisher /%s.", + entity_->topic_info().value().name_.c_str()); + } + } -///============================================================================= -std::shared_ptr PublisherData::events_mgr() const -{ - std::lock_guard lock(mutex_); - return events_mgr_; -} + ///============================================================================= + rmw_ret_t PublisherData::shutdown() + { + std::lock_guard lock(mutex_); + if (is_shutdown_) + { + return RMW_RET_OK; + } -///============================================================================= -PublisherData::~PublisherData() -{ - const rmw_ret_t ret = this->shutdown(); - if (ret != RMW_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Error destructing publisher /%s.", - entity_->topic_info().value().name_.c_str() - ); - } -} + // Unregister this publisher from the ROS graph. + zc_liveliness_undeclare_token(z_move(token_)); + if (pub_cache_.has_value()) + { + z_drop(z_move(pub_cache_.value())); + } + z_undeclare_publisher(z_move(pub_)); -///============================================================================= -rmw_ret_t PublisherData::shutdown() -{ - std::lock_guard lock(mutex_); - if (is_shutdown_) { + is_shutdown_ = true; return RMW_RET_OK; } - // Unregister this publisher from the ROS graph. - zc_liveliness_undeclare_token(z_move(token_)); - if (pub_cache_.has_value()) { - z_drop(z_move(pub_cache_.value())); + ///============================================================================= + bool PublisherData::is_shutdown() const + { + std::lock_guard lock(mutex_); + return is_shutdown_; } - z_undeclare_publisher(z_move(pub_)); - - is_shutdown_ = true; - return RMW_RET_OK; -} - -///============================================================================= -bool PublisherData::is_shutdown() const -{ - std::lock_guard lock(mutex_); - return is_shutdown_; -} -} // namespace rmw_zenoh_cpp +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 1726b244..53d1750f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -27,7 +27,7 @@ #include "event.hpp" #include "liveliness_utils.hpp" #include "message_type_support.hpp" -#include "serialization_buffer_pool.hpp" +#include "buffer_pool.hpp" #include "rmw/ret_types.h" @@ -109,8 +109,7 @@ class PublisherData final size_t sequence_number_; // Shutdown flag. bool is_shutdown_; - // Pool of serialization buffers. - SerializationBufferPool serialization_buffer_pool; + }; using PublisherDataPtr = std::shared_ptr; using PublisherDataConstPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 614288f6..50a5f3bd 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -65,12 +65,15 @@ void sub_data_handler(z_loaned_sample_t * sample, void * data) z_owned_slice_t slice; z_bytes_to_slice(payload, &slice); + std::string topic_name(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); + printf("topic_name=%s\n", topic_name.c_str()); + sub_data->add_new_message( std::make_unique( slice, z_timestamp_ntp64_time(z_sample_timestamp(sample)), std::move(attachment)), - z_string_data(z_loan(keystr))); + topic_name); } } // namespace