Skip to content

Commit

Permalink
C++ SHM (#49)
Browse files Browse the repository at this point in the history
C++ SHM
  • Loading branch information
yellowhatter authored Dec 27, 2024
1 parent 6b94577 commit b80d583
Show file tree
Hide file tree
Showing 14 changed files with 368 additions and 49 deletions.
10 changes: 10 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

set(RMW_ZENOH_BUILD_WITH_SHARED_MEMORY TRUE CACHE BOOL "Compile Zenoh RMW with Shared Memory support")

# find dependencies
find_package(ament_cmake REQUIRED)

Expand Down Expand Up @@ -43,6 +45,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/rmw_subscription_data.cpp
src/detail/service_type_support.cpp
src/detail/simplified_xxhash3.cpp
src/detail/shm_context.cpp
src/detail/type_support.cpp
src/detail/type_support_common.cpp
src/detail/zenoh_config.cpp
Expand Down Expand Up @@ -81,6 +84,13 @@ target_compile_definitions(rmw_zenoh_cpp
RMW_VERSION_PATCH=${rmw_VERSION_PATCH}
)

if(${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY})
target_compile_definitions(rmw_zenoh_cpp
PUBLIC
RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
)
endif()

ament_export_targets(export_rmw_zenoh_cpp)

register_rmw_implementation(
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
enabled: true,
},
auth: {
/// The configuration of authentication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
enabled: true,
},
auth: {
/// The configuration of authentication.
Expand Down
44 changes: 25 additions & 19 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,21 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
"rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n");
}
}

// Initialize the shm manager if shared_memory is enabled in the config.
shm_provider_ = std::nullopt;
#ifndef _MSC_VER
if (shm_enabled == "true") {
auto layout = zenoh::MemoryLayout(
SHM_BUFFER_SIZE_MB * 1024 * 1024,
zenoh::AllocAlignment({5}));
zenoh::PosixShmProvider provider(layout, &result);
if (result != Z_OK) {
throw std::runtime_error("Unable to create shm provider.");
}
shm_provider_ = std::move(provider);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Initialize the shm subsystem if shared_memory is enabled in the config
if (rmw_zenoh_cpp::zenoh_shm_enabled()) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled");

shm_ = std::make_optional(
rmw_zenoh_cpp::ShmContext(
rmw_zenoh_cpp::zenoh_shm_alloc_size(),
rmw_zenoh_cpp::zenoh_shm_message_size_threshold()
));
} else {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is disabled");
}
#endif

graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
graph_guard_condition_->data = &guard_condition_data_;
Expand Down Expand Up @@ -262,11 +262,13 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
return session_;
}

std::optional<zenoh::ShmProvider> & shm_provider()
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<rmw_zenoh_cpp::ShmContext> & shm()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return shm_provider_;
return shm_;
}
#endif

rmw_guard_condition_t * graph_guard_condition()
{
Expand Down Expand Up @@ -398,9 +400,11 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
std::string enclave_;
// An owned session.
std::shared_ptr<zenoh::Session> session_;
// An optional SHM manager that is initialized of SHM is enabled in the
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// An optional SHM context that is initialized of SHM is enabled in the
// zenoh session config.
std::optional<zenoh::ShmProvider> shm_provider_;
std::optional<rmw_zenoh_cpp::ShmContext> shm_;
#endif
// Graph cache.
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
// ROS graph liveliness subscriber.
Expand Down Expand Up @@ -456,11 +460,13 @@ const std::shared_ptr<zenoh::Session> rmw_context_impl_s::session() const
return data_->session();
}

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
///=============================================================================
std::optional<zenoh::ShmProvider> & rmw_context_impl_s::shm_provider()
std::optional<rmw_zenoh_cpp::ShmContext> & rmw_context_impl_s::shm()
{
return data_->shm_provider();
return data_->shm();
}
#endif

///=============================================================================
rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition()
Expand Down
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ struct rmw_context_impl_s final
// Loan the Zenoh session.
const std::shared_ptr<zenoh::Session> session() const;

// Get a reference to the shm_provider.
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get a reference to the shm subsystem.
// Note: This is not thread-safe.
// TODO(Yadunund): Remove this API and instead include a publish() API
// that handles the shm_provider once the context manages publishers.
std::optional<zenoh::ShmProvider> & shm_provider();
std::optional<rmw_zenoh_cpp::ShmContext> & shm();
#endif

// Get the graph guard condition.
rmw_guard_condition_t * graph_guard_condition();
Expand Down
109 changes: 96 additions & 13 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,11 @@ PublisherData::PublisherData(

///=============================================================================
rmw_ret_t PublisherData::publish(
const void * ros_message,
std::optional<zenoh::ShmProvider> & /*shm_provider*/)
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
)
{
std::lock_guard<std::mutex> lock(mutex_);
if (is_shutdown_) {
Expand All @@ -218,19 +221,59 @@ rmw_ret_t PublisherData::publish(
// To store serialized message byte array.
char * msg_bytes = nullptr;

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<zenoh::ZShmMut> shmbuf = std::nullopt;
#endif

rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator;

auto always_free_msg_bytes = rcpputils::make_scope_exit(
[&msg_bytes, allocator]() {
if (msg_bytes) {
[&msg_bytes, allocator
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, &shmbuf
#endif
]() {
if (msg_bytes
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
&& !shmbuf.has_value()
#endif
)
{
allocator->deallocate(msg_bytes, allocator->state);
}
});


#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (shm.has_value() && max_data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
auto alloc_result = provider.alloc_gc_defrag_blocking(
max_data_length,
zenoh::AllocAlignment({0}));

if (std::holds_alternative<zenoh::ZShmMut>(alloc_result)) {
auto && buf = std::get<zenoh::ZShmMut>(std::move(alloc_result));
msg_bytes = reinterpret_cast<char *>(buf.data());
shmbuf = std::make_optional(std::move(buf));
} else {
// TODO(Yadunund): Should we revert to regular allocation and not return an error?
RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing.");
return RMW_RET_ERROR;
}
} else {
#endif
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
Expand All @@ -257,11 +300,14 @@ rmw_ret_t PublisherData::publish(
options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

// TODO(ahcorde): shmbuf
std::vector<uint8_t> raw_data(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
zenoh::Bytes payload(std::move(raw_data));
auto payload =
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
shmbuf.has_value() ? zenoh::Bytes(std::move(*shmbuf)) :
#endif
zenoh::Bytes(
std::vector<uint8_t>(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), ros_message, source_timestamp);
Expand All @@ -282,8 +328,11 @@ rmw_ret_t PublisherData::publish(

///=============================================================================
rmw_ret_t PublisherData::publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<zenoh::ShmProvider> & /*shm_provider*/)
const rmw_serialized_message_t * serialized_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
Expand All @@ -305,14 +354,48 @@ rmw_ret_t PublisherData::publish_serialized_message(
options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

std::vector<uint8_t> raw_data(

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (shm.has_value() && data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
auto alloc_result = provider.alloc_gc_defrag_blocking(data_length, zenoh::AllocAlignment({0}));

if (std::holds_alternative<zenoh::ZShmMut>(alloc_result)) {
auto && buf = std::get<zenoh::ZShmMut>(std::move(alloc_result));
auto msg_bytes = reinterpret_cast<char *>(buf.data());
memcpy(msg_bytes, serialized_message->buffer, data_length);
zenoh::Bytes payload(std::move(buf));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message,
source_timestamp);

pub_.put(std::move(payload), std::move(options), &result);
} else {
// TODO(Yadunund): Should we revert to regular allocation and not return an error?
RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing.");
return RMW_RET_ERROR;
}
} else {
#endif
std::vector<uint8_t> raw_image(
serialized_message->buffer,
serialized_message->buffer + data_length);
zenoh::Bytes payload(std::move(raw_data));
zenoh::Bytes payload(raw_image);

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message, source_timestamp);

pub_.put(std::move(payload), std::move(options), &result);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

if (result != Z_OK) {
if (result == Z_ESESSION_CLOSED) {
RMW_ZENOH_LOG_WARN_NAMED(
Expand Down
15 changes: 11 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "event.hpp"
#include "liveliness_utils.hpp"
#include "message_type_support.hpp"
#include "shm_context.hpp"
#include "type_support_common.hpp"
#include "zenoh_utils.hpp"

Expand Down Expand Up @@ -56,13 +57,19 @@ class PublisherData final

// Publish a ROS message.
rmw_ret_t publish(
const void * ros_message,
std::optional<zenoh::ShmProvider> & shm_provider);
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
);

// Publish a serialized ROS message.
rmw_ret_t publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<zenoh::ShmProvider> & shm_provider);
const rmw_serialized_message_t * serialized_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
);

// Get a copy of the keyexpr_hash of this PublisherData's liveliness::Entity.
std::size_t keyexpr_hash() const;
Expand Down
8 changes: 6 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "liveliness_utils.hpp"
#include "message_type_support.hpp"
#include "attachment_helpers.hpp"
#include "shm_context.hpp"
#include "type_support_common.hpp"
#include "zenoh_utils.hpp"

Expand Down Expand Up @@ -76,8 +77,11 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD

// Publish a ROS message.
rmw_ret_t publish(
const void * ros_message,
std::optional<z_owned_shm_provider_t> & shm_provider);
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
);

// Get a copy of the keyexpr_hash of this SubscriptionData's liveliness::Entity.
std::size_t keyexpr_hash() const;
Expand Down
33 changes: 33 additions & 0 deletions rmw_zenoh_cpp/src/detail/shm_context.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY

#include <stdexcept>

#include "shm_context.hpp"

namespace rmw_zenoh_cpp
{
///=============================================================================
ShmContext::ShmContext(size_t alloc_size, size_t msgsize_threshold)
: // Create Layout for provider's memory
// Provider's alignment is 1 (=2^0) bytes as we are going to make only 1-byte aligned allocations
// TODO(yellowhatter): use zenoh_shm_message_size_threshold as base for alignment
shm_provider(zenoh::PosixShmProvider(zenoh::MemoryLayout(alloc_size, zenoh::AllocAlignment {0}))),
msgsize_threshold(msgsize_threshold)
{}
} // namespace rmw_zenoh_cpp

#endif
Loading

0 comments on commit b80d583

Please sign in to comment.