Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared Memory on C++ API #362

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4782211
chore: update to self-hosted runners
imstevenpmwork Aug 27, 2024
0e3a8c5
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Sep 12, 2024
e935806
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Sep 16, 2024
7955b7c
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Sep 20, 2024
da4a9e4
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Sep 26, 2024
66997d6
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Sep 29, 2024
70467e1
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Oct 1, 2024
f83dd94
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Oct 2, 2024
36f21a4
fix: add ament_export_dependencies to cmake
imstevenpmwork Oct 2, 2024
080e996
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Oct 4, 2024
316d486
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Oct 7, 2024
685c6f1
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Oct 11, 2024
955ef90
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Oct 14, 2024
8c1b932
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Oct 17, 2024
e69b318
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Nov 4, 2024
6648a75
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Nov 7, 2024
c92ca58
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Nov 14, 2024
c8f6827
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Nov 20, 2024
efba411
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Nov 28, 2024
bd8d967
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Dec 6, 2024
3bf7153
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Dec 10, 2024
a50350c
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Dec 11, 2024
7dd0ea6
fix: update runner label
imstevenpmwork Dec 11, 2024
1bea19b
fix: update runner labels
imstevenpmwork Dec 11, 2024
ebfada8
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Dec 12, 2024
6e4c2a0
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Dec 16, 2024
6b94577
Merge branch 'ros2:rolling' into rolling
imstevenpmwork Dec 20, 2024
ce1216f
C++ SHM
yellowhatter Dec 26, 2024
0e9ace8
fix default shm msg size threshold
yellowhatter Dec 26, 2024
b3d0a47
code format fix
yellowhatter Dec 26, 2024
b420096
add lost tracepoints
yellowhatter Dec 26, 2024
6f42c1b
code format
yellowhatter Dec 26, 2024
8e8e4b8
remove unnecessry flags from zenoh-cpp vendor
yellowhatter Dec 26, 2024
085771e
review fixes
yellowhatter Dec 26, 2024
e42499c
codestyle fixes
yellowhatter Dec 26, 2024
c06c5a7
Change 2023 -> 2024 in copyright
yellowhatter Dec 27, 2024
49e966e
Merge commit 'e7493d38fa85e2979972bbc44a0a6021c290343e'
yellowhatter Dec 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ jobs:
BUILD_TYPE: binary
env:
ROS2_REPOS_FILE_URL: 'https://raw.githubusercontent.com/ros2/ros2/${{ matrix.ROS_DISTRO }}/ros2.repos'
runs-on: ubuntu-latest
runs-on:
- self-hosted
- X64
- ubuntu-latest
container:
image: ${{ matrix.BUILD_TYPE == 'binary' && format('ros:{0}-ros-base', matrix.ROS_DISTRO) || 'ubuntu:noble' }}
steps:
Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/check_logging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ name: "Check logging macros"
on: [pull_request]
jobs:
check_logging:
runs-on: ubuntu-latest
runs-on:
- self-hosted
- X64
- ubuntu-latest
steps:
- name: Check logging macros
uses: JJ/github-pr-contains-action@releases/v14.1
Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/style.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ defaults:
shell: bash
jobs:
test:
runs-on: ubuntu-latest
runs-on:
- self-hosted
- X64
- ubuntu-latest
strategy:
fail-fast: false
matrix:
Expand Down
10 changes: 10 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

set(RMW_ZENOH_BUILD_WITH_SHARED_MEMORY TRUE CACHE BOOL "Compile Zenoh RMW with Shared Memory support")

# find dependencies
find_package(ament_cmake REQUIRED)

Expand Down Expand Up @@ -43,6 +45,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/rmw_subscription_data.cpp
src/detail/service_type_support.cpp
src/detail/simplified_xxhash3.cpp
src/detail/shm_context.cpp
src/detail/type_support.cpp
src/detail/type_support_common.cpp
src/detail/zenoh_config.cpp
Expand Down Expand Up @@ -81,6 +84,13 @@ target_compile_definitions(rmw_zenoh_cpp
RMW_VERSION_PATCH=${rmw_VERSION_PATCH}
)

if(${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY})
target_compile_definitions(rmw_zenoh_cpp
PUBLIC
RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
)
endif()

