Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C++ SHM #49

Merged
merged 9 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

set(RMW_ZENOH_BUILD_WITH_SHARED_MEMORY TRUE CACHE BOOL "Compile Zenoh RMW with Shared Memory support")

# find dependencies
find_package(ament_cmake REQUIRED)

Expand Down Expand Up @@ -43,6 +45,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/rmw_subscription_data.cpp
src/detail/service_type_support.cpp
src/detail/simplified_xxhash3.cpp
src/detail/shm_context.cpp
src/detail/type_support.cpp
src/detail/type_support_common.cpp
src/detail/zenoh_config.cpp
Expand Down Expand Up @@ -81,6 +84,13 @@ target_compile_definitions(rmw_zenoh_cpp
RMW_VERSION_PATCH=${rmw_VERSION_PATCH}
)

if(${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY})
target_compile_definitions(rmw_zenoh_cpp
PUBLIC
RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
)
endif()

ament_export_targets(export_rmw_zenoh_cpp)

register_rmw_implementation(
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
enabled: true,
},
auth: {
/// The configuration of authentication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
enabled: true,
},
auth: {
/// The configuration of authentication.
Expand Down
44 changes: 25 additions & 19 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,21 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
"rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n");
}
}

// Initialize the shm manager if shared_memory is enabled in the config.
shm_provider_ = std::nullopt;
#ifndef _MSC_VER
yellowhatter marked this conversation as resolved.
Show resolved Hide resolved
if (shm_enabled == "true") {
auto layout = zenoh::MemoryLayout(
SHM_BUFFER_SIZE_MB * 1024 * 1024,
zenoh::AllocAlignment({5}));
zenoh::PosixShmProvider provider(layout, &result);
if (result != Z_OK) {
throw std::runtime_error("Unable to create shm provider.");
}
shm_provider_ = std::move(provider);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Initialize the shm subsystem if shared_memory is enabled in the config
if (rmw_zenoh_cpp::zenoh_shm_enabled()) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled");

shm_ = std::make_optional(
rmw_zenoh_cpp::ShmContext(
rmw_zenoh_cpp::zenoh_shm_alloc_size(),
rmw_zenoh_cpp::zenoh_shm_message_size_threshold()
));
} else {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is disabled");
}
#endif

graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
graph_guard_condition_->data = &guard_condition_data_;
Expand Down Expand Up @@ -262,11 +262,13 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
return session_;
}

std::optional<zenoh::ShmProvider> & shm_provider()
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<rmw_zenoh_cpp::ShmContext> & shm()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return shm_provider_;
return shm_;
}
#endif

rmw_guard_condition_t * graph_guard_condition()
{
Expand Down Expand Up @@ -398,9 +400,11 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
std::string enclave_;
// An owned session.
std::shared_ptr<zenoh::Session> session_;
// An optional SHM manager that is initialized of SHM is enabled in the
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// An optional SHM context that is initialized of SHM is enabled in the
// zenoh session config.
std::optional<zenoh::ShmProvider> shm_provider_;
std::optional<rmw_zenoh_cpp::ShmContext> shm_;
#endif
// Graph cache.
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
// ROS graph liveliness subscriber.
Expand Down Expand Up @@ -456,11 +460,13 @@ const std::shared_ptr<zenoh::Session> rmw_context_impl_s::session() const
return data_->session();
}

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
///=============================================================================
std::optional<zenoh::ShmProvider> & rmw_context_impl_s::shm_provider()
std::optional<rmw_zenoh_cpp::ShmContext> & rmw_context_impl_s::shm()
{
return data_->shm_provider();
return data_->shm();
}
#endif

///=============================================================================
rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition()
Expand Down
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ struct rmw_context_impl_s final
// Loan the Zenoh session.
const std::shared_ptr<zenoh::Session> session() const;

// Get a reference to the shm_provider.
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get a reference to the shm subsystem.
// Note: This is not thread-safe.
// TODO(Yadunund): Remove this API and instead include a publish() API
// that handles the shm_provider once the context manages publishers.
std::optional<zenoh::ShmProvider> & shm_provider();
std::optional<rmw_zenoh_cpp::ShmContext> & shm();
#endif

