Skip to content

Commit

Permalink
Merge remote-tracking branch 'zenoh/dev/1.0.0' into ahcorde/dev/1.0.0…
Browse files Browse the repository at this point in the history
…-cpp
  • Loading branch information
ahcorde committed Nov 14, 2024
2 parents c734df6 + f400ab5 commit f1d95f7
Show file tree
Hide file tree
Showing 15 changed files with 346 additions and 80 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.vscode
10 changes: 10 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

set(RMW_ZENOH_BUILD_WITH_SHARED_MEMORY ON CACHE BOOL "Compile Zenoh RMW with Shared Memory support")

# find dependencies
find_package(ament_cmake REQUIRED)

Expand Down Expand Up @@ -41,6 +43,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/rmw_service_data.cpp
src/detail/rmw_subscription_data.cpp
src/detail/service_type_support.cpp
src/detail/shm_context.cpp
src/detail/simplified_xxhash3.cpp
src/detail/type_support.cpp
src/detail/type_support_common.cpp
Expand Down Expand Up @@ -81,6 +84,13 @@ target_compile_definitions(rmw_zenoh_cpp
RMW_VERSION_PATCH=${rmw_VERSION_PATCH}
)

if(${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY})
target_compile_definitions(rmw_zenoh_cpp
PRIVATE
RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
)
endif()

ament_export_targets(export_rmw_zenoh_cpp)

register_rmw_implementation(
Expand Down
2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,6 @@
/// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate
/// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
},
auth: {
Expand Down
2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,6 @@
/// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate
/// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
},
auth: {
Expand Down
93 changes: 56 additions & 37 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,25 @@ rmw_context_impl_s::Data::Data(
const std::string & enclave,
z_owned_session_t session,
std::shared_ptr<zenoh::Session> session_cpp,
std::optional<z_owned_shm_provider_t> shm_provider,
const std::string & liveliness_str,
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache)
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<rmw_zenoh_cpp::ShmContext> shm
#endif
)
: enclave_(std::move(enclave)),
domain_id_(std::move(domain_id)),
session_(std::move(session)),
session_cpp_(session_cpp),
shm_provider_(std::move(shm_provider)),
liveliness_str_(std::move(liveliness_str)),
graph_cache_(std::move(graph_cache)),
is_shutdown_(false),
next_entity_id_(0),
is_initialized_(false),
nodes_({})
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, shm_(shm)
#endif
{
graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
Expand Down Expand Up @@ -221,10 +226,14 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown()
}

z_undeclare_subscriber(z_move(graph_subscriber_));
graph_subscriber_cpp_.clear();
if (shm_provider_.has_value()) {
z_drop(z_move(shm_provider_.value()));
}

// graph_subscriber_cpp_.clear();

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// drop SHM subsystem if used
shm_ = std::nullopt;
#endif

is_shutdown_ = true;
return RMW_RET_OK;
}
Expand Down Expand Up @@ -335,42 +344,50 @@ rmw_context_impl_s::rmw_context_impl_s(
<< sample.get_payload().as_string() << "')\n";
}

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Initialize the shm subsystem if shared_memory is enabled in the config
std::optional<rmw_zenoh_cpp::ShmContext> shm;
if (rmw_zenoh_cpp::zenoh_shm_enabled()) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled");

rmw_zenoh_cpp::ShmContext shm_context;

// // Initialize the shm manager if shared_memory is enabled in the config.
std::optional<z_owned_shm_provider_t> shm_provider = std::nullopt;
// // if (strncmp(z_string_data(z_loan(shm_enabled)), "true", z_string_len(z_loan(shm_enabled))) == 0) {
// // RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled");

// // // TODO(yuyuan): determine the default alignment of SHM
// // z_alloc_alignment_t alignment = {5};
// // z_owned_memory_layout_t layout;
// // z_memory_layout_new(&layout, SHM_BUFFER_SIZE_MB * 1024 * 1024, alignment);

// // z_owned_shm_provider_t provider;
// // if (z_posix_shm_provider_new(&provider, z_loan(layout)) != Z_OK) {
// // RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create a SHM provider.");
// // throw std::runtime_error("Unable to create shm manager.");
// // }
// // shm_provider = provider;
// // }
// // auto free_shm_provider = rcpputils::make_scope_exit(
// // [&shm_provider]() {
// // if (shm_provider.has_value()) {
// // z_drop(z_move(shm_provider.value()));
// // }
// // });

// // close_session.cancel();
// // free_shm_provider.cancel();
// Read msg size treshold from config
shm_context.msgsize_threshold = rmw_zenoh_cpp::zenoh_shm_message_size_threshold();

// Create Layout for provider's memory
// Provider's alignment will be 1 byte as we are going to make only 1-byte aligned allocations
// TODO(yellowhatter): use zenoh_shm_message_size_threshold as base for alignment
z_alloc_alignment_t alignment = {0};
z_owned_memory_layout_t layout;
if (z_memory_layout_new(&layout, rmw_zenoh_cpp::zenoh_shm_alloc_size(), alignment) != Z_OK) {
throw std::runtime_error("Unable to create a Layout for SHM provider.");
}
// Create SHM provider
const auto provider_creation_result =
z_posix_shm_provider_new(&shm_context.shm_provider, z_loan(layout));
z_drop(z_move(layout));
if (provider_creation_result != Z_OK) {
throw std::runtime_error("Unable to create an SHM provider.");
}
// Upon successful provider creation, store it in the context
shm = std::make_optional(std::move(shm_context));
} else {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is disabled");
}
#endif

data_ = std::make_shared<Data>(
domain_id,
std::move(enclave),
std::move(session->_0),
std::move(session),
std::move(shm_provider),
std::move(liveliness_str),
std::move(graph_cache));
std::move(graph_cache)
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::move(shm)
#endif
);

