diff --git a/docs/design.md b/docs/design.md
index 43ecdc7a..73d547f8 100644
--- a/docs/design.md
+++ b/docs/design.md
@@ -434,3 +434,20 @@ Thus, there is no direct implementation of actions in `rmw_zenoh_cpp`.
 ## Security
 
 TBD
+
+## Environment variables
+
+### `RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES`
+
+The RMW recycles serialization buffers for outgoing messages using a buffer pool with bounded
+memory usage.
+These buffers are returned to the pool, without being deallocated, once they cross the
+network boundary in host-to-host communication, after transmission in inter-process
+communication, or once they are consumed by subscriptions in intra-process communication.
+
+When the total size of the allocated buffers within the pool exceeds
+`RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES`, serialization buffers are allocated using the system
+allocator and moved to Zenoh; no recycling is performed in this case, to prevent the buffer pool
+from growing uncontrollably.
+
+The default value of `RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES` is 8 MiB; this value was chosen because it is comparable to the last-level cache size of a modern CPU.
diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
index 3e3d0b72..96823d95 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
@@ -307,6 +307,12 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
     return graph_cache_;
   }
 
+  std::shared_ptr<rmw_zenoh_cpp::BufferPool> serialization_buffer_pool()
+  {
+    std::lock_guard<std::recursive_mutex> lock(mutex_);
+    return serialization_buffer_pool_;
+  }
+
   bool create_node_data(
     const rmw_node_t * const node,
     const std::string & ns,
@@ -412,6 +418,8 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
   std::optional<zenoh::ShmProvider> shm_provider_;
   // Graph cache.
   std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
+  // Pool of serialization buffers.
+  std::shared_ptr<rmw_zenoh_cpp::BufferPool> serialization_buffer_pool_;
   // ROS graph liveliness subscriber.
   // The graph_subscriber *must* exist in order for anything in this Data class,
   // and hence rmw_zenoh_cpp, to work.
@@ -507,6 +515,12 @@ std::shared_ptr<rmw_zenoh_cpp::GraphCache> rmw_context_impl_s::graph_cache()
 {
   return data_->graph_cache();
 }
+///=============================================================================
+std::shared_ptr<rmw_zenoh_cpp::BufferPool> rmw_context_impl_s::serialization_buffer_pool()
+{
+  return data_->serialization_buffer_pool();
+}
+
 ///=============================================================================
 bool rmw_context_impl_s::create_node_data(
   const rmw_node_t * const node,
diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
index a2fdaf5e..d3b0742a 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
@@ -24,6 +24,7 @@
 
 #include "graph_cache.hpp"
 #include "rmw_node_data.hpp"
+#include "zenoh_utils.hpp"
 
 #include "rmw/ret_types.h"
 #include "rmw/types.h"
@@ -74,6 +75,9 @@ struct rmw_context_impl_s final
   /// Return a shared_ptr to the GraphCache stored in this context.
   std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache();
 
+  /// Return a shared_ptr to the serialization buffer pool stored in this context.
+  std::shared_ptr<rmw_zenoh_cpp::BufferPool> serialization_buffer_pool();
+
   /// Create a NodeData and store it within this context. The NodeData can be
   /// retrieved using get_node().
   /// Returns false if parameters are invalid.
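As a rough illustration of the accessor added above (not part of the patch: the wrapper function, include paths, and parameter names below are placeholders), code that already holds an `rmw_context_impl_s` could borrow a buffer from the shared pool and hand it back once the serialized bytes are no longer referenced:

```cpp
#include <cstddef>
#include <memory>

#include "detail/rmw_context_impl_s.hpp"
#include "detail/zenoh_utils.hpp"

// Illustrative sketch only: exercises the serialization_buffer_pool()
// accessor and the BufferPool API introduced by this patch.
void serialize_with_pool(rmw_context_impl_s * context_impl, size_t max_data_length)
{
  std::shared_ptr<rmw_zenoh_cpp::BufferPool> pool =
    context_impl->serialization_buffer_pool();

  rmw_zenoh_cpp::BufferPool::Buffer buffer = pool->allocate(max_data_length);
  if (buffer.data == nullptr) {
    // The pool hit RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES (or the system
    // allocator failed): fall back to a one-shot allocation that is freed
    // after transmission instead of being recycled.
    return;
  }

  // ... serialize the message into buffer.data (up to buffer.size bytes) ...

  // Return the buffer so later publications can reuse it without touching
  // the system allocator.
  pool->deallocate(buffer);
}
```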
diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
index f960b879..4e3ec067 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
@@ -20,6 +20,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -216,24 +217,33 @@ rmw_ret_t PublisherData::publish(
     type_support_impl_);
 
   // To store serialized message byte array.
-  char * msg_bytes = nullptr;
-
-  rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator;
+  uint8_t * msg_bytes = nullptr;
+
+  rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(rmw_node_->data);
+
+  rcutils_allocator_t allocator = rcutils_get_default_allocator();
+
+  // Try to get memory from the serialization buffer pool.
+  BufferPool::Buffer serialization_buffer =
+    context_impl->serialization_buffer_pool()->allocate(max_data_length);
+  if (serialization_buffer.data == nullptr) {
+    void * data = allocator.allocate(max_data_length, allocator.state);
+    RMW_CHECK_FOR_NULL_WITH_MSG(
+      data, "failed to allocate serialization buffer", return RMW_RET_BAD_ALLOC);
+    msg_bytes = static_cast<uint8_t *>(data);
+  } else {
+    msg_bytes = serialization_buffer.data;
+  }
 
   auto always_free_msg_bytes = rcpputils::make_scope_exit(
-    [&msg_bytes, allocator]() {
-      if (msg_bytes) {
-        allocator->deallocate(msg_bytes, allocator->state);
+    [&msg_bytes, &allocator, &serialization_buffer]() {
+      if (serialization_buffer.data == nullptr) {
+        allocator.deallocate(msg_bytes, allocator.state);
       }
     });
 
-  // Get memory from the allocator.
-  msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
-  RMW_CHECK_FOR_NULL_WITH_MSG(
-    msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
-
   // Object that manages the raw buffer
-  eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
+  eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast<char *>(msg_bytes), max_data_length);
 
   // Object that serializes the data
   rmw_zenoh_cpp::Cdr ser(fastbuffer);
@@ -258,10 +268,19 @@ rmw_ret_t PublisherData::publish(
     sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();
 
   // TODO(ahcorde): shmbuf
-  std::vector<uint8_t> raw_data(
-    reinterpret_cast<const uint8_t *>(msg_bytes),
-    reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
-  zenoh::Bytes payload(std::move(raw_data));
+  zenoh::Bytes payload;
+  if (serialization_buffer.data == nullptr) {
+    std::vector<uint8_t> raw_data(
+      reinterpret_cast<const uint8_t *>(msg_bytes),
+      reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
+    payload = zenoh::Bytes(std::move(raw_data));
+  } else {
+    auto deleter = [buffer_pool = context_impl->serialization_buffer_pool(),
+        buffer = serialization_buffer](uint8_t *) {
+        buffer_pool->deallocate(buffer);
+      };
+    payload = zenoh::Bytes(msg_bytes, data_length, deleter);
+  }
 
   TRACETOOLS_TRACEPOINT(
     rmw_publish, static_cast<const void *>(rmw_publisher_), ros_message, source_timestamp);
diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
index 5770718b..4c557174 100644
--- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
+++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
@@ -128,4 +128,80 @@ bool Payload::empty() const
   return std::holds_alternative<Empty>(bytes_);
 }
 
+///=============================================================================
+BufferPool::BufferPool()
+: buffers_(), mutex_()
+{
+  const char * env_value;
+  const char * error_str = rcutils_get_env("RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES", &env_value);
+  if (error_str != nullptr) {
+    RMW_ZENOH_LOG_WARN_NAMED(
+      "rmw_zenoh_cpp",
+      "Unable to read maximum buffer pool size, falling back to default.");
+    max_size_ = DEFAULT_MAX_SIZE;
+  } else if (strcmp(env_value, "") == 0) {
+    max_size_ = DEFAULT_MAX_SIZE;
+  } else {
+    max_size_ = std::atoll(env_value);
+  }
+  size_ = 0;
+}
+
+///=============================================================================
+BufferPool::~BufferPool()
+{
+  rcutils_allocator_t allocator = rcutils_get_default_allocator();
+
+  for (Buffer & buffer : buffers_) {
+    allocator.deallocate(buffer.data, allocator.state);
+  }
+}
+
+///=============================================================================
+BufferPool::Buffer BufferPool::allocate(size_t size)
+{
+  std::lock_guard<std::mutex> guard(mutex_);
+
+  rcutils_allocator_t allocator = rcutils_get_default_allocator();
+
+  if (buffers_.empty()) {
+    if (size_ + size > max_size_) {
+      return {};
+    } else {
+      size_ += size;
+    }
+    uint8_t * data = static_cast<uint8_t *>(allocator.allocate(size, allocator.state));
+    if (data == nullptr) {
+      return {};
+    } else {
+      return Buffer {data, size};
+    }
+  } else {
+    Buffer buffer = buffers_.back();
+    buffers_.pop_back();
+    if (buffer.size < size) {
+      size_t size_diff = size - buffer.size;
+      if (size_ + size_diff > max_size_) {
+        return {};
+      }
+      uint8_t * data = static_cast<uint8_t *>(allocator.reallocate(
+        buffer.data, size, allocator.state));
+      if (data == nullptr) {
+        return {};
+      }
+      size_ += size_diff;
+      buffer.data = data;
+      buffer.size = size;
+    }
+    return buffer;
+  }
+}
+
+///=============================================================================
+void BufferPool::deallocate(BufferPool::Buffer buffer)
+{
+  std::lock_guard<std::mutex> guard(mutex_);
+  buffers_.push_back(buffer);
+}
+
 }  // namespace rmw_zenoh_cpp
diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
index dd7cff72..31bb8a94 100644
--- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
+++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
@@ -19,13 +19,19 @@
 
 #include
 #include
+#include
+#include
 #include
+#include
 #include
 #include
 #include
 #include
 
+#include "rcutils/allocator.h"
+#include "rcutils/env.h"
 #include "rmw/types.h"
+#include "logging_macros.hpp"
 
 namespace rmw_zenoh_cpp
 {
@@ -92,6 +98,36 @@ class Payload
   // and `zenoh::Slice` plus a `zenoh::Bytes` otherwise.
   std::variant<NonContiguous, Contiguous, Empty> bytes_;
 };
+
+///=============================================================================
+class BufferPool
+{
+public:
+  struct Buffer
+  {
+    uint8_t * data;
+    size_t size;
+  };
+
+  BufferPool();
+
+  ~BufferPool();
+
+  Buffer allocate(size_t size);
+
+  void deallocate(Buffer buffer);
+
+private:
+  std::vector<Buffer> buffers_;
+  std::mutex mutex_;
+  size_t max_size_;
+  size_t size_;
+  // NOTE(fuzzypixelz): Pooled buffers are recycled with the expectation that they would reside in
+  // cache, thus this value should be comparable to the size of a modern CPU cache. The default
+  // value (8 MiB) is relatively conservative as CPU cache sizes range from a few MiB to a few
+  // hundred MiB.
+  const size_t DEFAULT_MAX_SIZE = 8 * 1024 * 1024;
+};
 }  // namespace rmw_zenoh_cpp
 
 #endif  // DETAIL__ZENOH_UTILS_HPP_
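For reference, a minimal sketch of how the pool behaves under the size cap described in `docs/design.md` (illustrative only, not part of the patch; the buffer sizes are arbitrary and the cap is assumed to be the 8 MiB default, i.e. `RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES` is unset):

```cpp
#include <cassert>
#include <cstddef>

#include "detail/zenoh_utils.hpp"

int main()
{
  rmw_zenoh_cpp::BufferPool pool;

  // First request: the pool is empty, so the buffer comes from the system
  // allocator and its size is charged against the cap.
  rmw_zenoh_cpp::BufferPool::Buffer small = pool.allocate(1024);
  assert(small.data != nullptr);
  pool.deallocate(small);  // returned to the pool, not freed

  // A later, larger request reuses the pooled buffer, growing it in place via
  // reallocate() and charging only the 3 KiB difference against the cap.
  rmw_zenoh_cpp::BufferPool::Buffer big = pool.allocate(4096);
  assert(big.data != nullptr && big.size == 4096);
  pool.deallocate(big);

  // A request that would push the tracked total past the cap yields a null
  // buffer; the publisher then falls back to the system allocator and skips
  // recycling, as described in docs/design.md.
  rmw_zenoh_cpp::BufferPool::Buffer too_big = pool.allocate(16 * 1024 * 1024);
  assert(too_big.data == nullptr);
  return 0;
}
```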