ament_export_targets(export_rmw_zenoh_cpp)

register_rmw_implementation(
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
enabled: true,
},
auth: {
/// The configuration of authentication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
enabled: true,
},
auth: {
/// The configuration of authentication.
Expand Down
44 changes: 25 additions & 19 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,21 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
"rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n");
}
}

// Initialize the shm manager if shared_memory is enabled in the config.
shm_provider_ = std::nullopt;
#ifndef _MSC_VER
if (shm_enabled == "true") {
auto layout = zenoh::MemoryLayout(
SHM_BUFFER_SIZE_MB * 1024 * 1024,
zenoh::AllocAlignment({5}));
zenoh::PosixShmProvider provider(layout, &result);
if (result != Z_OK) {
throw std::runtime_error("Unable to create shm provider.");
}
shm_provider_ = std::move(provider);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Initialize the shm subsystem if shared_memory is enabled in the config
if (rmw_zenoh_cpp::zenoh_shm_enabled()) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled");

shm_ = std::make_optional(
rmw_zenoh_cpp::ShmContext(
rmw_zenoh_cpp::zenoh_shm_alloc_size(),
rmw_zenoh_cpp::zenoh_shm_message_size_threshold()
));
} else {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is disabled");
}
#endif

graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
graph_guard_condition_->data = &guard_condition_data_;
Expand Down Expand Up @@ -262,11 +262,13 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
return session_;
}

std::optional<zenoh::ShmProvider> & shm_provider()
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<rmw_zenoh_cpp::ShmContext> & shm()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return shm_provider_;
return shm_;
}
#endif

rmw_guard_condition_t * graph_guard_condition()
{
Expand Down Expand Up @@ -398,9 +400,11 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
std::string enclave_;
// An owned session.
std::shared_ptr<zenoh::Session> session_;
// An optional SHM manager that is initialized of SHM is enabled in the
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// An optional SHM context that is initialized of SHM is enabled in the
// zenoh session config.
std::optional<zenoh::ShmProvider> shm_provider_;
std::optional<rmw_zenoh_cpp::ShmContext> shm_;
#endif
// Graph cache.
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
// ROS graph liveliness subscriber.
Expand Down Expand Up @@ -456,11 +460,13 @@ const std::shared_ptr<zenoh::Session> rmw_context_impl_s::session() const
return data_->session();
}

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
///=============================================================================
std::optional<zenoh::ShmProvider> & rmw_context_impl_s::shm_provider()
std::optional<rmw_zenoh_cpp::ShmContext> & rmw_context_impl_s::shm()
{
return data_->shm_provider();
return data_->shm();
}
#endif

///=============================================================================
rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition()
Expand Down
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ struct rmw_context_impl_s final
// Loan the Zenoh session.
const std::shared_ptr<zenoh::Session> session() const;

// Get a reference to the shm_provider.
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get a reference to the shm subsystem.
// Note: This is not thread-safe.
// TODO(Yadunund): Remove this API and instead include a publish() API
// that handles the shm_provider once the context manages publishers.
std::optional<zenoh::ShmProvider> & shm_provider();
std::optional<rmw_zenoh_cpp::ShmContext> & shm();
#endif

// Get the graph guard condition.
rmw_guard_condition_t * graph_guard_condition();
Expand Down
109 changes: 96 additions & 13 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,11 @@ PublisherData::PublisherData(

///=============================================================================
rmw_ret_t PublisherData::publish(
const void * ros_message,
std::optional<zenoh::ShmProvider> & /*shm_provider*/)
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
)
{
std::lock_guard<std::mutex> lock(mutex_);
if (is_shutdown_) {
Expand All @@ -218,19 +221,59 @@ rmw_ret_t PublisherData::publish(
// To store serialized message byte array.
char * msg_bytes = nullptr;

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<zenoh::ZShmMut> shmbuf = std::nullopt;
#endif

rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator;

auto always_free_msg_bytes = rcpputils::make_scope_exit(
[&msg_bytes, allocator]() {
if (msg_bytes) {
[&msg_bytes, allocator
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, &shmbuf
#endif
]() {
if (msg_bytes
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
&& !shmbuf.has_value()
#endif
)
{
allocator->deallocate(msg_bytes, allocator->state);
}
});


#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (shm.has_value() && max_data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
auto alloc_result = provider.alloc_gc_defrag_blocking(
max_data_length,
zenoh::AllocAlignment({0}));

if (std::holds_alternative<zenoh::ZShmMut>(alloc_result)) {
auto && buf = std::get<zenoh::ZShmMut>(std::move(alloc_result));
msg_bytes = reinterpret_cast<char *>(buf.data());
shmbuf = std::make_optional(std::move(buf));
} else {
// TODO(Yadunund): Should we revert to regular allocation and not return an error?
RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing.");
return RMW_RET_ERROR;
}
} else {
#endif
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
Expand All @@ -257,11 +300,14 @@ rmw_ret_t PublisherData::publish(
options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

// TODO(ahcorde): shmbuf
std::vector<uint8_t> raw_data(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
zenoh::Bytes payload(std::move(raw_data));
auto payload =
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
shmbuf.has_value() ? zenoh::Bytes(std::move(*shmbuf)) :
#endif
zenoh::Bytes(
std::vector<uint8_t>(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), ros_message, source_timestamp);
Expand All @@ -282,8 +328,11 @@ rmw_ret_t PublisherData::publish(

///=============================================================================
rmw_ret_t PublisherData::publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<zenoh::ShmProvider> & /*shm_provider*/)
const rmw_serialized_message_t * serialized_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
Expand All @@ -305,14 +354,48 @@ rmw_ret_t PublisherData::publish_serialized_message(
options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

std::vector<uint8_t> raw_data(

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (shm.has_value() && data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
auto alloc_result = provider.alloc_gc_defrag_blocking(data_length, zenoh::AllocAlignment({0}));

if (std::holds_alternative<zenoh::ZShmMut>(alloc_result)) {
auto && buf = std::get<zenoh::ZShmMut>(std::move(alloc_result));
auto msg_bytes = reinterpret_cast<char *>(buf.data());
memcpy(msg_bytes, serialized_message->buffer, data_length);
zenoh::Bytes payload(std::move(buf));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message,
source_timestamp);

pub_.put(std::move(payload), std::move(options), &result);
} else {
// TODO(Yadunund): Should we revert to regular allocation and not return an error?
RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing.");
return RMW_RET_ERROR;
}
} else {
#endif
std::vector<uint8_t> raw_image(
serialized_message->buffer,
serialized_message->buffer + data_length);
zenoh::Bytes payload(std::move(raw_data));
zenoh::Bytes payload(raw_image);

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message, source_timestamp);

pub_.put(std::move(payload), std::move(options), &result);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

if (result != Z_OK) {
if (result == Z_ESESSION_CLOSED) {
RMW_ZENOH_LOG_WARN_NAMED(
Expand Down
15 changes: 11 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "event.hpp"
#include "liveliness_utils.hpp"
#include "message_type_support.hpp"
#include "shm_context.hpp"
#include "type_support_common.hpp"
#include "zenoh_utils.hpp"

Expand Down Expand Up @@ -56,13 +57,19 @@ class PublisherData final

// Publish a ROS message.
rmw_ret_t publish(
const void * ros_message,
std::optional<zenoh::ShmProvider> & shm_provider);
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
);

// Publish a serialized ROS message.
rmw_ret_t publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<zenoh::ShmProvider> & shm_provider);
const rmw_serialized_message_t * serialized_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
);

// Get a copy of the keyexpr_hash of this PublisherData's liveliness::Entity.
std::size_t keyexpr_hash() const;
Expand Down
8 changes: 6 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "liveliness_utils.hpp"
#include "message_type_support.hpp"
#include "attachment_helpers.hpp"
#include "shm_context.hpp"
#include "type_support_common.hpp"
#include "zenoh_utils.hpp"

Expand Down Expand Up @@ -75,8 +76,11 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD

// Publish a ROS message.
rmw_ret_t publish(
const void * ros_message,
std::optional<z_owned_shm_provider_t> & shm_provider);
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, const std::optional<ShmContext> & shm
#endif
);

// Get a copy of the keyexpr_hash of this SubscriptionData's liveliness::Entity.
std::size_t keyexpr_hash() const;
Expand Down
Loading
Loading