ret = data_->subscribe_to_ros_graph();
if (ret != RMW_RET_OK) {
Expand Down Expand Up @@ -404,11 +421,13 @@ const std::shared_ptr<zenoh::Session> rmw_context_impl_s::session_cpp() const
}

///=============================================================================
std::optional<z_owned_shm_provider_t> & rmw_context_impl_s::shm_provider()
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<rmw_zenoh_cpp::ShmContext> & rmw_context_impl_s::shm()
{
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->shm_provider_;
return data_->shm_;
}
#endif

///=============================================================================
rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition()
Expand Down
23 changes: 15 additions & 8 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include <zenoh.h>

# include <cstddef>
#include <cstddef>
#include <memory>
#include <mutex>
#include <optional>
Expand Down Expand Up @@ -55,11 +55,13 @@ class rmw_context_impl_s final

const std::shared_ptr<zenoh::Session> session_cpp() const;

// Get a reference to the shm_provider.
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get a reference to the shm subsystem.
// Note: This is not thread-safe.
// TODO(Yadunund): Remove this API and instead include a publish() API
// that handles the shm_provider once the context manages publishers.
std::optional<z_owned_shm_provider_t> & shm_provider();
std::optional<rmw_zenoh_cpp::ShmContext> & shm();
#endif

// Get the graph guard condition.
rmw_guard_condition_t * graph_guard_condition();
Expand Down Expand Up @@ -106,9 +108,12 @@ class rmw_context_impl_s final
const std::string & enclave,
z_owned_session_t session,
std::shared_ptr<zenoh::Session> session_cpp,
std::optional<z_owned_shm_provider_t> shm_provider,
const std::string & liveliness_str,
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache);
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<rmw_zenoh_cpp::ShmContext> shm
#endif
);

// Subscribe to the ROS graph.
rmw_ret_t subscribe_to_ros_graph();
Expand All @@ -130,9 +135,6 @@ class rmw_context_impl_s final
// An owned session.
z_owned_session_t session_;
std::shared_ptr<zenoh::Session> session_cpp_;
// An optional SHM manager that is initialized of SHM is enabled in the
// zenoh session config.
std::optional<z_owned_shm_provider_t> shm_provider_;
// Liveliness keyexpr string to subscribe to for ROS graph changes.
std::string liveliness_str_;
// Graph cache.
Expand All @@ -153,6 +155,11 @@ class rmw_context_impl_s final
bool is_initialized_;
// Nodes created from this context.
std::unordered_map<const rmw_node_t *, std::shared_ptr<rmw_zenoh_cpp::NodeData>> nodes_;
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// An optional SHM context that is initialized if SHM is enabled in the
// zenoh session config.
std::optional<rmw_zenoh_cpp::ShmContext> shm_;
#endif
};

std::shared_ptr<Data> data_{nullptr};
Expand Down
63 changes: 45 additions & 18 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,11 @@ PublisherData::PublisherData(

///=============================================================================
rmw_ret_t PublisherData::publish(
const void * ros_message,
std::optional<z_owned_shm_provider_t> & shm_provider)
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<ShmContext> & shm
#endif
)
{
std::lock_guard<std::mutex> lock(mutex_);
if (is_shutdown_) {
Expand All @@ -233,37 +236,51 @@ rmw_ret_t PublisherData::publish(
}

// Serialize data.
size_t max_data_length = type_support_->get_estimated_serialized_size(
const size_t max_data_length = type_support_->get_estimated_serialized_size(
ros_message,
type_support_impl_);

// To store serialized message byte array.
char * msg_bytes = nullptr;

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<z_owned_shm_mut_t> shmbuf = std::nullopt;
auto always_free_shmbuf = rcpputils::make_scope_exit(
[&shmbuf]() {
if (shmbuf.has_value()) {
z_drop(z_move(shmbuf.value()));
}
});
#endif

rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator;

auto always_free_msg_bytes = rcpputils::make_scope_exit(
[&msg_bytes, allocator, &shmbuf]() {
if (msg_bytes && !shmbuf.has_value()) {
[&msg_bytes, allocator
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, &shmbuf
#endif
]() {
if (msg_bytes
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
&& !shmbuf.has_value()
#endif
)
{
allocator->deallocate(msg_bytes, allocator->state);
}
});

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (shm_provider.has_value()) {
if (shm.has_value() && max_data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto provider = shm_provider.value();
auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
z_alloc_alignment_t alignment = {0};
z_buf_layout_alloc_result_t alloc;
// TODO(yuyuan): SHM, configure this
z_alloc_alignment_t alignment = {5};
z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment);

if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
Expand All @@ -275,11 +292,14 @@ rmw_ret_t PublisherData::publish(
return RMW_RET_ERROR;
}
} else {
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
}
#endif
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
Expand Down Expand Up @@ -309,11 +329,15 @@ rmw_ret_t PublisherData::publish(
options.attachment = z_move(attachment);

z_owned_bytes_t payload;
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
if (shmbuf.has_value()) {
z_bytes_from_shm_mut(&payload, z_move(shmbuf.value()));
} else {
z_bytes_copy_from_buf(&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length);
}
#endif
z_bytes_copy_from_buf(&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options);
if (res != Z_OK) {
Expand All @@ -332,8 +356,11 @@ rmw_ret_t PublisherData::publish(

///=============================================================================
rmw_ret_t PublisherData::publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<z_owned_shm_provider_t> & /*shm_provider*/)
const rmw_serialized_message_t * serialized_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<ShmContext> & /*shm_provider*/
#endif
)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
Expand Down
Loading

0 comments on commit f1d95f7

Please sign in to comment.