From 93afc16bd56182b5e85d0e34bcdd5e28aeb276a5 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Fri, 29 Nov 2024 20:55:53 +0800 Subject: [PATCH 1/8] refactor: zc_liveliness_* -> z_liveliness_* and bump up zenoh-c version --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 4 ++-- rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 2 +- rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp | 11 ++++++----- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 8 ++++---- rmw_zenoh_cpp/src/detail/rmw_node_data.hpp | 4 ++-- rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp | 10 +++++----- rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp | 4 ++-- rmw_zenoh_cpp/src/detail/rmw_service_data.cpp | 4 ++-- rmw_zenoh_cpp/src/detail/rmw_service_data.hpp | 2 +- rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp | 4 ++-- rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp | 2 +- zenoh_c_vendor/CMakeLists.txt | 2 +- 12 files changed, 29 insertions(+), 28 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index ba73bdf3..d70ce9d5 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -252,7 +252,7 @@ bool ClientData::init(const z_loaned_session_t * session) [this]() { z_drop(z_move(this->token_)); }); - if (zc_liveliness_declare_token( + if (z_liveliness_declare_token( session, &this->token_, z_loan(liveliness_ke), @@ -518,7 +518,7 @@ void ClientData::_shutdown() } // Unregister this node from the ROS graph. - zc_liveliness_undeclare_token(z_move(token_)); + z_liveliness_undeclare_token(z_move(token_)); z_drop(z_move(keyexpr_)); diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index b78c1744..b2cbbbdf 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -131,7 +131,7 @@ class ClientData final : public std::enable_shared_from_this // An owned keyexpression. z_owned_keyexpr_t keyexpr_; // Liveliness token for the service. - zc_owned_liveliness_token_t token_; + z_owned_liveliness_token_t token_; // Type support fields. const void * request_type_support_impl_; const void * response_type_support_impl_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 7915cd9e..e2f1ca5f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -124,7 +124,7 @@ class rmw_context_impl_s::Data final // Query router/liveliness participants to get graph information before the session was started. // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive - // replies for the zc_liveliness_get() call. This is necessary as if the `bound` + // replies for the z_liveliness_get() call. This is necessary as if the `bound` // is too low, the channel may starve the zenoh executor of its threads which // would lead to deadlocks when trying to receive replies and block the // execution here. @@ -143,7 +143,7 @@ class rmw_context_impl_s::Data final z_view_keyexpr_t keyexpr; z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str()); - zc_liveliness_get( + z_liveliness_get( z_loan(session_), z_loan(keyexpr), z_move(closure), NULL); z_owned_reply_t reply; @@ -196,8 +196,8 @@ class rmw_context_impl_s::Data final // Setup the liveliness subscriber to receives updates from the ROS graph // and update the graph cache. - zc_liveliness_subscriber_options_t sub_options; - zc_liveliness_subscriber_options_default(&sub_options); + z_liveliness_subscriber_options_t sub_options; + z_liveliness_subscriber_options_default(&sub_options); z_owned_closure_sample_t callback; z_closure(&callback, graph_sub_data_handler, nullptr, this); z_view_keyexpr_t liveliness_ke; @@ -206,7 +206,7 @@ class rmw_context_impl_s::Data final [this]() { z_undeclare_subscriber(z_move(this->graph_subscriber_)); }); - if (zc_liveliness_declare_subscriber( + if (z_liveliness_declare_subscriber( z_loan(session_), &graph_subscriber_, z_loan(liveliness_ke), z_move(callback), &sub_options) != Z_OK) @@ -245,6 +245,7 @@ class rmw_context_impl_s::Data final RMW_SET_ERROR_MSG("Error while closing zenoh session"); return RMW_RET_ERROR; } + return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 96d95191..dda0717e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -60,12 +60,12 @@ std::shared_ptr NodeData::make( std::string liveliness_keyexpr = entity->liveliness_keyexpr(); z_view_keyexpr_t liveliness_ke; z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - zc_owned_liveliness_token_t token; + z_owned_liveliness_token_t token; auto free_token = rcpputils::make_scope_exit( [&token]() { z_drop(z_move(token)); }); - if (zc_liveliness_declare_token(session, &token, z_loan(liveliness_ke), NULL) != Z_OK) { + if (z_liveliness_declare_token(session, &token, z_loan(liveliness_ke), NULL) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the node."); @@ -87,7 +87,7 @@ NodeData::NodeData( const rmw_node_t * const node, std::size_t id, std::shared_ptr entity, - zc_owned_liveliness_token_t token) + z_owned_liveliness_token_t token) : node_(node), id_(std::move(id)), entity_(std::move(entity)), @@ -402,7 +402,7 @@ rmw_ret_t NodeData::shutdown() } // Unregister this node from the ROS graph. - zc_liveliness_undeclare_token(z_move(token_)); + z_liveliness_undeclare_token(z_move(token_)); is_shutdown_ = true; return ret; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index e26dd166..ab17e096 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -128,7 +128,7 @@ class NodeData final const rmw_node_t * const node, std::size_t id, std::shared_ptr entity, - zc_owned_liveliness_token_t token); + z_owned_liveliness_token_t token); // Internal mutex. mutable std::recursive_mutex mutex_; // The rmw_node_t associated with this NodeData. @@ -139,7 +139,7 @@ class NodeData final // The Entity generated for the node. std::shared_ptr entity_; // Liveliness token for the node. - zc_owned_liveliness_token_t token_; + z_owned_liveliness_token_t token_; // Shutdown flag. bool is_shutdown_; // Map of publishers. diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index db096144..1e7c7b6f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -171,13 +171,13 @@ std::shared_ptr PublisherData::make( std::string liveliness_keyexpr = entity->liveliness_keyexpr(); z_view_keyexpr_t liveliness_ke; z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - zc_owned_liveliness_token_t token; + z_owned_liveliness_token_t token; auto free_token = rcpputils::make_scope_exit( [&token]() { z_drop(z_move(token)); }); - if (zc_liveliness_declare_token( - session, &token, z_loan(liveliness_ke), + if (z_liveliness_declare_token( + z_loan(owned_session), &token, z_loan(liveliness_ke), NULL) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( @@ -208,7 +208,7 @@ PublisherData::PublisherData( std::shared_ptr entity, z_owned_publisher_t pub, std::optional pub_cache, - zc_owned_liveliness_token_t token, + z_owned_liveliness_token_t token, const void * type_support_impl, std::unique_ptr type_support) : rmw_node_(rmw_node), @@ -431,7 +431,7 @@ rmw_ret_t PublisherData::shutdown() } // Unregister this publisher from the ROS graph. - zc_liveliness_undeclare_token(z_move(token_)); + z_liveliness_undeclare_token(z_move(token_)); if (pub_cache_.has_value()) { z_drop(z_move(pub_cache_.value())); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 4186f434..21bbe620 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -89,7 +89,7 @@ class PublisherData final std::shared_ptr entity, z_owned_publisher_t pub, std::optional pub_cache, - zc_owned_liveliness_token_t token, + z_owned_liveliness_token_t token, const void * type_support_impl, std::unique_ptr type_support); @@ -104,7 +104,7 @@ class PublisherData final // Optional publication cache when durability is transient_local. std::optional pub_cache_; // Liveliness token for the publisher. - zc_owned_liveliness_token_t token_; + z_owned_liveliness_token_t token_; // Type support fields const void * type_support_impl_; std::unique_ptr type_support_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index 012ffe4a..f4c92d0a 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -195,7 +195,7 @@ std::shared_ptr ServiceData::make( z_drop(z_move(service_data->token_)); } }); - if (zc_liveliness_declare_token( + if (z_liveliness_declare_token( session, &service_data->token_, z_loan(liveliness_ke), NULL) != Z_OK) { @@ -495,7 +495,7 @@ rmw_ret_t ServiceData::shutdown() } // Unregister this node from the ROS graph. - zc_liveliness_undeclare_token(z_move(token_)); + z_liveliness_undeclare_token(z_move(token_)); z_undeclare_queryable(z_move(qable_)); is_shutdown_ = true; diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp index f0676635..ba36e6e5 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp @@ -112,7 +112,7 @@ class ServiceData final // An owned queryable. z_owned_queryable_t qable_; // Liveliness token for the service. - zc_owned_liveliness_token_t token_; + z_owned_liveliness_token_t token_; // Type support fields. const void * request_type_support_impl_; const void * response_type_support_impl_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index b50debb7..1b485a64 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -324,7 +324,7 @@ bool SubscriptionData::init() [this]() { z_drop(z_move(token_)); }); - if (zc_liveliness_declare_token( + if (z_liveliness_declare_token( context_impl->session(), &token_, z_loan(liveliness_ke), NULL) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( @@ -393,7 +393,7 @@ rmw_ret_t SubscriptionData::shutdown() graph_cache_->remove_qos_event_callbacks(entity_->keyexpr_hash()); // Unregister this subscription from the ROS graph. - zc_liveliness_undeclare_token(z_move(token_)); + z_liveliness_undeclare_token(z_move(token_)); z_owned_subscriber_t * sub = std::get_if(&sub_); if (sub != nullptr) { diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 6d17fbf4..3d1ccede 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -139,7 +139,7 @@ class SubscriptionData final : public std::enable_shared_from_this sub_; // Liveliness token for the subscription. - zc_owned_liveliness_token_t token_; + z_owned_liveliness_token_t token_; // Type support fields const void * type_support_impl_; std::unique_ptr type_support_; diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_c_vendor/CMakeLists.txt index 086ea5db..cddd2fd4 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_c_vendor/CMakeLists.txt @@ -26,7 +26,7 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memor # - https://github.com/eclipse-zenoh/zenoh-c/pull/620 (fix ze_querying_subscriber_get API to query newly discovered publishers) ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION 4b759d4e4d35a97d7b20b5c4003b8b764a10f679 + VCS_VERSION 42e717ff7b633649f11ebb7800b71d4939cd21c7 CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE" From 17865d27f542813dfcae9bc6c45eade25be9820f Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Fri, 29 Nov 2024 22:39:14 +0800 Subject: [PATCH 2/8] refactor: reorder the cancel functions --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 13 +++------ .../src/detail/rmw_context_impl_s.cpp | 5 ---- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 5 ---- .../src/detail/rmw_publisher_data.cpp | 24 +++++++--------- rmw_zenoh_cpp/src/detail/rmw_service_data.cpp | 8 +++--- .../src/detail/rmw_subscription_data.cpp | 28 +++++++++---------- 6 files changed, 32 insertions(+), 51 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index d70ce9d5..1966a3d1 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -236,22 +236,18 @@ ClientData::ClientData( bool ClientData::init(const z_loaned_session_t * session) { std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_; - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [this]() { - z_drop(z_move(this->keyexpr_)); - }); if (z_keyexpr_from_str(&this->keyexpr_, topic_keyexpr.c_str()) != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return false; } + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [this]() { + z_drop(z_move(this->keyexpr_)); + }); std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr(); z_view_keyexpr_t liveliness_ke; z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - auto free_token = rcpputils::make_scope_exit( - [this]() { - z_drop(z_move(this->token_)); - }); if (z_liveliness_declare_token( session, &this->token_, @@ -266,7 +262,6 @@ bool ClientData::init(const z_loaned_session_t * session) } free_ros_keyexpr.cancel(); - free_token.cancel(); return true; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index e2f1ca5f..3c89983c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -202,10 +202,6 @@ class rmw_context_impl_s::Data final z_closure(&callback, graph_sub_data_handler, nullptr, this); z_view_keyexpr_t liveliness_ke; z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str()); - auto undeclare_z_sub = rcpputils::make_scope_exit( - [this]() { - z_undeclare_subscriber(z_move(this->graph_subscriber_)); - }); if (z_liveliness_declare_subscriber( z_loan(session_), &graph_subscriber_, z_loan(liveliness_ke), @@ -217,7 +213,6 @@ class rmw_context_impl_s::Data final close_session.cancel(); free_shm_provider.cancel(); - undeclare_z_sub.cancel(); } // Shutdown the Zenoh session. diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index dda0717e..8a8e0440 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -61,17 +61,12 @@ std::shared_ptr NodeData::make( z_view_keyexpr_t liveliness_ke; z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); z_owned_liveliness_token_t token; - auto free_token = rcpputils::make_scope_exit( - [&token]() { - z_drop(z_move(token)); - }); if (z_liveliness_declare_token(session, &token, z_loan(liveliness_ke), NULL) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the node."); return nullptr; } - free_token.cancel(); return std::shared_ptr( new NodeData{ diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 1e7c7b6f..6a69fd5c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -111,12 +111,6 @@ std::shared_ptr PublisherData::make( // Create a Publication Cache if durability is transient_local. std::optional pub_cache = std::nullopt; - auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( - [&pub_cache]() { - if (pub_cache.has_value()) { - z_drop(z_move(pub_cache.value())); - } - }); if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { ze_publication_cache_options_t pub_cache_opts; ze_publication_cache_options_default(&pub_cache_opts); @@ -142,6 +136,12 @@ std::shared_ptr PublisherData::make( } pub_cache = pub_cache_; } + auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( + [&pub_cache]() { + if (pub_cache.has_value()) { + z_drop(z_move(pub_cache.value())); + } + }); // Set congestion_control to BLOCK if appropriate. z_publisher_options_t opts; @@ -157,25 +157,21 @@ std::shared_ptr PublisherData::make( } z_owned_publisher_t pub; // TODO(clalancette): What happens if the key name is a valid but empty string? - auto undeclare_z_publisher = rcpputils::make_scope_exit( - [&pub]() { - z_undeclare_publisher(z_move(pub)); - }); if (z_declare_publisher( session, &pub, z_loan(pub_ke), &opts) != Z_OK) { RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); return nullptr; } + auto undeclare_z_publisher = rcpputils::make_scope_exit( + [&pub]() { + z_undeclare_publisher(z_move(pub)); + }); std::string liveliness_keyexpr = entity->liveliness_keyexpr(); z_view_keyexpr_t liveliness_ke; z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); z_owned_liveliness_token_t token; - auto free_token = rcpputils::make_scope_exit( - [&token]() { - z_drop(z_move(token)); - }); if (z_liveliness_declare_token( z_loan(owned_session), &token, z_loan(liveliness_ke), NULL) != Z_OK) diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index f4c92d0a..a31cdf2b 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -171,10 +171,6 @@ std::shared_ptr ServiceData::make( z_queryable_options_t qable_options; z_queryable_options_default(&qable_options); qable_options.complete = true; - auto undeclare_z_queryable = rcpputils::make_scope_exit( - [service_data]() { - z_undeclare_queryable(z_move(service_data->qable_)); - }); if (z_declare_queryable( session, &service_data->qable_, z_loan(service_ke), z_move(callback), &qable_options) != Z_OK) @@ -182,6 +178,10 @@ std::shared_ptr ServiceData::make( RMW_SET_ERROR_MSG("unable to create zenoh queryable"); return nullptr; } + auto undeclare_z_queryable = rcpputils::make_scope_exit( + [service_data]() { + z_undeclare_queryable(z_move(service_data->qable_)); + }); std::string liveliness_keyexpr = service_data->entity_->liveliness_keyexpr(); z_view_keyexpr_t liveliness_ke; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 1b485a64..41e9bf2d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -212,20 +212,6 @@ bool SubscriptionData::init() rmw_context_impl_t * context_impl = static_cast(rmw_node_->context->impl); - auto undeclare_z_sub = rcpputils::make_scope_exit( - [this]() { - z_owned_subscriber_t * sub = std::get_if(&sub_); - if (sub == nullptr || z_undeclare_subscriber(z_move(*sub))) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - } else { - ze_owned_querying_subscriber_t * querying_sub = - std::get_if(&sub_); - if (querying_sub == nullptr || ze_undeclare_querying_subscriber(z_move(*querying_sub))) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - } - } - }); - // Instantiate the subscription with suitable options depending on the // adapted_qos_profile. // TODO(Yadunund): Rely on a separate function to return the sub @@ -315,6 +301,20 @@ bool SubscriptionData::init() sub_ = sub; } + auto undeclare_z_sub = rcpputils::make_scope_exit( + [this]() { + z_owned_subscriber_t * sub = std::get_if(&sub_); + if (sub == nullptr || z_undeclare_subscriber(z_move(*sub))) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + } else { + ze_owned_querying_subscriber_t * querying_sub = + std::get_if(&sub_); + if (querying_sub == nullptr || ze_undeclare_querying_subscriber(z_move(*querying_sub))) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + } + } + }); + // Publish to the graph that a new subscription is in town. std::string liveliness_keyexpr = entity_->liveliness_keyexpr(); z_view_keyexpr_t liveliness_ke; From 0255c37749e3588d56670cbaeb013f049132cf6a Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Wed, 4 Dec 2024 11:52:12 +0800 Subject: [PATCH 3/8] chore: reorder some lines of code --- rmw_zenoh_cpp/src/detail/attachment_helpers.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp index 35019ca6..4fc96805 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp @@ -62,9 +62,8 @@ attachment_data_t::attachment_data_t(const z_loaned_bytes_t * attachment) ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment); z_owned_string_t key; - ze_deserializer_deserialize_string(&deserializer, &key); - // Deserialize the sequence_number + ze_deserializer_deserialize_string(&deserializer, &key); if (std::string_view( z_string_data(z_loan(key)), z_string_len(z_loan(key))) != "sequence_number") From 09b45c326696a272ce7d95336a7ea8e00284ff8c Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Thu, 5 Dec 2024 00:30:54 +0800 Subject: [PATCH 4/8] refactor: add `session_is_valid` check --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 885baaf6..0770ca00 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -392,6 +392,10 @@ rmw_create_publisher( context_impl, "unable to get rmw_context_impl_s", return nullptr); + if (!context_impl->session_is_valid()) { + RMW_SET_ERROR_MSG("zenoh session is invalid"); + return nullptr; + } rcutils_allocator_t * allocator = &node->context->options.allocator; if (!rcutils_allocator_is_valid(allocator)) { @@ -917,6 +921,10 @@ rmw_create_subscription( context_impl, "unable to get rmw_context_impl_s", return nullptr); + if (!context_impl->session_is_valid()) { + RMW_SET_ERROR_MSG("zenoh session is invalid"); + return nullptr; + } rcutils_allocator_t * allocator = &node->context->options.allocator; if (!rcutils_allocator_is_valid(allocator)) { @@ -1385,6 +1393,10 @@ rmw_create_client( context_impl, "unable to get rmw_context_impl_s", return nullptr); + if (!context_impl->session_is_valid()) { + RMW_SET_ERROR_MSG("zenoh session is invalid"); + return nullptr; + } // Get the service type support. const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); @@ -1629,6 +1641,10 @@ rmw_create_service( context_impl, "unable to get rmw_context_impl_s", return nullptr); + if (!context_impl->session_is_valid()) { + RMW_SET_ERROR_MSG("zenoh session is invalid"); + return nullptr; + } // Get the RMW type support. const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); From 0e1cca167c79e1e0ba9e7eb93f3d59055ee5a0c1 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Thu, 5 Dec 2024 02:32:08 +0800 Subject: [PATCH 5/8] fixup! refactor: reorder the cancel functions --- rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 6a69fd5c..cbc7e083 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -182,7 +182,6 @@ std::shared_ptr PublisherData::make( return nullptr; } - free_token.cancel(); undeclare_z_publisher_cache.cancel(); undeclare_z_publisher.cancel(); From 09ecde7675af74a2756f455f7d1673180fe7074b Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Thu, 5 Dec 2024 15:05:01 +0800 Subject: [PATCH 6/8] fixup! refactor: zc_liveliness_* -> z_liveliness_* and bump up zenoh-c version --- rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index cbc7e083..622de2f9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -173,7 +173,7 @@ std::shared_ptr PublisherData::make( z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); z_owned_liveliness_token_t token; if (z_liveliness_declare_token( - z_loan(owned_session), &token, z_loan(liveliness_ke), + session, &token, z_loan(liveliness_ke), NULL) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( From 8f4949c989c7b283151c983da350609b23acea2e Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Thu, 5 Dec 2024 15:07:41 +0800 Subject: [PATCH 7/8] fixup! fix: address the comments --- rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 3c89983c..ed3bd9a9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -152,10 +152,10 @@ class rmw_context_impl_s::Data final const z_loaned_sample_t * sample = z_reply_ok(z_loan(reply)); z_view_string_t keystr; z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); - std::string livelines_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); + std::string liveliness_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); // Ignore tokens from the same session to avoid race conditions from this // query and the liveliness subscription. - graph_cache_->parse_put(std::move(livelines_str), true); + graph_cache_->parse_put(std::move(liveliness_str), true); } else { RMW_ZENOH_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n"); @@ -439,8 +439,8 @@ static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data) } // Update the graph cache. - std::string livelines_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); - data_shared_ptr->update_graph_cache(z_sample_kind(sample), std::move(livelines_str)); + std::string liveliness_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); + data_shared_ptr->update_graph_cache(z_sample_kind(sample), std::move(liveliness_str)); } ///============================================================================= From 84d1267f1d5aa0e7d94b7719652aa9dd90f8008f Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Thu, 5 Dec 2024 16:12:06 +0800 Subject: [PATCH 8/8] chore: address the comment --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 0770ca00..bab759a5 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -750,6 +750,10 @@ rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher) auto pub_data = node_data->get_pub_data(publisher); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); + if (pub_data->is_shutdown()) { + return RMW_RET_ERROR; + } + return RMW_RET_OK; }