Skip to content

Commit

Permalink
Add configurable message size threshold for handling ROS messages thr…
Browse files Browse the repository at this point in the history
…ough SHM
  • Loading branch information
yellowhatter committed Aug 26, 2024
1 parent e6da873 commit cadd258
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 13 deletions.
6 changes: 5 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ class rmw_context_impl_s final
z_owned_session_t session;

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<z_owned_shm_provider_t> shm_provider;
struct rmw_shm_s {
z_owned_shm_provider_t shm_provider;
size_t msgsize_threshold;
};
std::optional<rmw_shm_s> shm;
#endif

z_owned_subscriber_t graph_subscriber;
Expand Down
29 changes: 29 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ static const char * router_check_attempts_envar = "ZENOH_ROUTER_CHECK_ATTEMPTS";
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
static const char * zenoh_shm_alloc_size_envar = "ZENOH_SHM_ALLOC_SIZE";
static const size_t zenoh_shm_alloc_size_default = 1024*1024;
static const char * zenoh_shm_message_size_threshold_envar = "ZENOH_SHM_MESSAGE_SIZE_THRESHOLD";
static const size_t zenoh_shm_message_size_threshold_default = 2*1024;
#endif

rmw_ret_t _get_z_config(
Expand Down Expand Up @@ -157,5 +159,32 @@ size_t zenoh_shm_alloc_size() {

return zenoh_shm_alloc_size_default;
}
///=============================================================================
size_t zenoh_shm_message_size_threshold() {
const char * envar_value;

if (NULL != rcutils_get_env(zenoh_shm_message_size_threshold_envar, &envar_value)) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Envar %s cannot be read. Report this bug.",
zenoh_shm_message_size_threshold_envar);
return zenoh_shm_message_size_threshold_default;
}

// If the environment variable contains a value, handle it accordingly.
if (envar_value[0] != '\0') {
const auto read_value = std::strtoull(envar_value, nullptr, 10);
if (read_value > 0) {
if (read_value > std::numeric_limits<size_t>::max()) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Envar %s: value is too large!",
zenoh_shm_message_size_threshold_envar);
} else {
return read_value;
}
}
}

return zenoh_shm_message_size_threshold_default;
}
#endif
} // namespace rmw_zenoh_cpp
9 changes: 9 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ std::optional<uint64_t> zenoh_router_check_attempts();
/// - Else value of environemnt variable is returned.
/// @return The amount of shared memory to be pre-allocated for Zenoh SHM operation
size_t zenoh_shm_alloc_size();
///=============================================================================
/// Message size threshold for implicit SHM optimization based on the environment
/// variable ZENOH_SHM_ALLOC_SIZE.
/// Messages smaller than this threshold will not be forwarded through Zenoh SHM
/// @details The behavior is as follows:
/// - If not set or <= 0, the default value of 2KB is returned.
/// - Else value of environemnt variable is returned.
/// @return The amount of shared memory to be pre-allocated for Zenoh SHM operation
size_t zenoh_shm_message_size_threshold();
#endif
} // namespace rmw_zenoh_cpp

Expand Down
20 changes: 12 additions & 8 deletions rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) {
if (strncmp(z_string_data(z_loan(shm_enabled)), "true", z_string_len(z_loan(shm_enabled))) == 0) {
printf(">>> SHM is enabled\n");

rmw_context_impl_s::rmw_shm_s shm;

// Read msg size treshold from config
shm.msgsize_threshold = rmw_zenoh_cpp::zenoh_shm_message_size_threshold();

// Create Layout for provider's memory
// Provider's alignment will be 1 byte as we are going to make only 1-byte aligned allocations
z_alloc_alignment_t alignment = {0};
Expand All @@ -234,22 +239,21 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) {
}

// Create SHM provider
z_owned_shm_provider_t provider;
const auto provider_creation_result = z_posix_shm_provider_new(&provider, z_loan(layout));
z_drop(layout);
const auto provider_creation_result = z_posix_shm_provider_new(&shm.shm_provider, z_loan(layout));
z_drop(z_move(layout));
if (provider_creation_result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create a SHM provider.");
return RMW_RET_ERROR;
}

// Upon successful provider creation, store it in the context
context->impl->shm_provider = std::make_optional(provider);
context->impl->shm = std::make_optional(shm);
}

auto free_shm_provider = rcpputils::make_scope_exit(
[context]() {
if (context->impl->shm_provider.has_value()) {
z_drop(z_move(context->impl->shm_provider.value()));
if (context->impl->shm.has_value()) {
z_drop(z_move(context->impl->shm.value().shm_provider));
}
});
#endif
Expand Down Expand Up @@ -398,8 +402,8 @@ rmw_ret_t rmw_shutdown(rmw_context_t *context) {
z_undeclare_subscriber(z_move(context->impl->graph_subscriber));
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Drop SHM provider
if (context->impl->shm_provider.has_value()) {
z_drop(z_move(context->impl->shm_provider.value()));
if (context->impl->shm.has_value()) {
z_drop(z_move(context->impl->shm.value().shm_provider));
}
#endif
// Close the zenoh session
Expand Down
9 changes: 5 additions & 4 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -896,10 +896,11 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message,

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (publisher_data->context->impl->shm_provider.has_value()) {
// printf(">>> rmw_publish(), SHM enabled\n");
if (publisher_data->context->impl->shm.has_value() &&
publisher_data->context->impl->shm.value().msgsize_threshold <= max_data_length) {
// printf(">>> rmw_publish(), Will use SHM\n");

const auto& provider = publisher_data->context->impl->shm_provider.value();
const auto& provider = publisher_data->context->impl->shm.value().shm_provider;

// Allocate SHM bufer
// We use 1-byte alignment
Expand All @@ -922,7 +923,7 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message,
} else
#endif
{
// printf(">>> rmw_publish(), SHM disabled\n");
// printf(">>> rmw_publish(), Will use RAW\n");

// Get memory from the allocator.
msg_bytes = static_cast<char *>(
Expand Down

0 comments on commit cadd258

Please sign in to comment.