From cadd258ce51ca6cb7c24c2df72a2a0887f2f74f9 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Tue, 27 Aug 2024 00:09:38 +0300 Subject: [PATCH] Add configurable message size threshold for handling ROS messages through SHM --- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 6 ++++- rmw_zenoh_cpp/src/detail/zenoh_config.cpp | 29 +++++++++++++++++++++ rmw_zenoh_cpp/src/detail/zenoh_config.hpp | 9 +++++++ rmw_zenoh_cpp/src/rmw_init.cpp | 20 ++++++++------ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 9 ++++--- 5 files changed, 60 insertions(+), 13 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 574e0d76..54b0bd0e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -53,7 +53,11 @@ class rmw_context_impl_s final z_owned_session_t session; #ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - std::optional shm_provider; + struct rmw_shm_s { + z_owned_shm_provider_t shm_provider; + size_t msgsize_threshold; + }; + std::optional shm; #endif z_owned_subscriber_t graph_subscriber; diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp index c6863598..4f4bf4a5 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp @@ -45,6 +45,8 @@ static const char * router_check_attempts_envar = "ZENOH_ROUTER_CHECK_ATTEMPTS"; #ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY static const char * zenoh_shm_alloc_size_envar = "ZENOH_SHM_ALLOC_SIZE"; static const size_t zenoh_shm_alloc_size_default = 1024*1024; +static const char * zenoh_shm_message_size_threshold_envar = "ZENOH_SHM_MESSAGE_SIZE_THRESHOLD"; +static const size_t zenoh_shm_message_size_threshold_default = 2*1024; #endif rmw_ret_t _get_z_config( @@ -157,5 +159,32 @@ size_t zenoh_shm_alloc_size() { return zenoh_shm_alloc_size_default; } +///============================================================================= +size_t zenoh_shm_message_size_threshold() { + const char * envar_value; + + if (NULL != rcutils_get_env(zenoh_shm_message_size_threshold_envar, &envar_value)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Envar %s cannot be read. Report this bug.", + zenoh_shm_message_size_threshold_envar); + return zenoh_shm_message_size_threshold_default; + } + + // If the environment variable contains a value, handle it accordingly. + if (envar_value[0] != '\0') { + const auto read_value = std::strtoull(envar_value, nullptr, 10); + if (read_value > 0) { + if (read_value > std::numeric_limits::max()) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Envar %s: value is too large!", + zenoh_shm_message_size_threshold_envar); + } else { + return read_value; + } + } + } + + return zenoh_shm_message_size_threshold_default; +} #endif } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.hpp b/rmw_zenoh_cpp/src/detail/zenoh_config.hpp index 141b262b..ae4cf74e 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.hpp @@ -67,6 +67,15 @@ std::optional zenoh_router_check_attempts(); /// - Else value of environemnt variable is returned. /// @return The amount of shared memory to be pre-allocated for Zenoh SHM operation size_t zenoh_shm_alloc_size(); +///============================================================================= +/// Message size threshold for implicit SHM optimization based on the environment +/// variable ZENOH_SHM_ALLOC_SIZE. +/// Messages smaller than this threshold will not be forwarded through Zenoh SHM +/// @details The behavior is as follows: +/// - If not set or <= 0, the default value of 2KB is returned. +/// - Else value of environemnt variable is returned. +/// @return The amount of shared memory to be pre-allocated for Zenoh SHM operation +size_t zenoh_shm_message_size_threshold(); #endif } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 894d6d0b..086cf9eb 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -224,6 +224,11 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) { if (strncmp(z_string_data(z_loan(shm_enabled)), "true", z_string_len(z_loan(shm_enabled))) == 0) { printf(">>> SHM is enabled\n"); + rmw_context_impl_s::rmw_shm_s shm; + + // Read msg size treshold from config + shm.msgsize_threshold = rmw_zenoh_cpp::zenoh_shm_message_size_threshold(); + // Create Layout for provider's memory // Provider's alignment will be 1 byte as we are going to make only 1-byte aligned allocations z_alloc_alignment_t alignment = {0}; @@ -234,22 +239,21 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) { } // Create SHM provider - z_owned_shm_provider_t provider; - const auto provider_creation_result = z_posix_shm_provider_new(&provider, z_loan(layout)); - z_drop(layout); + const auto provider_creation_result = z_posix_shm_provider_new(&shm.shm_provider, z_loan(layout)); + z_drop(z_move(layout)); if (provider_creation_result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create a SHM provider."); return RMW_RET_ERROR; } // Upon successful provider creation, store it in the context - context->impl->shm_provider = std::make_optional(provider); + context->impl->shm = std::make_optional(shm); } auto free_shm_provider = rcpputils::make_scope_exit( [context]() { - if (context->impl->shm_provider.has_value()) { - z_drop(z_move(context->impl->shm_provider.value())); + if (context->impl->shm.has_value()) { + z_drop(z_move(context->impl->shm.value().shm_provider)); } }); #endif @@ -398,8 +402,8 @@ rmw_ret_t rmw_shutdown(rmw_context_t *context) { z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); #ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY // Drop SHM provider - if (context->impl->shm_provider.has_value()) { - z_drop(z_move(context->impl->shm_provider.value())); + if (context->impl->shm.has_value()) { + z_drop(z_move(context->impl->shm.value().shm_provider)); } #endif // Close the zenoh session diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index a076cad2..18851c41 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -896,10 +896,11 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message, #ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY // Get memory from SHM buffer if available. - if (publisher_data->context->impl->shm_provider.has_value()) { - // printf(">>> rmw_publish(), SHM enabled\n"); + if (publisher_data->context->impl->shm.has_value() && + publisher_data->context->impl->shm.value().msgsize_threshold <= max_data_length) { + // printf(">>> rmw_publish(), Will use SHM\n"); - const auto& provider = publisher_data->context->impl->shm_provider.value(); + const auto& provider = publisher_data->context->impl->shm.value().shm_provider; // Allocate SHM bufer // We use 1-byte alignment @@ -922,7 +923,7 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message, } else #endif { - // printf(">>> rmw_publish(), SHM disabled\n"); + // printf(">>> rmw_publish(), Will use RAW\n"); // Get memory from the allocator. msg_bytes = static_cast(