Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SHM support #39

Merged
merged 9 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.vscode
9 changes: 9 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

set(RMW_ZENOH_BUILD_WITH_SHARED_MEMORY ON CACHE BOOL "Compile Zenoh RMW with Shared Memory support")

# find dependencies
find_package(ament_cmake REQUIRED)

Expand Down Expand Up @@ -80,6 +82,13 @@ target_compile_definitions(rmw_zenoh_cpp
RMW_VERSION_PATCH=${rmw_VERSION_PATCH}
)

if(${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY})
target_compile_definitions(rmw_zenoh_cpp
PRIVATE
RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
)
endif()

ament_export_targets(export_rmw_zenoh_cpp)

register_rmw_implementation(
Expand Down
2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,6 @@
/// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate
/// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
},
auth: {
Expand Down
2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,6 @@
/// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate
/// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
},
auth: {
Expand Down
88 changes: 51 additions & 37 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,24 @@ rmw_context_impl_s::Data::Data(
std::size_t domain_id,
const std::string & enclave,
z_owned_session_t session,
std::optional<z_owned_shm_provider_t> shm_provider,
const std::string & liveliness_str,
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache)
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<rmw_zenoh_cpp::ShmContext> shm
#endif
)
: enclave_(std::move(enclave)),
domain_id_(std::move(domain_id)),
session_(std::move(session)),
shm_provider_(std::move(shm_provider)),
liveliness_str_(std::move(liveliness_str)),
graph_cache_(std::move(graph_cache)),
is_shutdown_(false),
next_entity_id_(0),
is_initialized_(false),
nodes_({})
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, shm_(shm)
#endif
{
graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
Expand Down Expand Up @@ -178,14 +183,18 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown()
}

z_undeclare_subscriber(z_move(graph_subscriber_));
if (shm_provider_.has_value()) {
z_drop(z_move(shm_provider_.value()));
}

// Don't touch Zenoh Session if the ROS process is exiting,
// it will cause panic.
if (!is_exiting) {
z_close(z_loan_mut(session_), NULL);
}

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// drop SHM subsystem if used
shm_ = std::nullopt;
#endif

is_shutdown_ = true;
return RMW_RET_OK;
}
Expand Down Expand Up @@ -214,14 +223,6 @@ rmw_context_impl_s::rmw_context_impl_s(
throw std::runtime_error("Error configuring Zenoh session.");
}

// Check if shm is enabled.
z_owned_string_t shm_enabled;
zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled);
auto always_free_shm_enabled = rcpputils::make_scope_exit(
[&shm_enabled]() {
z_drop(z_move(shm_enabled));
});

// Initialize the zenoh session.
z_owned_session_t session;
if (z_open(&session, z_move(config), NULL) != Z_OK) {
Expand Down Expand Up @@ -309,40 +310,51 @@ rmw_context_impl_s::rmw_context_impl_s(
}
z_drop(z_move(handler));

// Initialize the shm manager if shared_memory is enabled in the config.
std::optional<z_owned_shm_provider_t> shm_provider = std::nullopt;
if (strncmp(z_string_data(z_loan(shm_enabled)), "true", z_string_len(z_loan(shm_enabled))) == 0) {
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Initialize the shm subsystem if shared_memory is enabled in the config
std::optional<rmw_zenoh_cpp::ShmContext> shm;
if (rmw_zenoh_cpp::zenoh_shm_enabled()) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled");

// TODO(yuyuan): determine the default alignment of SHM
z_alloc_alignment_t alignment = {5};
z_owned_memory_layout_t layout;
z_memory_layout_new(&layout, SHM_BUFFER_SIZE_MB * 1024 * 1024, alignment);
rmw_zenoh_cpp::ShmContext shm_context;

// Read msg size treshold from config
shm_context.msgsize_threshold = rmw_zenoh_cpp::zenoh_shm_message_size_threshold();

z_owned_shm_provider_t provider;
if (z_posix_shm_provider_new(&provider, z_loan(layout)) != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create a SHM provider.");
throw std::runtime_error("Unable to create shm manager.");
// Create Layout for provider's memory
// Provider's alignment will be 1 byte as we are going to make only 1-byte aligned allocations
// TODO(yellowhatter): use zenoh_shm_message_size_threshold as base for alignment
z_alloc_alignment_t alignment = {0};
z_owned_memory_layout_t layout;
if (z_memory_layout_new(&layout, rmw_zenoh_cpp::zenoh_shm_alloc_size(), alignment) != Z_OK) {
throw std::runtime_error("Unable to create a Layout for SHM provider.");
}
shm_provider = provider;
// Create SHM provider
const auto provider_creation_result =
z_posix_shm_provider_new(&shm_context.shm_provider, z_loan(layout));
z_drop(z_move(layout));
if (provider_creation_result != Z_OK) {
throw std::runtime_error("Unable to create an SHM provider.");
}
// Upon successful provider creation, store it in the context
shm = std::make_optional(std::move(shm_context));
} else {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is disabled");
}
auto free_shm_provider = rcpputils::make_scope_exit(
[&shm_provider]() {
if (shm_provider.has_value()) {
z_drop(z_move(shm_provider.value()));
}
});
#endif

close_session.cancel();
free_shm_provider.cancel();

data_ = std::make_shared<Data>(
domain_id,
std::move(enclave),
std::move(session),
std::move(shm_provider),
std::move(liveliness_str),
std::move(graph_cache));
std::move(graph_cache)
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::move(shm)
#endif
);

ret = data_->subscribe_to_ros_graph();
if (ret != RMW_RET_OK) {
Expand Down Expand Up @@ -376,11 +388,13 @@ const z_loaned_session_t * rmw_context_impl_s::session() const
}

///=============================================================================
std::optional<z_owned_shm_provider_t> & rmw_context_impl_s::shm_provider()
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<rmw_zenoh_cpp::ShmContext> & rmw_context_impl_s::shm()
{
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->shm_provider_;
return data_->shm_;
}
#endif

///=============================================================================
rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition()
Expand Down
21 changes: 14 additions & 7 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ class rmw_context_impl_s final
// create other Zenoh objects.
const z_loaned_session_t * session() const;

// Get a reference to the shm_provider.
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get a reference to the shm subsystem.
// Note: This is not thread-safe.
// TODO(Yadunund): Remove this API and instead include a publish() API
// that handles the shm_provider once the context manages publishers.
std::optional<z_owned_shm_provider_t> & shm_provider();
std::optional<rmw_zenoh_cpp::ShmContext> & shm();
#endif

// Get the graph guard condition.
rmw_guard_condition_t * graph_guard_condition();
Expand Down Expand Up @@ -103,9 +105,12 @@ class rmw_context_impl_s final
std::size_t domain_id,
const std::string & enclave,
z_owned_session_t session,
std::optional<z_owned_shm_provider_t> shm_provider,
const std::string & liveliness_str,
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache);
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<rmw_zenoh_cpp::ShmContext> shm
#endif
);

