diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..154b5f98 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +build +install +.cache +log +.vscode diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 14390c8c..0a29d9cb 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -7,6 +7,8 @@ if(NOT CMAKE_CXX_STANDARD) set(CMAKE_CXX_STANDARD_REQUIRED ON) endif() +set(RMW_ZENOH_BUILD_WITH_SHARED_MEMORY OFF CACHE BOOL "Compile Zenoh RMW with Shared Memory support") + if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() @@ -74,6 +76,14 @@ target_compile_definitions(rmw_zenoh_cpp RMW_VERSION_PATCH=${rmw_VERSION_PATCH} ) +if(${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY}) + target_compile_definitions(rmw_zenoh_cpp + PRIVATE + RMW_ZENOH_BUILD_WITH_SHARED_MEMORY + ) +endif() + + ament_export_targets(export_rmw_zenoh_cpp) register_rmw_implementation( diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index 48acfe9d..6a066c04 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -236,7 +236,7 @@ }, /// Shared memory configuration shared_memory: { - enabled: false, + enabled: true, }, /// Access control configuration auth: { diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 index be42dc5c..7fe7f302 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 @@ -239,7 +239,7 @@ }, /// Shared memory configuration shared_memory: { - enabled: false, + enabled: true, }, /// Access control configuration auth: { diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp index 206e2a92..8a71b1f3 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp @@ -42,6 +42,12 @@ static const std::unordered_map zenoh_router_check_attempts() // If unset, check indefinitely. return default_value; } + + +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY +///============================================================================= +size_t zenoh_shm_alloc_size() +{ + const char * envar_value; + + if (NULL != rcutils_get_env(zenoh_shm_alloc_size_envar, &envar_value)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Envar %s cannot be read. Report this bug.", + zenoh_shm_alloc_size_envar); + return zenoh_shm_alloc_size_default; + } + + // If the environment variable contains a value, handle it accordingly. + if (envar_value[0] != '\0') { + const auto read_value = std::strtoull(envar_value, nullptr, 10); + if (read_value > 0) { + if (read_value > std::numeric_limits::max()) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Envar %s: value is too large!", + zenoh_shm_alloc_size_envar); + } else { + return read_value; + } + } + } + + return zenoh_shm_alloc_size_default; +} +///============================================================================= +size_t zenoh_shm_message_size_threshold() +{ + const char * envar_value; + + if (NULL != rcutils_get_env(zenoh_shm_message_size_threshold_envar, &envar_value)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Envar %s cannot be read. Report this bug.", + zenoh_shm_message_size_threshold_envar); + return zenoh_shm_message_size_threshold_default; + } + + // If the environment variable contains a value, handle it accordingly. + if (envar_value[0] != '\0') { + const auto read_value = std::strtoull(envar_value, nullptr, 10); + if (read_value > 0) { + if (read_value > std::numeric_limits::max()) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Envar %s: value is too large!", + zenoh_shm_message_size_threshold_envar); + } else { + return read_value; + } + } + } + + return zenoh_shm_message_size_threshold_default; +} +#endif } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.hpp b/rmw_zenoh_cpp/src/detail/zenoh_config.hpp index 357c7e89..ae4cf74e 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.hpp @@ -57,6 +57,26 @@ rmw_ret_t get_z_config(const ConfigurableEntity & entity, z_owned_config_t * con /// @return The number of times to try connecting to a zenoh router and /// std::nullopt if establishing a connection to a router is not required. std::optional zenoh_router_check_attempts(); + +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY +///============================================================================= +/// Get the amount of shared memory to be pre-allocated for Zenoh SHM operation +/// based on the environment variable ZENOH_SHM_ALLOC_SIZE. +/// @details The behavior is as follows: +/// - If not set or <= 0, the default value of 1MB is returned. +/// - Else value of environemnt variable is returned. +/// @return The amount of shared memory to be pre-allocated for Zenoh SHM operation +size_t zenoh_shm_alloc_size(); +///============================================================================= +/// Message size threshold for implicit SHM optimization based on the environment +/// variable ZENOH_SHM_ALLOC_SIZE. +/// Messages smaller than this threshold will not be forwarded through Zenoh SHM +/// @details The behavior is as follows: +/// - If not set or <= 0, the default value of 2KB is returned. +/// - Else value of environemnt variable is returned. +/// @return The amount of shared memory to be pre-allocated for Zenoh SHM operation +size_t zenoh_shm_message_size_threshold(); +#endif } // namespace rmw_zenoh_cpp #endif // DETAIL__ZENOH_CONFIG_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 90580f76..5e02f361 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -105,12 +105,200 @@ const rosidl_service_type_support_t * find_service_type_support( return type_support; } -} // namespace -extern "C" +bool +create_map_and_set_sequence_num( + z_owned_bytes_t * out_bytes, + int64_t sequence_number, + uint8_t gid[RMW_GID_STORAGE_SIZE]) +{ + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + int64_t source_timestamp = now_ns.count(); + + rmw_zenoh_cpp::attachement_data_t data(sequence_number, source_timestamp, + gid); + if (data.serialize_to_zbytes(out_bytes)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Failed to serialize the attachment"); + return false; + } + + return true; +} + +rmw_ret_t publish( + rmw_zenoh_cpp::rmw_publisher_data_t * publisher_data, + z_moved_bytes_t * payload) +{ + auto free_payload = rcpputils::make_scope_exit( + [&payload]() {z_drop(payload);}); + + z_owned_bytes_t attachment; + if (!create_map_and_set_sequence_num( + &attachment, + publisher_data->get_next_sequence_number(), + publisher_data->pub_gid)) + { + // create_map_and_set_sequence_num already set the error + return RMW_RET_ERROR; + } + + // The encoding is simply forwarded and is useful when key expressions in the + // session use different encoding formats. In our case, all key expressions + // will be encoded with CDR so it does not really matter. + z_publisher_put_options_t options; + z_publisher_put_options_default(&options); + options.attachment = z_move(attachment); + + free_payload.cancel(); + + if (z_publisher_put(z_loan(publisher_data->pub), payload, &options) != Z_OK) { + RMW_SET_ERROR_MSG("unable to publish message"); + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} + +/// Publish as RAW message. +template +rmw_ret_t publish_raw( + rmw_zenoh_cpp::rmw_publisher_data_t * publisher_data, + size_t max_data_length, + SerializerArgs... serializer_args) +{ + // printf(">>> rmw_publish(), Will use RAW\n"); + + rcutils_allocator_t * allocator = + &(publisher_data->context->options.allocator); + + // Get memory from the allocator. + char * msg_bytes = static_cast( + allocator->allocate(max_data_length, allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + msg_bytes, "bytes for message is null", + return RMW_RET_BAD_ALLOC); + + auto free_msg_bytes = + rcpputils::make_scope_exit( + [&msg_bytes, allocator]() { + if (msg_bytes) { + allocator->deallocate(msg_bytes, allocator->state); + } + }); + + // Serialize message into memory + const size_t data_length = Tserializer::serialize_into( + msg_bytes, + max_data_length, + publisher_data, + serializer_args ...); + // Return error upon unsuccessful serialization + if (data_length == 0) { + // serialize_into already set the error + return RMW_RET_ERROR; + } + + z_owned_bytes_t payload; + if (z_bytes_serialize_from_buf( + &payload, reinterpret_cast(msg_bytes), data_length) != Z_OK) + { + RMW_SET_ERROR_MSG("unable to serialize raw buffer into Zenoh Payload"); + return RMW_RET_ERROR; + } + + return publish(publisher_data, z_move(payload)); +} + +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY +/// Publish as SHM message. +template +rmw_ret_t publish_shm( + rmw_zenoh_cpp::rmw_publisher_data_t * publisher_data, + size_t max_data_length, + const z_loaned_shm_provider_t * provider, + SerializerArgs... serializer_args) +{ + // printf(">>> rmw_publish(), Will use SHM\n"); + + // Allocate SHM bufer + // We use 1-byte alignment + z_buf_layout_alloc_result_t alloc; + { + z_alloc_alignment_t alignment = {0}; + z_shm_provider_alloc_gc_defrag_blocking( + &alloc, provider, + max_data_length, alignment); + } + + // Check allocation status + if (alloc.status != ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Unexpected failure during SHM buffer allocation."); + return RMW_RET_ERROR; + } + + // Cleanup SHM buffer upon scope exit + auto always_free_shmbuf = rcpputils::make_scope_exit( + [&alloc]() { + z_drop(z_move(alloc.buf)); + }); + + // Serialize message into memory + char * msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); + const size_t data_length = Tserializer::serialize_into( + msg_bytes, + max_data_length, + publisher_data, + serializer_args ...); + + // Return error upon unsuccessful serialization + if (data_length == 0) { + // serialize_into already set the error + return RMW_RET_ERROR; + } + + // construct z_owned_bytes_t from SHM buffer + z_owned_bytes_t payload; + if (z_bytes_serialize_from_shm_mut(&payload, z_move(alloc.buf)) != Z_OK) { + RMW_SET_ERROR_MSG("unable to serialize SHM buffer into Zenoh Payload"); + return RMW_RET_ERROR; + } + + // publish data + return publish(publisher_data, z_move(payload)); +} +#endif + +/// Publish using raw or SHM(if applicable) buffer +template +rmw_ret_t publish_with_method_selection( + rmw_zenoh_cpp::rmw_publisher_data_t * publisher_data, + size_t max_data_length, + SerializerArgs... serializer_args) { -// TODO(yuyuan): SHM, make this configurable -#define SHM_BUF_OK_SIZE 2621440 +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY + if (publisher_data->context->impl->shm.has_value() && + publisher_data->context->impl->shm.value().msgsize_threshold <= max_data_length) + { + return publish_shm( + publisher_data, + max_data_length, + z_loan(publisher_data->context->impl->shm.value().shm_provider), + serializer_args ...); + } +#endif + return publish_raw( + publisher_data, + max_data_length, + serializer_args ...); +} + +} // namespace + +extern "C" { //============================================================================== /// Get the name of the rmw implementation being used @@ -801,29 +989,52 @@ rmw_return_loaned_message_from_publisher( return RMW_RET_UNSUPPORTED; } -namespace +struct RosMsgSerializer { -bool -create_map_and_set_sequence_num( - z_owned_bytes_t * out_bytes, - int64_t sequence_number, - uint8_t gid[RMW_GID_STORAGE_SIZE]) -{ - auto now = std::chrono::system_clock::now().time_since_epoch(); - auto now_ns = std::chrono::duration_cast(now); - int64_t source_timestamp = now_ns.count(); + static size_t serialize_into( + char * buffer, + size_t size, + rmw_zenoh_cpp::rmw_publisher_data_t * publisher_data, + const void * ros_message) + { + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(buffer, size); - rmw_zenoh_cpp::attachement_data_t data(sequence_number, source_timestamp, gid); - if (data.serialize_to_zbytes(out_bytes)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Failed to serialize the attachment"); - return false; + // Object that serializes the data + rmw_zenoh_cpp::Cdr ser(fastbuffer); + + // Serialize message + if (!publisher_data->type_support->serialize_ros_message( + ros_message, ser.get_cdr(), publisher_data->type_support_impl)) + { + RMW_SET_ERROR_MSG("could not serialize ROS message"); + return 0; + } + + return ser.get_serialized_data_length(); } +}; - return true; -} -} // namespace + +struct RosSerializedMsgSerializer +{ + static size_t serialize_into( + char * _buffer, + size_t _size, + rmw_zenoh_cpp::rmw_publisher_data_t * publisher_data, + const rmw_serialized_message_t * serialized_message) + { + eprosima::fastcdr::FastBuffer buffer( + reinterpret_cast(serialized_message->buffer), + serialized_message->buffer_length); + rmw_zenoh_cpp::Cdr ser(buffer); + if (!ser.get_cdr().jump(serialized_message->buffer_length)) { + RMW_SET_ERROR_MSG("cannot correctly set serialized buffer"); + return RMW_RET_ERROR; + } + return ser.get_serialized_data_length(); + } +}; //============================================================================== /// Publish a ROS message. @@ -844,10 +1055,15 @@ rmw_publish( ros_message, "ros message handle is null", return RMW_RET_INVALID_ARGUMENT); - auto publisher_data = static_cast(publisher->data); + auto publisher_data = + static_cast(publisher->data); RMW_CHECK_FOR_NULL_WITH_MSG( publisher_data, "publisher_data is null", return RMW_RET_INVALID_ARGUMENT); + // estimate serialized data size + size_t max_data_length = + publisher_data->type_support->get_estimated_serialized_size( + ros_message, publisher_data->type_support_impl); rcutils_allocator_t * allocator = &(publisher_data->context->options.allocator); @@ -1038,45 +1254,10 @@ rmw_publish_serialized_message( publisher_data, "publisher data pointer is null", return RMW_RET_ERROR); - eprosima::fastcdr::FastBuffer buffer( - reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); - rmw_zenoh_cpp::Cdr ser(buffer); - if (!ser.get_cdr().jump(serialized_message->buffer_length)) { - RMW_SET_ERROR_MSG("cannot correctly set serialized buffer"); - return RMW_RET_ERROR; - } - - uint64_t sequence_number = publisher_data->get_next_sequence_number(); - - z_owned_bytes_t attachment; - if (!create_map_and_set_sequence_num(&attachment, sequence_number, publisher_data->pub_gid)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - auto free_attachment = - rcpputils::make_scope_exit( - [&attachment]() { - z_drop(z_move(attachment)); - }); - - const size_t data_length = ser.get_serialized_data_length(); - - // The encoding is simply forwarded and is useful when key expressions in the - // session use different encoding formats. In our case, all key expressions - // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options; - z_publisher_put_options_default(&options); - options.attachment = z_move(attachment); - - z_owned_bytes_t payload; - z_bytes_serialize_from_buf(&payload, serialized_message->buffer, data_length); - - if (z_publisher_put(z_loan(publisher_data->pub), z_move(payload), &options) != Z_OK) { - RMW_SET_ERROR_MSG("unable to publish message"); - return RMW_RET_ERROR; - } - - return RMW_RET_OK; + return publish_with_method_selection( + publisher_data, + serialized_message->buffer_length, + serialized_message); } //============================================================================== diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_c_vendor/CMakeLists.txt index 5fdbd77c..a41e99f0 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_c_vendor/CMakeLists.txt @@ -15,7 +15,9 @@ find_package(ament_cmake_vendor_package REQUIRED) # Note: We separate the two args needed for cargo with "$" and not ";" as the # latter is a list separater in cmake and hence the string will be split into two # when expanded. -set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memory zenoh/transport_compression zenoh/transport_tcp zenoh/transport_tls") +set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=zenoh/transport_compression zenoh/transport_tcp zenoh/transport_tls") + +set(RMW_ZENOH_BUILD_WITH_SHARED_MEMORY OFF CACHE BOOL "Compile Zenoh RMW with Shared Memory support") # Set VCS_VERSION to include latest changes from zenoh-c to benefit from : # - https://github.com/eclipse-zenoh/zenoh-c/pull/340 (fix build issue) @@ -29,7 +31,8 @@ ament_vendor(zenoh_c_vendor VCS_VERSION 1.0.0.9 CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" - "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE" + "-DZENOHC_BUILD_WITH_SHARED_MEMORY=${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY}" + "-DZENOHC_BUILD_WITH_UNSTABLE_API=true" "-DZENOHC_CUSTOM_TARGET=${ZENOHC_CUSTOM_TARGET}" )