Skip to content

Commit

Permalink
Support background Session close that can be safely waited even in at…
Browse files Browse the repository at this point in the history
…exit
  • Loading branch information
yellowhatter committed Dec 5, 2024
1 parent 84d1267 commit 3034213
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 21 deletions.
60 changes: 42 additions & 18 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,29 +218,43 @@ class rmw_context_impl_s::Data final
// Shutdown the Zenoh session.
rmw_ret_t shutdown()
{
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
rmw_ret_t ret = RMW_RET_OK;
if (is_shutdown_) {
return ret;
}

z_undeclare_subscriber(z_move(graph_subscriber_));
if (shm_provider_.has_value()) {
z_drop(z_move(shm_provider_.value()));
}
is_shutdown_ = true;

// We specifically do *not* hold the mutex_ while tearing down the session; this allows us
// to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler().
std::lock_guard<std::recursive_mutex> lock(mutex_);
rmw_ret_t ret = RMW_RET_OK;
if (is_shutdown_) {
return ret;
}

// Close the zenoh session
if (z_close(z_loan_mut(session_), NULL) != Z_OK) {
RMW_SET_ERROR_MSG("Error while closing zenoh session");
z_undeclare_subscriber(z_move(graph_subscriber_));
if (shm_provider_.has_value()) {
z_drop(z_move(shm_provider_.value()));
}
is_shutdown_ = true;

// Close the zenoh session in background
close_handle = zc_owned_concurrent_close_handle_t();
z_close_options_t options;
z_close_options_default(&options);
options.internal_out_concurrent = &close_handle.value();
if (z_close(z_loan_mut(session_), &options) != Z_OK) {
close_handle.reset();
RMW_SET_ERROR_MSG("Error while starting zenoh session close!");
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}

///=============================================================================
rmw_ret_t wait_for_session_close()
{
if (close_handle.has_value()) {
zc_owned_concurrent_close_handle_t handle = close_handle.value();
close_handle.reset();
if (zc_concurrent_close_handle_wait(z_move(handle)) < 0) {
RMW_SET_ERROR_MSG("Error closing session!");
return RMW_RET_ERROR;
}
}
return RMW_RET_OK;
}

Expand Down Expand Up @@ -379,8 +393,10 @@ class rmw_context_impl_s::Data final
~Data()
{
auto ret = this->shutdown();
auto ret2 = this->wait_for_session_close();
nodes_.clear();
static_cast<void>(ret);
static_cast<void>(ret2);
}

private:
Expand All @@ -406,6 +422,8 @@ class rmw_context_impl_s::Data final
rmw_zenoh_cpp::GuardCondition guard_condition_data_;
// Shutdown flag.
bool is_shutdown_;
// Close operation handle
std::optional<zc_owned_concurrent_close_handle_t> close_handle;
// A counter to assign a local id for every entity created in this session.
std::size_t next_entity_id_;
// Nodes created from this context.
Expand Down Expand Up @@ -501,6 +519,12 @@ rmw_ret_t rmw_context_impl_s::shutdown()
return data_->shutdown();
}

///=============================================================================
rmw_ret_t rmw_context_impl_s::wait_for_session_close()
{
return data_->wait_for_session_close();
}

///=============================================================================
bool rmw_context_impl_s::is_shutdown() const
{
Expand Down
5 changes: 4 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ class rmw_context_impl_s final
// Get a unique id for a new entity.
std::size_t get_next_entity_id();

// Shutdown the Zenoh session.
// Asynchronously shutdown the Zenoh session.
rmw_ret_t shutdown();

// Wait for Zenoh session shutdown to comlete.
rmw_ret_t wait_for_session_close();

// Check if the Zenoh session is shutdown.
bool is_shutdown() const;

Expand Down
4 changes: 2 additions & 2 deletions zenoh_c_vendor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$<SEMICOLON>--features=shared-memor
# - https://github.com/eclipse-zenoh/zenoh/pull/1150 (fix deadlock issue https://github.com/ros2/rmw_zenoh/issues/182)
# - https://github.com/eclipse-zenoh/zenoh-c/pull/620 (fix ze_querying_subscriber_get API to query newly discovered publishers)
ament_vendor(zenoh_c_vendor
VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git
VCS_VERSION 42e717ff7b633649f11ebb7800b71d4939cd21c7
VCS_URL https://github.com/ZettaScaleLabs/zenoh-c.git
VCS_VERSION 4cf9b92e0ad8ba494b935057ecc771a640e8b72d
CMAKE_ARGS
"-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}"
"-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE"
Expand Down

0 comments on commit 3034213

Please sign in to comment.