diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index a95c98a2..c3042007 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -105,7 +105,7 @@ namespace rmw_zenoh_cpp { ///============================================================================= std::shared_ptr ClientData::make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, const rmw_client_t * client, liveliness::NodeInfo node_info, @@ -167,7 +167,7 @@ std::shared_ptr ClientData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + z_info_zid(session->loan()), std::to_string(node_id), std::to_string(service_id), liveliness::EntityType::Client, @@ -192,6 +192,7 @@ std::shared_ptr ClientData::make( node, client, entity, + session, request_members, response_members, request_type_support, @@ -211,6 +212,7 @@ ClientData::ClientData( const rmw_node_t * rmw_node, const rmw_client_t * rmw_client, std::shared_ptr entity, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::shared_ptr request_type_support, @@ -218,6 +220,7 @@ ClientData::ClientData( : rmw_node_(rmw_node), rmw_client_(rmw_client), entity_(std::move(entity)), + sess_(std::move(sess)), request_type_support_impl_(request_type_support_impl), response_type_support_impl_(response_type_support_impl), request_type_support_(request_type_support), @@ -232,7 +235,7 @@ ClientData::ClientData( } ///============================================================================= -bool ClientData::init(const z_loaned_session_t * session) +bool ClientData::init(std::shared_ptr session) { if (z_keyexpr_from_str( &this->keyexpr_, @@ -250,7 +253,7 @@ bool ClientData::init(const z_loaned_session_t * session) z_view_keyexpr_t liveliness_ke; z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); if (z_liveliness_declare_token( - session, + session->loan(), &this->token_, z_loan(liveliness_ke), NULL @@ -406,6 +409,7 @@ rmw_ret_t ClientData::send_request( if (context_impl == nullptr) { return RMW_RET_INVALID_ARGUMENT; } + sess_ = context_impl->session(); size_t max_data_length = ( request_type_support_->get_estimated_serialized_size( @@ -470,7 +474,7 @@ rmw_ret_t ClientData::send_request( z_owned_closure_reply_t zn_closure_reply; z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this); z_get( - context_impl->session(), + sess_->loan(), z_loan(keyexpr_), "", z_move(zn_closure_reply), &opts); @@ -535,6 +539,7 @@ void ClientData::_shutdown() z_drop(z_move(keyexpr_)); } + sess_.reset(); is_shutdown_ = true; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index bc737435..349a26db 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -47,7 +47,7 @@ class ClientData final : public std::enable_shared_from_this public: // Make a shared_ptr of ClientData. static std::shared_ptr make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, const rmw_client_t * client, liveliness::NodeInfo node_info, @@ -113,13 +113,14 @@ class ClientData final : public std::enable_shared_from_this const rmw_node_t * rmw_node, const rmw_client_t * client, std::shared_ptr entity, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::shared_ptr request_type_support, std::shared_ptr response_type_support); // Initialize the Zenoh objects for this entity. - bool init(const z_loaned_session_t * session); + bool init(std::shared_ptr session); // Shutdown this client (the mutex is expected to be held by the caller). void _shutdown(); @@ -131,6 +132,8 @@ class ClientData final : public std::enable_shared_from_this const rmw_client_t * rmw_client_; // The Entity generated for the service. std::shared_ptr entity_; + // A shared session. + std::shared_ptr sess_; // An owned keyexpression. z_owned_keyexpr_t keyexpr_; // Liveliness token for the service. diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 9458eb45..a2c98b93 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -36,6 +36,7 @@ #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" +#include "zenoh_utils.hpp" // Megabytes of SHM to reserve. // TODO(clalancette): Make this configurable, or get it from the configuration @@ -86,13 +87,18 @@ class rmw_context_impl_s::Data final }); // Initialize the zenoh session. - if (z_open(&session_, z_move(config), NULL) != Z_OK) { + z_owned_session_t raw_session; + if (z_open(&raw_session, z_move(config), NULL) != Z_OK) { RMW_SET_ERROR_MSG("Error setting up zenoh session."); throw std::runtime_error("Error setting up zenoh session."); } + if (session_ != nullptr) { + session_.reset(); + } + session_ = std::make_shared(raw_session); auto close_session = rcpputils::make_scope_exit( - [this]() { - z_close(z_loan_mut(session_), NULL); + [&raw_session]() { + z_close(z_loan_mut(raw_session), NULL); }); // Verify if the zenoh router is running if configured. @@ -102,7 +108,7 @@ class rmw_context_impl_s::Data final uint64_t connection_attempts = 0; constexpr std::chrono::milliseconds sleep_time(1000); constexpr int64_t ticks_between_print(std::chrono::milliseconds(1000) / sleep_time); - while ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session_))) != RMW_RET_OK) { + while ((ret = rmw_zenoh_cpp::zenoh_router_check(session_->loan())) != RMW_RET_OK) { if ((connection_attempts % ticks_between_print) == 0) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", @@ -117,7 +123,7 @@ class rmw_context_impl_s::Data final } // Initialize the graph cache. - const z_id_t zid = z_info_zid(z_loan(session_)); + const z_id_t zid = z_info_zid(session_->loan()); graph_cache_ = std::make_shared(zid); // Setup liveliness subscriptions for discovery. std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id); @@ -144,7 +150,7 @@ class rmw_context_impl_s::Data final z_view_keyexpr_t keyexpr; z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str()); z_liveliness_get( - z_loan(session_), z_loan(keyexpr), + session_->loan(), z_loan(keyexpr), z_move(closure), NULL); z_owned_reply_t reply; while (z_recv(z_loan(handler), &reply) == Z_OK) { @@ -203,7 +209,7 @@ class rmw_context_impl_s::Data final z_view_keyexpr_t liveliness_ke; z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str()); if (z_liveliness_declare_subscriber( - z_loan(session_), + session_->loan(), &graph_subscriber_, z_loan(liveliness_ke), z_move(callback), &sub_options) != Z_OK) { @@ -240,11 +246,8 @@ class rmw_context_impl_s::Data final // to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler(). } - // Close the zenoh session - if (z_close(z_loan_mut(session_), NULL) != Z_OK) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); - return RMW_RET_ERROR; - } + // Drop the shared session. + session_.reset(); return RMW_RET_OK; } @@ -255,10 +258,10 @@ class rmw_context_impl_s::Data final return enclave_; } - const z_loaned_session_t * session() const + std::shared_ptr session() const { std::lock_guard lock(mutex_); - return z_loan(session_); + return session_; } std::optional & shm_provider() @@ -288,7 +291,7 @@ class rmw_context_impl_s::Data final bool session_is_valid() const { std::lock_guard lock(mutex_); - return !z_session_is_closed(z_loan(session_)); + return !z_session_is_closed(session_->loan()); } std::shared_ptr graph_cache() @@ -319,7 +322,7 @@ class rmw_context_impl_s::Data final auto node_data = rmw_zenoh_cpp::NodeData::make( node, this->get_next_entity_id(), - z_loan(session_), + session_->loan(), domain_id_, ns, node_name, @@ -395,8 +398,8 @@ class rmw_context_impl_s::Data final std::size_t domain_id_; // Enclave, name used to find security artifacts in a sros2 keystore. std::string enclave_; - // An owned session. - z_owned_session_t session_; + // A shared session. + std::shared_ptr session_{nullptr}; // An optional SHM manager that is initialized of SHM is enabled in the // zenoh session config. std::optional shm_provider_; @@ -472,7 +475,7 @@ std::string rmw_context_impl_s::enclave() const } ///============================================================================= -const z_loaned_session_t * rmw_context_impl_s::session() const +std::shared_ptr rmw_context_impl_s::session() const { return data_->session(); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 1e46d6af..ea66e0c3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -47,8 +47,8 @@ class rmw_context_impl_s final // Get a copy of the enclave. std::string enclave() const; - // Loan the Zenoh session. - const z_loaned_session_t * session() const; + // Share the Zenoh session. + std::shared_ptr session() const; // Get a reference to the shm_provider. // Note: This is not thread-safe. diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index cb4817a5..1255de21 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -121,7 +121,7 @@ std::size_t NodeData::id() const ///============================================================================= bool NodeData::create_pub_data( const rmw_publisher_t * const publisher, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, @@ -187,7 +187,7 @@ void NodeData::delete_pub_data(const rmw_publisher_t * const publisher) ///============================================================================= bool NodeData::create_sub_data( const rmw_subscription_t * const subscription, - const z_loaned_session_t * session, + std::shared_ptr session, std::shared_ptr graph_cache, std::size_t id, const std::string & topic_name, @@ -255,7 +255,7 @@ void NodeData::delete_sub_data(const rmw_subscription_t * const subscription) ///============================================================================= bool NodeData::create_service_data( const rmw_service_t * const service, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_supports, @@ -322,7 +322,7 @@ void NodeData::delete_service_data(const rmw_service_t * const service) ///============================================================================= bool NodeData::create_client_data( const rmw_client_t * const client, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_supports, diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index ab17e096..4429b589 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -55,7 +55,7 @@ class NodeData final // Create a new PublisherData for a given rmw_publisher_t. bool create_pub_data( const rmw_publisher_t * const publisher, - const z_loaned_session_t * session, + std::shared_ptr sess, std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, @@ -70,7 +70,7 @@ class NodeData final // Create a new SubscriptionData for a given rmw_subscription_t. bool create_sub_data( const rmw_subscription_t * const subscription, - const z_loaned_session_t * session, + std::shared_ptr sess, std::shared_ptr graph_cache, std::size_t id, const std::string & topic_name, @@ -86,7 +86,7 @@ class NodeData final // Create a new ServiceData for a given rmw_service_t. bool create_service_data( const rmw_service_t * const service, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_support, @@ -101,7 +101,7 @@ class NodeData final // Create a new ClientData for a given rmw_client_t. bool create_client_data( const rmw_client_t * const client, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_support, diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index d1449b63..32f1dbc1 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -15,6 +15,7 @@ #include "rmw_publisher_data.hpp" #include +#include #include #include @@ -42,7 +43,7 @@ namespace rmw_zenoh_cpp ///============================================================================= std::shared_ptr PublisherData::make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -82,7 +83,7 @@ std::shared_ptr PublisherData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + z_info_zid(session->loan()), std::to_string(node_id), std::to_string(publisher_id), liveliness::EntityType::Publisher, @@ -129,7 +130,7 @@ std::shared_ptr PublisherData::make( ze_owned_publication_cache_t pub_cache_; if (ze_declare_publication_cache( - session, &pub_cache_, z_loan(pub_ke), &pub_cache_opts)) + session->loan(), &pub_cache_, z_loan(pub_ke), &pub_cache_opts)) { RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); return nullptr; @@ -158,7 +159,7 @@ std::shared_ptr PublisherData::make( z_owned_publisher_t pub; // TODO(clalancette): What happens if the key name is a valid but empty string? if (z_declare_publisher( - session, &pub, z_loan(pub_ke), &opts) != Z_OK) + session->loan(), &pub, z_loan(pub_ke), &opts) != Z_OK) { RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); return nullptr; @@ -173,7 +174,7 @@ std::shared_ptr PublisherData::make( z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); z_owned_liveliness_token_t token; if (z_liveliness_declare_token( - session, &token, z_loan(liveliness_ke), + session->loan(), &token, z_loan(liveliness_ke), NULL) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( @@ -194,6 +195,7 @@ std::shared_ptr PublisherData::make( new PublisherData{ node, std::move(entity), + std::move(session), std::move(pub), std::move(pub_cache), std::move(token), @@ -206,6 +208,7 @@ std::shared_ptr PublisherData::make( PublisherData::PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr sess, z_owned_publisher_t pub, std::optional pub_cache, z_owned_liveliness_token_t token, @@ -213,6 +216,7 @@ PublisherData::PublisherData( std::unique_ptr type_support) : rmw_node_(rmw_node), entity_(std::move(entity)), + sess_(std::move(sess)), pub_(std::move(pub)), pub_cache_(std::move(pub_cache)), token_(std::move(token)), @@ -438,11 +442,16 @@ rmw_ret_t PublisherData::shutdown() // Unregister this publisher from the ROS graph. z_liveliness_undeclare_token(z_move(token_)); + // if (pub_cache_.has_value() && !z_session_is_closed(z_loan(sess_))) { if (pub_cache_.has_value()) { - z_drop(z_move(pub_cache_.value())); + ze_undeclare_publication_cache(z_move(pub_cache_.value())); + // if (z_session_is_closed(z_loan(sess_))) { + // z_drop(z_move(pub_cache_.value())); + // } } z_undeclare_publisher(z_move(pub_)); + sess_.reset(); is_shutdown_ = true; return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index bde0c13b..1853ff8a 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -28,6 +28,7 @@ #include "liveliness_utils.hpp" #include "message_type_support.hpp" #include "type_support_common.hpp" +#include "zenoh_utils.hpp" #include "rcutils/allocator.h" @@ -42,7 +43,7 @@ class PublisherData final public: // Make a shared_ptr of PublisherData. static std::shared_ptr make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -90,6 +91,7 @@ class PublisherData final PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr sess, z_owned_publisher_t pub, std::optional pub_cache, z_owned_liveliness_token_t token, @@ -102,6 +104,8 @@ class PublisherData final const rmw_node_t * rmw_node_; // The Entity generated for the publisher. std::shared_ptr entity_; + // A shared session. + std::shared_ptr sess_; // An owned publisher. z_owned_publisher_t pub_; // Optional publication cache when durability is transient_local. diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index c7be982c..3d456a02 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -65,7 +65,7 @@ void service_data_handler(z_loaned_query_t * query, void * data) ///============================================================================= std::shared_ptr ServiceData::make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -126,7 +126,7 @@ std::shared_ptr ServiceData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + z_info_zid(session->loan()), std::to_string(node_id), std::to_string(service_id), liveliness::EntityType::Service, @@ -150,6 +150,7 @@ std::shared_ptr ServiceData::make( new ServiceData{ node, std::move(entity), + session, request_members, response_members, std::move(request_type_support), @@ -180,7 +181,7 @@ std::shared_ptr ServiceData::make( z_queryable_options_default(&qable_options); qable_options.complete = true; if (z_declare_queryable( - session, &service_data->qable_, z_loan(service_data->keyexpr_), + session->loan(), &service_data->qable_, z_loan(service_data->keyexpr_), z_move(callback), &qable_options) != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh queryable"); @@ -206,7 +207,7 @@ std::shared_ptr ServiceData::make( } }); if (z_liveliness_declare_token( - session, &service_data->token_, z_loan(liveliness_ke), + session->loan(), &service_data->token_, z_loan(liveliness_ke), NULL) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( @@ -228,12 +229,14 @@ std::shared_ptr ServiceData::make( ServiceData::ServiceData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::unique_ptr request_type_support, std::unique_ptr response_type_support) : rmw_node_(rmw_node), entity_(std::move(entity)), + sess_(std::move(sess)), request_type_support_impl_(request_type_support_impl), response_type_support_impl_(response_type_support_impl), request_type_support_(std::move(request_type_support)), @@ -526,6 +529,7 @@ rmw_ret_t ServiceData::shutdown() z_undeclare_queryable(z_move(qable_)); } + sess_.reset(); is_shutdown_ = true; return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp index 75467494..a8bcbd66 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp @@ -47,7 +47,7 @@ class ServiceData final public: // Make a shared_ptr of ServiceData. static std::shared_ptr make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -99,6 +99,7 @@ class ServiceData final ServiceData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::unique_ptr request_type_support, @@ -112,6 +113,8 @@ class ServiceData final std::shared_ptr entity_; // An owned keyexpression. z_owned_keyexpr_t keyexpr_; + // A shared session. + std::shared_ptr sess_; // An owned queryable. z_owned_queryable_t qable_; // Liveliness token for the service. diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index f0036606..4f21300b 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -94,7 +94,7 @@ SubscriptionData::Message::~Message() ///============================================================================= std::shared_ptr SubscriptionData::make( - const z_loaned_session_t * session, + std::shared_ptr session, std::shared_ptr graph_cache, const rmw_node_t * const node, liveliness::NodeInfo node_info, @@ -136,7 +136,7 @@ std::shared_ptr SubscriptionData::make( // with Zenoh; after this, callbacks may come in at any time. std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + z_info_zid(session->loan()), std::to_string(node_id), std::to_string(subscription_id), liveliness::EntityType::Subscription, @@ -161,6 +161,7 @@ std::shared_ptr SubscriptionData::make( node, graph_cache, std::move(entity), + std::move(session), type_support->data, std::move(message_type_support) }); @@ -178,11 +179,13 @@ SubscriptionData::SubscriptionData( const rmw_node_t * rmw_node, std::shared_ptr graph_cache, std::shared_ptr entity, + std::shared_ptr sess, const void * type_support_impl, std::unique_ptr type_support) : rmw_node_(rmw_node), graph_cache_(std::move(graph_cache)), entity_(std::move(entity)), + sess_(std::move(sess)), type_support_impl_(type_support_impl), type_support_(std::move(type_support)), last_known_published_msg_({}), @@ -212,6 +215,8 @@ bool SubscriptionData::init() rmw_context_impl_t * context_impl = static_cast(rmw_node_->context->impl); + sess_ = context_impl->session(); + // Instantiate the subscription with suitable options depending on the // adapted_qos_profile. // TODO(Yadunund): Rely on a separate function to return the sub @@ -238,7 +243,7 @@ bool SubscriptionData::init() sub_options.query_consolidation = z_query_consolidation_none(); ze_owned_querying_subscriber_t sub; if (ze_declare_querying_subscriber( - context_impl->session(), &sub, z_loan(sub_ke), z_move(callback), &sub_options)) + sess_->loan(), &sub, z_loan(sub_ke), z_move(callback), &sub_options)) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; @@ -292,7 +297,7 @@ bool SubscriptionData::init() z_owned_subscriber_t sub; if (z_declare_subscriber( - context_impl->session(), &sub, z_loan(sub_ke), z_move(callback), + sess_->loan(), &sub, z_loan(sub_ke), z_move(callback), &sub_options) != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); @@ -325,7 +330,7 @@ bool SubscriptionData::init() z_drop(z_move(token_)); }); if (z_liveliness_declare_token( - context_impl->session(), &token_, z_loan(liveliness_ke), NULL) != Z_OK) + sess_->loan(), &token_, z_loan(liveliness_ke), NULL) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -422,6 +427,7 @@ rmw_ret_t SubscriptionData::shutdown() } } + sess_.reset(); is_shutdown_ = true; initialized_ = false; return ret; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 84d9a488..08a09def 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -34,6 +34,7 @@ #include "message_type_support.hpp" #include "attachment_helpers.hpp" #include "type_support_common.hpp" +#include "zenoh_utils.hpp" #include "rcutils/allocator.h" @@ -62,7 +63,7 @@ class SubscriptionData final : public std::enable_shared_from_this make( - const z_loaned_session_t * session, + std::shared_ptr session, std::shared_ptr graph_cache, const rmw_node_t * const node, liveliness::NodeInfo node_info, @@ -126,6 +127,7 @@ class SubscriptionData final : public std::enable_shared_from_this graph_cache, std::shared_ptr entity, + std::shared_ptr sess, const void * type_support_impl, std::unique_ptr type_support); @@ -139,6 +141,8 @@ class SubscriptionData final : public std::enable_shared_from_this graph_cache_; // The Entity generated for the subscription. std::shared_ptr entity_; + // A shared session + std::shared_ptr sess_; // An owned subscriber or querying_subscriber depending on the QoS settings. std::variant sub_; // Liveliness token for the subscription. diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 77bb4dc2..398587c9 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -18,12 +18,27 @@ #include #include "attachment_helpers.hpp" +#include "logging_macros.hpp" #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" namespace rmw_zenoh_cpp { +/// Close the zenoh session if destructed. +///============================================================================= +const z_loaned_session_t * ZenohSession::loan() +{ + return z_loan(inner_); +} + +/// Close the zenoh session if destructed. +///============================================================================= +ZenohSession::~ZenohSession() +{ + z_close(z_loan_mut(inner_), NULL); +} + ///============================================================================= void create_map_and_set_sequence_num( z_owned_bytes_t * out_bytes, int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]) diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index 5f569082..ae2aed3a 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -25,6 +25,21 @@ namespace rmw_zenoh_cpp { + +/// A wrapped zenoh session with customized destruction. +///============================================================================= +class ZenohSession final +{ +public: + ZenohSession(z_owned_session_t sess) + : inner_(sess) {} + const z_loaned_session_t * loan(); + ~ZenohSession(); + +private: + z_owned_session_t inner_; +}; + ///============================================================================= void create_map_and_set_sequence_num(