// Subscribe to the ROS graph.
rmw_ret_t subscribe_to_ros_graph();
Expand All @@ -126,9 +131,6 @@ class rmw_context_impl_s final
std::size_t domain_id_;
// An owned session.
z_owned_session_t session_;
// An optional SHM manager that is initialized of SHM is enabled in the
// zenoh session config.
std::optional<z_owned_shm_provider_t> shm_provider_;
// Liveliness keyexpr string to subscribe to for ROS graph changes.
std::string liveliness_str_;
// Graph cache.
Expand All @@ -148,6 +150,11 @@ class rmw_context_impl_s final
bool is_initialized_;
// Nodes created from this context.
std::unordered_map<const rmw_node_t *, std::shared_ptr<rmw_zenoh_cpp::NodeData>> nodes_;
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// An optional SHM context that is initialized if SHM is enabled in the
// zenoh session config.
std::optional<rmw_zenoh_cpp::ShmContext> shm_;
#endif
};

std::shared_ptr<Data> data_{nullptr};
Expand Down
63 changes: 45 additions & 18 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,11 @@ PublisherData::PublisherData(

///=============================================================================
rmw_ret_t PublisherData::publish(
const void * ros_message,
std::optional<z_owned_shm_provider_t> & shm_provider)
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<ShmContext> & shm
#endif
)
{
std::lock_guard<std::mutex> lock(mutex_);
if (is_shutdown_) {
Expand All @@ -233,37 +236,51 @@ rmw_ret_t PublisherData::publish(
}

// Serialize data.
size_t max_data_length = type_support_->get_estimated_serialized_size(
const size_t max_data_length = type_support_->get_estimated_serialized_size(
ros_message,
type_support_impl_);

// To store serialized message byte array.
char * msg_bytes = nullptr;

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<z_owned_shm_mut_t> shmbuf = std::nullopt;
auto always_free_shmbuf = rcpputils::make_scope_exit(
[&shmbuf]() {
if (shmbuf.has_value()) {
z_drop(z_move(shmbuf.value()));
}
});
#endif

rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator;

auto always_free_msg_bytes = rcpputils::make_scope_exit(
[&msg_bytes, allocator, &shmbuf]() {
if (msg_bytes && !shmbuf.has_value()) {
[&msg_bytes, allocator
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, &shmbuf
#endif
]() {
if (msg_bytes
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
&& !shmbuf.has_value()
#endif
)
{
allocator->deallocate(msg_bytes, allocator->state);
}
});

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (shm_provider.has_value()) {
if (shm.has_value() && max_data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto provider = shm_provider.value();
auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
z_alloc_alignment_t alignment = {0};
z_buf_layout_alloc_result_t alloc;
// TODO(yuyuan): SHM, configure this
z_alloc_alignment_t alignment = {5};
z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment);

if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
Expand All @@ -275,11 +292,14 @@ rmw_ret_t PublisherData::publish(
return RMW_RET_ERROR;
}
} else {
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
}
#endif
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
Expand Down Expand Up @@ -309,11 +329,15 @@ rmw_ret_t PublisherData::publish(
options.attachment = z_move(attachment);

z_owned_bytes_t payload;
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
if (shmbuf.has_value()) {
z_bytes_from_shm_mut(&payload, z_move(shmbuf.value()));
} else {
z_bytes_copy_from_buf(&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length);
}
#endif
z_bytes_copy_from_buf(&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options);
if (res != Z_OK) {
Expand All @@ -332,8 +356,11 @@ rmw_ret_t PublisherData::publish(

///=============================================================================
rmw_ret_t PublisherData::publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<z_owned_shm_provider_t> & /*shm_provider*/)
const rmw_serialized_message_t * serialized_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<ShmContext> & /*shm_provider*/
#endif
)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
Expand Down
Loading
Loading