// Get the graph guard condition.
rmw_guard_condition_t * graph_guard_condition();
Expand Down
109 changes: 96 additions & 13 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,11 @@ PublisherData::PublisherData(

///=============================================================================
rmw_ret_t PublisherData::publish(
const void * ros_message,
std::optional<zenoh::ShmProvider> & /*shm_provider*/)
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
)
{
std::lock_guard<std::mutex> lock(mutex_);
if (is_shutdown_) {
Expand All @@ -218,19 +221,59 @@ rmw_ret_t PublisherData::publish(
// To store serialized message byte array.
char * msg_bytes = nullptr;

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<zenoh::ZShmMut> shmbuf = std::nullopt;
#endif

rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator;

auto always_free_msg_bytes = rcpputils::make_scope_exit(
[&msg_bytes, allocator]() {
if (msg_bytes) {
[&msg_bytes, allocator
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, &shmbuf
#endif
]() {
if (msg_bytes
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
&& !shmbuf.has_value()
#endif
)
{
allocator->deallocate(msg_bytes, allocator->state);
}
});


#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (shm.has_value() && max_data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
auto alloc_result = provider.alloc_gc_defrag_blocking(
max_data_length,
zenoh::AllocAlignment({0}));

if (std::holds_alternative<zenoh::ZShmMut>(alloc_result)) {
auto && buf = std::get<zenoh::ZShmMut>(std::move(alloc_result));
msg_bytes = reinterpret_cast<char *>(buf.data());
shmbuf = std::make_optional(std::move(buf));
} else {
// TODO(Yadunund): Should we revert to regular allocation and not return an error?
RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing.");
return RMW_RET_ERROR;
}
} else {
#endif
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
Expand All @@ -257,11 +300,14 @@ rmw_ret_t PublisherData::publish(
options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

// TODO(ahcorde): shmbuf
std::vector<uint8_t> raw_data(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
zenoh::Bytes payload(std::move(raw_data));
auto payload =
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
shmbuf.has_value() ? zenoh::Bytes(std::move(*shmbuf)) :
#endif
zenoh::Bytes(
std::vector<uint8_t>(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), ros_message, source_timestamp);
Expand All @@ -282,8 +328,11 @@ rmw_ret_t PublisherData::publish(

///=============================================================================
rmw_ret_t PublisherData::publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<zenoh::ShmProvider> & /*shm_provider*/)
const rmw_serialized_message_t * serialized_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
Expand All @@ -305,14 +354,48 @@ rmw_ret_t PublisherData::publish_serialized_message(
options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

std::vector<uint8_t> raw_data(

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (shm.has_value() && data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
auto alloc_result = provider.alloc_gc_defrag_blocking(data_length, zenoh::AllocAlignment({0}));

if (std::holds_alternative<zenoh::ZShmMut>(alloc_result)) {
auto && buf = std::get<zenoh::ZShmMut>(std::move(alloc_result));
auto msg_bytes = reinterpret_cast<char *>(buf.data());
memcpy(msg_bytes, serialized_message->buffer, data_length);
zenoh::Bytes payload(std::move(buf));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message,
source_timestamp);

pub_.put(std::move(payload), std::move(options), &result);
} else {
// TODO(Yadunund): Should we revert to regular allocation and not return an error?
RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing.");
return RMW_RET_ERROR;
}
} else {
#endif
std::vector<uint8_t> raw_image(
serialized_message->buffer,
serialized_message->buffer + data_length);
zenoh::Bytes payload(std::move(raw_data));
zenoh::Bytes payload(raw_image);

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message, source_timestamp);

pub_.put(std::move(payload), std::move(options), &result);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

if (result != Z_OK) {
if (result == Z_ESESSION_CLOSED) {
RMW_ZENOH_LOG_WARN_NAMED(
Expand Down
15 changes: 11 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "event.hpp"
#include "liveliness_utils.hpp"
#include "message_type_support.hpp"
#include "shm_context.hpp"
#include "type_support_common.hpp"
#include "zenoh_utils.hpp"

Expand Down Expand Up @@ -56,13 +57,19 @@ class PublisherData final

// Publish a ROS message.
rmw_ret_t publish(
const void * ros_message,
std::optional<zenoh::ShmProvider> & shm_provider);
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
);

// Publish a serialized ROS message.
rmw_ret_t publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<zenoh::ShmProvider> & shm_provider);
const rmw_serialized_message_t * serialized_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
);

// Get a copy of the keyexpr_hash of this PublisherData's liveliness::Entity.
std::size_t keyexpr_hash() const;
Expand Down
8 changes: 6 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "liveliness_utils.hpp"
#include "message_type_support.hpp"
#include "attachment_helpers.hpp"
#include "shm_context.hpp"
#include "type_support_common.hpp"
#include "zenoh_utils.hpp"

Expand Down Expand Up @@ -76,8 +77,11 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD

// Publish a ROS message.
rmw_ret_t publish(
const void * ros_message,
std::optional<z_owned_shm_provider_t> & shm_provider);
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
);

// Get a copy of the keyexpr_hash of this SubscriptionData's liveliness::Entity.
std::size_t keyexpr_hash() const;
Expand Down
33 changes: 33 additions & 0 deletions rmw_zenoh_cpp/src/detail/shm_context.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY

#include <stdexcept>

#include "shm_context.hpp"

namespace rmw_zenoh_cpp
{
///=============================================================================
ShmContext::ShmContext(size_t alloc_size, size_t msgsize_threshold)
: // Create Layout for provider's memory
// Provider's alignment is 1 (=2^0) bytes as we are going to make only 1-byte aligned allocations
// TODO(yellowhatter): use zenoh_shm_message_size_threshold as base for alignment
shm_provider(zenoh::PosixShmProvider(zenoh::MemoryLayout(alloc_size, zenoh::AllocAlignment {0}))),
msgsize_threshold(msgsize_threshold)
{}
} // namespace rmw_zenoh_cpp

#endif
Loading
Loading