From 94a014a899bda3b09a7bb81f0222399209648c8d Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Mon, 26 Aug 2024 22:01:07 +0300 Subject: [PATCH] - refine SHM code, fix some bugs - refine build system --- rmw_zenoh_cpp/src/rmw_init.cpp | 17 ++++++++--- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 51 ++++++++++++++++++--------------- zenoh_c_vendor/CMakeLists.txt | 4 ++- 3 files changed, 44 insertions(+), 28 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 688d5b8f..894d6d0b 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -224,16 +224,25 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) { if (strncmp(z_string_data(z_loan(shm_enabled)), "true", z_string_len(z_loan(shm_enabled))) == 0) { printf(">>> SHM is enabled\n"); - // TODO(yuyuan): determine the default alignment - z_alloc_alignment_t alignment = {5}; + // Create Layout for provider's memory + // Provider's alignment will be 1 byte as we are going to make only 1-byte aligned allocations + z_alloc_alignment_t alignment = {0}; z_owned_memory_layout_t layout; - z_memory_layout_new(&layout, rmw_zenoh_cpp::zenoh_shm_alloc_size(), alignment); + if(z_memory_layout_new(&layout, rmw_zenoh_cpp::zenoh_shm_alloc_size(), alignment) != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create a Layout for SHM provider."); + return RMW_RET_ERROR; + } + // Create SHM provider z_owned_shm_provider_t provider; - if (z_posix_shm_provider_new(&provider, z_loan(layout))) { + const auto provider_creation_result = z_posix_shm_provider_new(&provider, z_loan(layout)); + z_drop(layout); + if (provider_creation_result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create a SHM provider."); return RMW_RET_ERROR; } + + // Upon successful provider creation, store it in the context context->impl->shm_provider = std::make_optional(provider); } diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 0bc7f4c3..a076cad2 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -160,11 +160,6 @@ find_service_type_support(const rosidl_service_type_support_t *type_supports) { extern "C" { -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY -// TODO(yuyuan): SHM, make this configurable -#define SHM_BUF_OK_SIZE 2621440 -#endif - //============================================================================== /// Get the name of the rmw implementation being used const char *rmw_get_implementation_identifier(void) { @@ -904,13 +899,15 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message, if (publisher_data->context->impl->shm_provider.has_value()) { // printf(">>> rmw_publish(), SHM enabled\n"); - auto provider = publisher_data->context->impl->shm_provider.value(); + const auto& provider = publisher_data->context->impl->shm_provider.value(); + + // Allocate SHM bufer + // We use 1-byte alignment z_buf_layout_alloc_result_t alloc; - // TODO(yuyuan): SHM, configure this - z_alloc_alignment_t alignment = {5}; + z_alloc_alignment_t alignment = {0}; z_shm_provider_alloc_gc_defrag_blocking( &alloc, z_loan(provider), - SHM_BUF_OK_SIZE, alignment); + max_data_length, alignment); if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { shmbuf = std::make_optional(alloc.buf); @@ -957,30 +954,38 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message, auto free_attachment = rcpputils::make_scope_exit( [&attachment]() { z_drop(z_move(attachment)); }); - // The encoding is simply forwarded and is useful when key expressions in the - // session use different encoding formats. In our case, all key expressions - // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options; - z_publisher_put_options_default(&options); - options.attachment = z_move(attachment); - z_owned_bytes_t payload; - #ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY if (shmbuf.has_value()) { - z_bytes_serialize_from_shm_mut(&payload, z_move(shmbuf.value())); - z_publisher_put(z_loan(publisher_data->pub), z_move(payload), &options); + if (z_bytes_serialize_from_shm_mut(&payload, z_move(shmbuf.value())) != Z_OK) { + RMW_SET_ERROR_MSG("unable to serialize SHM buffer into Zenoh Payload"); + return RMW_RET_ERROR; + } } else #endif { - z_bytes_serialize_from_buf( - &payload, reinterpret_cast(msg_bytes), data_length); - if (z_publisher_put(z_loan(publisher_data->pub), z_move(payload), &options)) { - RMW_SET_ERROR_MSG("unable to publish message"); + if (z_bytes_serialize_from_buf( + &payload, reinterpret_cast(msg_bytes), data_length) != Z_OK) { + RMW_SET_ERROR_MSG("unable to serialize raw buffer into Zenoh Payload"); return RMW_RET_ERROR; } } + // we cancel free attachment because attachment will be moved into z_publisher_put_options_t + free_attachment.cancel(); + + // The encoding is simply forwarded and is useful when key expressions in the + // session use different encoding formats. In our case, all key expressions + // will be encoded with CDR so it does not really matter. + z_publisher_put_options_t options; + z_publisher_put_options_default(&options); + options.attachment = z_move(attachment); + + if (z_publisher_put(z_loan(publisher_data->pub), z_move(payload), &options) != Z_OK) { + RMW_SET_ERROR_MSG("unable to publish message"); + return RMW_RET_ERROR; + } + return RMW_RET_OK; } diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_c_vendor/CMakeLists.txt index 12da2b2b..db575f69 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_c_vendor/CMakeLists.txt @@ -29,7 +29,9 @@ ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c VCS_VERSION dev/1.0.0 CMAKE_ARGS - -DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS} -DZENOHC_BUILD_WITH_SHARED_MEMORY=${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY} -DZENOHC_BUILD_WITH_UNSTABLE_API=${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY} + -DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS} + -DZENOHC_BUILD_WITH_SHARED_MEMORY=${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY} + -DZENOHC_BUILD_WITH_UNSTABLE_API=${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY} ) ament_package()