diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 68609cd7..1f341610 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -493,6 +493,8 @@ void ClientData::_shutdown() "Unable to undeclare liveliness token"); return; } + // TODO(ahcorde) + // z_drop(z_move(keyexpr_)); is_shutdown_ = true; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 070d13a6..b325ba7d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -132,7 +132,7 @@ class rmw_context_impl_s::Data final // Query router/liveliness participants to get graph information before the session was started. // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive - // replies for the zc_liveliness_get() call. This is necessary as if the `bound` + // replies for the z_liveliness_get() call. This is necessary as if the `bound` // is too low, the channel may starve the zenoh executor of its threads which // would lead to deadlocks when trying to receive replies and block the // execution here. @@ -222,7 +222,8 @@ class rmw_context_impl_s::Data final &err); if (err != Z_OK) { - throw std::runtime_error("unable to create zenoh subscription. "); + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + throw std::runtime_error("Unable to subscribe to ROS graph updates."); } } diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index dd59c290..1a6326ee 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -225,29 +225,28 @@ rmw_ret_t PublisherData::publish( }); // Get memory from SHM buffer if available. - if (shm_provider.has_value()) { - RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); - - auto provider = shm_provider.value()._0; - z_buf_layout_alloc_result_t alloc; - // TODO(yuyuan): SHM, configure this - z_alloc_alignment_t alignment = {5}; - z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment); - - if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { - shmbuf = std::make_optional(alloc.buf); - msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); - } else { - // TODO(Yadunund): Should we revert to regular allocation and not return an error? - RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing."); - return RMW_RET_ERROR; - } - } else { - // Get memory from the allocator. - msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); - } + // if (shm_provider.has_value()) { + // RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); + + // auto provider = shm_provider.value()._0; + // z_buf_layout_alloc_result_t alloc; + // // TODO(yuyuan): SHM, configure this + // z_alloc_alignment_t alignment = {5}; + // z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment); + + // if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { + // shmbuf = std::make_optional(alloc.buf); + // msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); + // } else { + // // TODO(Yadunund): Should we revert to regular allocation and not return an error? + // RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing."); + // return RMW_RET_ERROR; + // } + // } else { + // Get memory from the allocator. + msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 3f6675ae..33f85349 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -393,6 +393,10 @@ rmw_create_publisher( context_impl, "unable to get rmw_context_impl_s", return nullptr); + if (!context_impl->session_is_valid()) { + RMW_SET_ERROR_MSG("zenoh session is invalid"); + return nullptr; + } rcutils_allocator_t * allocator = &node->context->options.allocator; if (!rcutils_allocator_is_valid(allocator)) { @@ -747,6 +751,10 @@ rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher) auto pub_data = node_data->get_pub_data(publisher); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); + if (pub_data->is_shutdown()) { + return RMW_RET_ERROR; + } + return RMW_RET_OK; } @@ -918,6 +926,10 @@ rmw_create_subscription( context_impl, "unable to get rmw_context_impl_s", return nullptr); + if (!context_impl->session_is_valid()) { + RMW_SET_ERROR_MSG("zenoh session is invalid"); + return nullptr; + } rcutils_allocator_t * allocator = &node->context->options.allocator; if (!rcutils_allocator_is_valid(allocator)) { @@ -1386,6 +1398,10 @@ rmw_create_client( context_impl, "unable to get rmw_context_impl_s", return nullptr); + if (!context_impl->session_is_valid()) { + RMW_SET_ERROR_MSG("zenoh session is invalid"); + return nullptr; + } // Get the service type support. const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); @@ -1630,6 +1646,10 @@ rmw_create_service( context_impl, "unable to get rmw_context_impl_s", return nullptr); + if (!context_impl->session_is_valid()) { + RMW_SET_ERROR_MSG("zenoh session is invalid"); + return nullptr; + } // Get the RMW type support. const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); diff --git a/zenoh_cpp_vendor/CMakeLists.txt b/zenoh_cpp_vendor/CMakeLists.txt index c53bfe45..37bdbd41 100644 --- a/zenoh_cpp_vendor/CMakeLists.txt +++ b/zenoh_cpp_vendor/CMakeLists.txt @@ -26,7 +26,7 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memor # - https://github.com/eclipse-zenoh/zenoh-c/pull/620 (fix ze_querying_subscriber_get API to query newly discovered publishers) ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION f5110732a303f31ed6d835684de1fb7f2c7af001 + VCS_VERSION 42e717ff7b633649f11ebb7800b71d4939cd21c7 CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE" @@ -37,7 +37,7 @@ ament_export_dependencies(zenohc) ament_vendor(zenoh_cpp_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp - VCS_VERSION a1875f9085bc068b6c5140778ff5415ae82248d7 + VCS_VERSION e8eca99b4eaff0963e52aefd8405909c1552080d CMAKE_ARGS -DZENOHCXX_ZENOHC=OFF )