Skip to content

Commit

Permalink
Merge remote-tracking branch 'zenoh/dev/1.0.0' into ahcorde/dev/1.0.0…
Browse files Browse the repository at this point in the history
…-cpp
  • Loading branch information
ahcorde committed Dec 5, 2024
2 parents 9e24ad9 + 84d1267 commit 687d01c
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 27 deletions.
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,8 @@ void ClientData::_shutdown()
"Unable to undeclare liveliness token");
return;
}
// TODO(ahcorde)
// z_drop(z_move(keyexpr_));

is_shutdown_ = true;
}
Expand Down
5 changes: 3 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class rmw_context_impl_s::Data final

// Query router/liveliness participants to get graph information before the session was started.
// We create a blocking channel that is unbounded, ie. `bound` = 0, to receive
// replies for the zc_liveliness_get() call. This is necessary as if the `bound`
// replies for the z_liveliness_get() call. This is necessary as if the `bound`
// is too low, the channel may starve the zenoh executor of its threads which
// would lead to deadlocks when trying to receive replies and block the
// execution here.
Expand Down Expand Up @@ -222,7 +222,8 @@ class rmw_context_impl_s::Data final
&err);

if (err != Z_OK) {
throw std::runtime_error("unable to create zenoh subscription. ");
RMW_SET_ERROR_MSG("unable to create zenoh subscription");
throw std::runtime_error("Unable to subscribe to ROS graph updates.");
}
}

Expand Down
45 changes: 22 additions & 23 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,29 +225,28 @@ rmw_ret_t PublisherData::publish(
});

// Get memory from SHM buffer if available.
if (shm_provider.has_value()) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto provider = shm_provider.value()._0;
z_buf_layout_alloc_result_t alloc;
// TODO(yuyuan): SHM, configure this
z_alloc_alignment_t alignment = {5};
z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment);

if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
shmbuf = std::make_optional(alloc.buf);
msg_bytes = reinterpret_cast<char *>(z_shm_mut_data_mut(z_loan_mut(alloc.buf)));
} else {
// TODO(Yadunund): Should we revert to regular allocation and not return an error?
RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing.");
return RMW_RET_ERROR;
}
} else {
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
}
// if (shm_provider.has_value()) {
// RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

// auto provider = shm_provider.value()._0;
// z_buf_layout_alloc_result_t alloc;
// // TODO(yuyuan): SHM, configure this
// z_alloc_alignment_t alignment = {5};
// z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment);

// if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
// shmbuf = std::make_optional(alloc.buf);
// msg_bytes = reinterpret_cast<char *>(z_shm_mut_data_mut(z_loan_mut(alloc.buf)));
// } else {
// // TODO(Yadunund): Should we revert to regular allocation and not return an error?
// RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing.");
// return RMW_RET_ERROR;
// }
// } else {
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
Expand Down
20 changes: 20 additions & 0 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,10 @@ rmw_create_publisher(
context_impl,
"unable to get rmw_context_impl_s",
return nullptr);
if (!context_impl->session_is_valid()) {
RMW_SET_ERROR_MSG("zenoh session is invalid");
return nullptr;
}

rcutils_allocator_t * allocator = &node->context->options.allocator;
if (!rcutils_allocator_is_valid(allocator)) {
Expand Down Expand Up @@ -747,6 +751,10 @@ rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher)
auto pub_data = node_data->get_pub_data(publisher);
RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT);

if (pub_data->is_shutdown()) {
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}

Expand Down Expand Up @@ -918,6 +926,10 @@ rmw_create_subscription(
context_impl,
"unable to get rmw_context_impl_s",
return nullptr);
if (!context_impl->session_is_valid()) {
RMW_SET_ERROR_MSG("zenoh session is invalid");
return nullptr;
}

rcutils_allocator_t * allocator = &node->context->options.allocator;
if (!rcutils_allocator_is_valid(allocator)) {
Expand Down Expand Up @@ -1386,6 +1398,10 @@ rmw_create_client(
context_impl,
"unable to get rmw_context_impl_s",
return nullptr);
if (!context_impl->session_is_valid()) {
RMW_SET_ERROR_MSG("zenoh session is invalid");
return nullptr;
}

// Get the service type support.
const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports);
Expand Down Expand Up @@ -1630,6 +1646,10 @@ rmw_create_service(
context_impl,
"unable to get rmw_context_impl_s",
return nullptr);
if (!context_impl->session_is_valid()) {
RMW_SET_ERROR_MSG("zenoh session is invalid");
return nullptr;
}

// Get the RMW type support.
const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports);
Expand Down
4 changes: 2 additions & 2 deletions zenoh_cpp_vendor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$<SEMICOLON>--features=shared-memor
# - https://github.com/eclipse-zenoh/zenoh-c/pull/620 (fix ze_querying_subscriber_get API to query newly discovered publishers)
ament_vendor(zenoh_c_vendor
VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git
VCS_VERSION f5110732a303f31ed6d835684de1fb7f2c7af001
VCS_VERSION 42e717ff7b633649f11ebb7800b71d4939cd21c7
CMAKE_ARGS
"-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}"
"-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE"
Expand All @@ -37,7 +37,7 @@ ament_export_dependencies(zenohc)

ament_vendor(zenoh_cpp_vendor
VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp
VCS_VERSION a1875f9085bc068b6c5140778ff5415ae82248d7
VCS_VERSION e8eca99b4eaff0963e52aefd8405909c1552080d
CMAKE_ARGS
-DZENOHCXX_ZENOHC=OFF
)
Expand Down

0 comments on commit 687d01c

Please sign in to comment.