diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index ed3bd9a9..e997330f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -218,29 +218,43 @@ class rmw_context_impl_s::Data final // Shutdown the Zenoh session. rmw_ret_t shutdown() { - { - std::lock_guard lock(mutex_); - rmw_ret_t ret = RMW_RET_OK; - if (is_shutdown_) { - return ret; - } - - z_undeclare_subscriber(z_move(graph_subscriber_)); - if (shm_provider_.has_value()) { - z_drop(z_move(shm_provider_.value())); - } - is_shutdown_ = true; - - // We specifically do *not* hold the mutex_ while tearing down the session; this allows us - // to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler(). + std::lock_guard lock(mutex_); + rmw_ret_t ret = RMW_RET_OK; + if (is_shutdown_) { + return ret; } - // Close the zenoh session - if (z_close(z_loan_mut(session_), NULL) != Z_OK) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); + z_undeclare_subscriber(z_move(graph_subscriber_)); + if (shm_provider_.has_value()) { + z_drop(z_move(shm_provider_.value())); + } + is_shutdown_ = true; + + // Close the zenoh session in background + close_handle = zc_owned_concurrent_close_handle_t(); + z_close_options_t options; + z_close_options_default(&options); + options.internal_out_concurrent = &close_handle.value(); + if (z_close(z_loan_mut(session_), &options) != Z_OK) { + close_handle.reset(); + RMW_SET_ERROR_MSG("Error while starting zenoh session close!"); return RMW_RET_ERROR; } + + return RMW_RET_OK; + } + ///============================================================================= + rmw_ret_t wait_for_session_close() + { + if (close_handle.has_value()) { + zc_owned_concurrent_close_handle_t handle = close_handle.value(); + close_handle.reset(); + if (zc_concurrent_close_handle_wait(z_move(handle)) < 0) { + RMW_SET_ERROR_MSG("Error closing session!"); + return RMW_RET_ERROR; + } + } return RMW_RET_OK; } @@ -379,8 +393,10 @@ class rmw_context_impl_s::Data final ~Data() { auto ret = this->shutdown(); + auto ret2 = this->wait_for_session_close(); nodes_.clear(); static_cast(ret); + static_cast(ret2); } private: @@ -406,6 +422,8 @@ class rmw_context_impl_s::Data final rmw_zenoh_cpp::GuardCondition guard_condition_data_; // Shutdown flag. bool is_shutdown_; + // Close operation handle + std::optional close_handle; // A counter to assign a local id for every entity created in this session. std::size_t next_entity_id_; // Nodes created from this context. @@ -501,6 +519,12 @@ rmw_ret_t rmw_context_impl_s::shutdown() return data_->shutdown(); } +///============================================================================= +rmw_ret_t rmw_context_impl_s::wait_for_session_close() +{ + return data_->wait_for_session_close(); +} + ///============================================================================= bool rmw_context_impl_s::is_shutdown() const { diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 1e46d6af..0461c1df 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -62,9 +62,12 @@ class rmw_context_impl_s final // Get a unique id for a new entity. std::size_t get_next_entity_id(); - // Shutdown the Zenoh session. + // Asynchronously shutdown the Zenoh session. rmw_ret_t shutdown(); + // Wait for Zenoh session shutdown to comlete. + rmw_ret_t wait_for_session_close(); + // Check if the Zenoh session is shutdown. bool is_shutdown() const; diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_c_vendor/CMakeLists.txt index cddd2fd4..d2cdb718 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_c_vendor/CMakeLists.txt @@ -25,8 +25,8 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memor # - https://github.com/eclipse-zenoh/zenoh/pull/1150 (fix deadlock issue https://github.com/ros2/rmw_zenoh/issues/182) # - https://github.com/eclipse-zenoh/zenoh-c/pull/620 (fix ze_querying_subscriber_get API to query newly discovered publishers) ament_vendor(zenoh_c_vendor - VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION 42e717ff7b633649f11ebb7800b71d4939cd21c7 + VCS_URL https://github.com/ZettaScaleLabs/zenoh-c.git + VCS_VERSION 4cf9b92e0ad8ba494b935057ecc771a640e8b72d CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE"