From 991a1a54ea6f8cd75dfa42d0081d2ae89e2c0742 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 28 Aug 2024 16:37:01 +0300 Subject: [PATCH] - refactor buffer serialization code - update default configs to have SHM enabled by default (if compiled with SHM support) --- .../DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 | 2 +- .../DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 | 2 +- rmw_zenoh_cpp/src/rmw_init.cpp | 14 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 270 +++++++++++------- 4 files changed, 168 insertions(+), 120 deletions(-) diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index 0ffdf2bb..ffb16dd0 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -229,7 +229,7 @@ }, /// Shared memory configuration shared_memory: { - enabled: false, + enabled: true, }, /// Access control configuration auth: { diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 index b111bfdf..6b7adc36 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 @@ -232,7 +232,7 @@ }, /// Shared memory configuration shared_memory: { - enabled: false, + enabled: true, }, /// Access control configuration auth: { diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 086cf9eb..8c37be2f 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -175,12 +175,10 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) { } #ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - z_owned_string_t shm_enabled; - zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled); - auto free_shm_= rcpputils::make_scope_exit( - [&shm_enabled]() { - z_drop(z_move(shm_enabled)); - }); + z_owned_string_t shm_enabled_str; + zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled_str); + const bool shm_enabled = (strncmp(z_string_data(z_loan(shm_enabled_str)), "true", z_string_len(z_loan(shm_enabled_str))) == 0); + z_drop(z_move(shm_enabled_str)); #endif // Initialize the zenoh session. @@ -221,7 +219,7 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) { #ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY // Initialize the shm manager if shared_memory is enabled in the config. - if (strncmp(z_string_data(z_loan(shm_enabled)), "true", z_string_len(z_loan(shm_enabled))) == 0) { + if (shm_enabled) { printf(">>> SHM is enabled\n"); rmw_context_impl_s::rmw_shm_s shm; @@ -248,6 +246,8 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) { // Upon successful provider creation, store it in the context context->impl->shm = std::make_optional(shm); + } else { + printf(">>> SHM is disabled\n"); } auto free_shm_provider = rcpputils::make_scope_exit( diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 18851c41..d5ad07c6 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -839,6 +839,151 @@ create_map_and_set_sequence_num(z_owned_bytes_t* out_bytes, } } // namespace +size_t serialize_into( + char *buffer, + size_t size, + rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data, + const void *ros_message) { + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(buffer, size); + + // Object that serializes the data + rmw_zenoh_cpp::Cdr ser(fastbuffer); + + // Serialize message + if (!publisher_data->type_support->serialize_ros_message( + ros_message, ser.get_cdr(), publisher_data->type_support_impl)) { + RMW_SET_ERROR_MSG("could not serialize ROS message"); + return 0; + } + + return ser.get_serialized_data_length(); +} + +rmw_ret_t publish( + rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data, + z_moved_bytes_t* payload) { + + auto free_payload = rcpputils::make_scope_exit( + [&payload]() { z_drop(payload); }); + + z_owned_bytes_t attachment; + if (!create_map_and_set_sequence_num( + &attachment, + publisher_data->get_next_sequence_number(), + publisher_data->pub_gid)) { + // create_map_and_set_sequence_num already set the error + return RMW_RET_ERROR; + } + + // The encoding is simply forwarded and is useful when key expressions in the + // session use different encoding formats. In our case, all key expressions + // will be encoded with CDR so it does not really matter. + z_publisher_put_options_t options; + z_publisher_put_options_default(&options); + options.attachment = z_move(attachment); + + free_payload.cancel(); + + if (z_publisher_put(z_loan(publisher_data->pub), payload, &options) != Z_OK) { + RMW_SET_ERROR_MSG("unable to publish message"); + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} + +/// Publish as RAW message. +rmw_ret_t publish_raw( + rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data, + const void *ros_message, + size_t max_data_length) { + // printf(">>> rmw_publish(), Will use RAW\n"); + + rcutils_allocator_t *allocator = + &(publisher_data->context->options.allocator); + + // Get memory from the allocator. + char * msg_bytes = static_cast( + allocator->allocate(max_data_length, allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG(msg_bytes, "bytes for message is null", + return RMW_RET_BAD_ALLOC); + + auto free_msg_bytes = + rcpputils::make_scope_exit([&msg_bytes, allocator]() { + if (msg_bytes) { + allocator->deallocate(msg_bytes, allocator->state); + } + }); + + // Serialize message into memory + const size_t data_length = serialize_into(msg_bytes, max_data_length, publisher_data, ros_message); + // Return error upon unsuccessful serialization + if(data_length == 0) { + // serialize_into already set the error + return RMW_RET_ERROR; + } + + z_owned_bytes_t payload; + if (z_bytes_serialize_from_buf( + &payload, reinterpret_cast(msg_bytes), data_length) != Z_OK) { + RMW_SET_ERROR_MSG("unable to serialize raw buffer into Zenoh Payload"); + return RMW_RET_ERROR; + } + + return publish(publisher_data, z_move(payload)); +} + +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY +/// Publish as SHM message. +rmw_ret_t publish_shm( + rmw_zenoh_cpp::rmw_publisher_data_t* publisher_data, + const void *ros_message, + size_t max_data_length, + const z_loaned_shm_provider_t *provider) { + // printf(">>> rmw_publish(), Will use SHM\n"); + + // Allocate SHM bufer + // We use 1-byte alignment + z_buf_layout_alloc_result_t alloc; + { + z_alloc_alignment_t alignment = {0}; + z_shm_provider_alloc_gc_defrag_blocking( + &alloc, provider, + max_data_length, alignment); + } + + // Check allocation status + if (alloc.status != ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Unexpected failure during SHM buffer allocation."); + return RMW_RET_ERROR; + } + + // Cleanup SHM buffer upon scope exit + auto always_free_shmbuf = rcpputils::make_scope_exit([&alloc]() { + z_drop(z_move(alloc.buf)); + }); + + // Serialize message into memory + char* msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); + const size_t data_length = serialize_into(msg_bytes, max_data_length, publisher_data, ros_message); + // Return error upon unsuccessful serialization + if(data_length == 0) { + // serialize_into already set the error + return RMW_RET_ERROR; + } + + z_owned_bytes_t payload; + if (z_bytes_serialize_from_shm_mut(&payload, z_move(alloc.buf)) != Z_OK) { + RMW_SET_ERROR_MSG("unable to serialize SHM buffer into Zenoh Payload"); + return RMW_RET_ERROR; + } + + return publish(publisher_data, z_move(payload)); +} +#endif + //============================================================================== /// Publish a ROS message. rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message, @@ -858,137 +1003,40 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message, static_cast(publisher->data); RMW_CHECK_FOR_NULL_WITH_MSG(publisher_data, "publisher_data is null", return RMW_RET_INVALID_ARGUMENT); - - rcutils_allocator_t *allocator = - &(publisher_data->context->options.allocator); - // Serialize data. size_t max_data_length = publisher_data->type_support->get_estimated_serialized_size( ros_message, publisher_data->type_support_impl); - // To store serialized message byte array. - char *msg_bytes = nullptr; - #ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - std::optional shmbuf = std::nullopt; - auto always_free_shmbuf = rcpputils::make_scope_exit([&shmbuf]() { - if (shmbuf.has_value()) { - z_drop(z_move(shmbuf.value())); - } - }); -#endif - - auto free_msg_bytes = - rcpputils::make_scope_exit([&msg_bytes, allocator -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - , &shmbuf -#endif - ]() { - if (msg_bytes -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - && !shmbuf.has_value() -#endif - ) { - allocator->deallocate(msg_bytes, allocator->state); - } - }); - -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - // Get memory from SHM buffer if available. if (publisher_data->context->impl->shm.has_value() && publisher_data->context->impl->shm.value().msgsize_threshold <= max_data_length) { - // printf(">>> rmw_publish(), Will use SHM\n"); - - const auto& provider = publisher_data->context->impl->shm.value().shm_provider; - - // Allocate SHM bufer - // We use 1-byte alignment - z_buf_layout_alloc_result_t alloc; - z_alloc_alignment_t alignment = {0}; - z_shm_provider_alloc_gc_defrag_blocking( - &alloc, z_loan(provider), - max_data_length, alignment); - - if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { - shmbuf = std::make_optional(alloc.buf); - msg_bytes = - reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); - } else { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Unexpected failure during SHM buffer allocation."); - return RMW_RET_ERROR; - } - - } else + return publish_shm( + publisher_data, + ros_message, + max_data_length, + z_loan(publisher_data->context->impl->shm.value().shm_provider)); + } else #endif - { - // printf(">>> rmw_publish(), Will use RAW\n"); + { + return publish_raw( + publisher_data, + ros_message, + max_data_length); + } +} + - // Get memory from the allocator. - msg_bytes = static_cast( - allocator->allocate(max_data_length, allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG(msg_bytes, "bytes for message is null", - return RMW_RET_BAD_ALLOC); - } - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); - // Object that serializes the data - rmw_zenoh_cpp::Cdr ser(fastbuffer); - if (!publisher_data->type_support->serialize_ros_message( - ros_message, ser.get_cdr(), publisher_data->type_support_impl)) { - RMW_SET_ERROR_MSG("could not serialize ROS message"); - return RMW_RET_ERROR; - } - const size_t data_length = ser.get_serialized_data_length(); - int64_t sequence_number = publisher_data->get_next_sequence_number(); - z_owned_bytes_t attachment; - if (!create_map_and_set_sequence_num(&attachment, sequence_number, publisher_data->pub_gid)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - auto free_attachment = rcpputils::make_scope_exit( - [&attachment]() { z_drop(z_move(attachment)); }); - z_owned_bytes_t payload; -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - if (shmbuf.has_value()) { - if (z_bytes_serialize_from_shm_mut(&payload, z_move(shmbuf.value())) != Z_OK) { - RMW_SET_ERROR_MSG("unable to serialize SHM buffer into Zenoh Payload"); - return RMW_RET_ERROR; - } - } else -#endif - { - if (z_bytes_serialize_from_buf( - &payload, reinterpret_cast(msg_bytes), data_length) != Z_OK) { - RMW_SET_ERROR_MSG("unable to serialize raw buffer into Zenoh Payload"); - return RMW_RET_ERROR; - } - } - // we cancel free attachment because attachment will be moved into z_publisher_put_options_t - free_attachment.cancel(); - // The encoding is simply forwarded and is useful when key expressions in the - // session use different encoding formats. In our case, all key expressions - // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options; - z_publisher_put_options_default(&options); - options.attachment = z_move(attachment); - if (z_publisher_put(z_loan(publisher_data->pub), z_move(payload), &options) != Z_OK) { - RMW_SET_ERROR_MSG("unable to publish message"); - return RMW_RET_ERROR; - } - return RMW_RET_OK; -} //============================================================================== /// Publish a loaned ROS message.