Skip to content

Commit

Permalink
SHM support (#39)
Browse files Browse the repository at this point in the history
* Proper SHM subsystem support

* add .gitignore

* ament_uncrustify

* ament_cpplint

* uncrustify

* code format fixes

* build with SHM by default to align with other rmw impls

* refactor: disable shared-memory in the config since it hasn't been tested

---------

Co-authored-by: yuanyuyuan <[email protected]>
  • Loading branch information
yellowhatter and YuanYuYuan authored Nov 13, 2024
1 parent 28d917e commit f400ab5
Show file tree
Hide file tree
Showing 15 changed files with 334 additions and 77 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.vscode
9 changes: 9 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

set(RMW_ZENOH_BUILD_WITH_SHARED_MEMORY ON CACHE BOOL "Compile Zenoh RMW with Shared Memory support")

# find dependencies
find_package(ament_cmake REQUIRED)

Expand Down Expand Up @@ -80,6 +82,13 @@ target_compile_definitions(rmw_zenoh_cpp
RMW_VERSION_PATCH=${rmw_VERSION_PATCH}
)

if(${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY})
target_compile_definitions(rmw_zenoh_cpp
PRIVATE
RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
)
endif()

ament_export_targets(export_rmw_zenoh_cpp)

register_rmw_implementation(
Expand Down
2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,6 @@
/// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate
/// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
},
auth: {
Expand Down
2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,6 @@
/// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate
/// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
},
auth: {
Expand Down
88 changes: 51 additions & 37 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,24 @@ rmw_context_impl_s::Data::Data(
std::size_t domain_id,
const std::string & enclave,
z_owned_session_t session,
std::optional<z_owned_shm_provider_t> shm_provider,
const std::string & liveliness_str,
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache)
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<rmw_zenoh_cpp::ShmContext> shm
#endif
)
: enclave_(std::move(enclave)),
domain_id_(std::move(domain_id)),
session_(std::move(session)),
shm_provider_(std::move(shm_provider)),
liveliness_str_(std::move(liveliness_str)),
graph_cache_(std::move(graph_cache)),
is_shutdown_(false),
next_entity_id_(0),
is_initialized_(false),
nodes_({})
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, shm_(shm)
#endif
{
graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
Expand Down Expand Up @@ -178,14 +183,18 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown()
}

z_undeclare_subscriber(z_move(graph_subscriber_));
if (shm_provider_.has_value()) {
z_drop(z_move(shm_provider_.value()));
}

// Don't touch Zenoh Session if the ROS process is exiting,
// it will cause panic.
if (!is_exiting) {
z_close(z_loan_mut(session_), NULL);
}

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// drop SHM subsystem if used
shm_ = std::nullopt;
#endif

is_shutdown_ = true;
return RMW_RET_OK;
}
Expand Down Expand Up @@ -214,14 +223,6 @@ rmw_context_impl_s::rmw_context_impl_s(
throw std::runtime_error("Error configuring Zenoh session.");
}

// Check if shm is enabled.
z_owned_string_t shm_enabled;
zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled);
auto always_free_shm_enabled = rcpputils::make_scope_exit(
[&shm_enabled]() {
z_drop(z_move(shm_enabled));
});

// Initialize the zenoh session.
z_owned_session_t session;
if (z_open(&session, z_move(config), NULL) != Z_OK) {
Expand Down Expand Up @@ -309,40 +310,51 @@ rmw_context_impl_s::rmw_context_impl_s(
}
z_drop(z_move(handler));

// Initialize the shm manager if shared_memory is enabled in the config.
std::optional<z_owned_shm_provider_t> shm_provider = std::nullopt;
if (strncmp(z_string_data(z_loan(shm_enabled)), "true", z_string_len(z_loan(shm_enabled))) == 0) {
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Initialize the shm subsystem if shared_memory is enabled in the config
std::optional<rmw_zenoh_cpp::ShmContext> shm;
if (rmw_zenoh_cpp::zenoh_shm_enabled()) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled");

// TODO(yuyuan): determine the default alignment of SHM
z_alloc_alignment_t alignment = {5};
z_owned_memory_layout_t layout;
z_memory_layout_new(&layout, SHM_BUFFER_SIZE_MB * 1024 * 1024, alignment);
rmw_zenoh_cpp::ShmContext shm_context;

// Read msg size treshold from config
shm_context.msgsize_threshold = rmw_zenoh_cpp::zenoh_shm_message_size_threshold();

z_owned_shm_provider_t provider;
if (z_posix_shm_provider_new(&provider, z_loan(layout)) != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create a SHM provider.");
throw std::runtime_error("Unable to create shm manager.");
// Create Layout for provider's memory
// Provider's alignment will be 1 byte as we are going to make only 1-byte aligned allocations
// TODO(yellowhatter): use zenoh_shm_message_size_threshold as base for alignment
z_alloc_alignment_t alignment = {0};
z_owned_memory_layout_t layout;
if (z_memory_layout_new(&layout, rmw_zenoh_cpp::zenoh_shm_alloc_size(), alignment) != Z_OK) {
throw std::runtime_error("Unable to create a Layout for SHM provider.");
}
shm_provider = provider;
// Create SHM provider
const auto provider_creation_result =
z_posix_shm_provider_new(&shm_context.shm_provider, z_loan(layout));
z_drop(z_move(layout));
if (provider_creation_result != Z_OK) {
throw std::runtime_error("Unable to create an SHM provider.");
}
// Upon successful provider creation, store it in the context
shm = std::make_optional(std::move(shm_context));
} else {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is disabled");
}
auto free_shm_provider = rcpputils::make_scope_exit(
[&shm_provider]() {
if (shm_provider.has_value()) {
z_drop(z_move(shm_provider.value()));
}
});
#endif

close_session.cancel();
free_shm_provider.cancel();

data_ = std::make_shared<Data>(
domain_id,
std::move(enclave),
std::move(session),
std::move(shm_provider),
std::move(liveliness_str),
std::move(graph_cache));
std::move(graph_cache)
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::move(shm)
#endif
);

ret = data_->subscribe_to_ros_graph();
if (ret != RMW_RET_OK) {
Expand Down Expand Up @@ -376,11 +388,13 @@ const z_loaned_session_t * rmw_context_impl_s::session() const
}

///=============================================================================
std::optional<z_owned_shm_provider_t> & rmw_context_impl_s::shm_provider()
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<rmw_zenoh_cpp::ShmContext> & rmw_context_impl_s::shm()
{
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->shm_provider_;
return data_->shm_;
}
#endif

///=============================================================================
rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition()
Expand Down
21 changes: 14 additions & 7 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ class rmw_context_impl_s final
// create other Zenoh objects.
const z_loaned_session_t * session() const;

// Get a reference to the shm_provider.
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get a reference to the shm subsystem.
// Note: This is not thread-safe.
// TODO(Yadunund): Remove this API and instead include a publish() API
// that handles the shm_provider once the context manages publishers.
std::optional<z_owned_shm_provider_t> & shm_provider();
std::optional<rmw_zenoh_cpp::ShmContext> & shm();
#endif

// Get the graph guard condition.
rmw_guard_condition_t * graph_guard_condition();
Expand Down Expand Up @@ -103,9 +105,12 @@ class rmw_context_impl_s final
std::size_t domain_id,
const std::string & enclave,
z_owned_session_t session,
std::optional<z_owned_shm_provider_t> shm_provider,
const std::string & liveliness_str,
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache);
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<rmw_zenoh_cpp::ShmContext> shm
#endif
);

// Subscribe to the ROS graph.
rmw_ret_t subscribe_to_ros_graph();
Expand All @@ -126,9 +131,6 @@ class rmw_context_impl_s final
std::size_t domain_id_;
// An owned session.
z_owned_session_t session_;
// An optional SHM manager that is initialized of SHM is enabled in the
// zenoh session config.
std::optional<z_owned_shm_provider_t> shm_provider_;
// Liveliness keyexpr string to subscribe to for ROS graph changes.
std::string liveliness_str_;
// Graph cache.
Expand All @@ -148,6 +150,11 @@ class rmw_context_impl_s final
bool is_initialized_;
// Nodes created from this context.
std::unordered_map<const rmw_node_t *, std::shared_ptr<rmw_zenoh_cpp::NodeData>> nodes_;
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// An optional SHM context that is initialized if SHM is enabled in the
// zenoh session config.
std::optional<rmw_zenoh_cpp::ShmContext> shm_;
#endif
};

std::shared_ptr<Data> data_{nullptr};
Expand Down
63 changes: 45 additions & 18 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,11 @@ PublisherData::PublisherData(

///=============================================================================
rmw_ret_t PublisherData::publish(
const void * ros_message,
std::optional<z_owned_shm_provider_t> & shm_provider)
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<ShmContext> & shm
#endif
)
{
std::lock_guard<std::mutex> lock(mutex_);
if (is_shutdown_) {
Expand All @@ -233,37 +236,51 @@ rmw_ret_t PublisherData::publish(
}

// Serialize data.
size_t max_data_length = type_support_->get_estimated_serialized_size(
const size_t max_data_length = type_support_->get_estimated_serialized_size(
ros_message,
type_support_impl_);

// To store serialized message byte array.
char * msg_bytes = nullptr;

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<z_owned_shm_mut_t> shmbuf = std::nullopt;
auto always_free_shmbuf = rcpputils::make_scope_exit(
[&shmbuf]() {
if (shmbuf.has_value()) {
z_drop(z_move(shmbuf.value()));
}
});
#endif

rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator;

auto always_free_msg_bytes = rcpputils::make_scope_exit(
[&msg_bytes, allocator, &shmbuf]() {
if (msg_bytes && !shmbuf.has_value()) {
[&msg_bytes, allocator
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, &shmbuf
#endif
]() {
if (msg_bytes
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
&& !shmbuf.has_value()
#endif
)
{
allocator->deallocate(msg_bytes, allocator->state);
}
});

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (shm_provider.has_value()) {
if (shm.has_value() && max_data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto provider = shm_provider.value();
auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
z_alloc_alignment_t alignment = {0};
z_buf_layout_alloc_result_t alloc;
// TODO(yuyuan): SHM, configure this
z_alloc_alignment_t alignment = {5};
z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment);

if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
Expand All @@ -275,11 +292,14 @@ rmw_ret_t PublisherData::publish(
return RMW_RET_ERROR;
}
} else {
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
}
#endif
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
Expand Down Expand Up @@ -309,11 +329,15 @@ rmw_ret_t PublisherData::publish(
options.attachment = z_move(attachment);

z_owned_bytes_t payload;
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
if (shmbuf.has_value()) {
z_bytes_from_shm_mut(&payload, z_move(shmbuf.value()));
} else {
z_bytes_copy_from_buf(&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length);
}
#endif
z_bytes_copy_from_buf(&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options);
if (res != Z_OK) {
Expand All @@ -332,8 +356,11 @@ rmw_ret_t PublisherData::publish(

///=============================================================================
rmw_ret_t PublisherData::publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<z_owned_shm_provider_t> & /*shm_provider*/)
const rmw_serialized_message_t * serialized_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<ShmContext> & /*shm_provider*/
#endif
)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
Expand Down
Loading

0 comments on commit f400ab5

Please sign in to comment.