diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 6e1468d7..d4b70440 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -76,7 +76,7 @@ void client_data_handler(z_loaned_reply_t * reply, void * data) z_owned_reply_t owned_reply; z_reply_clone(&owned_reply, reply); client_data->add_new_reply( - std::make_unique(owned_reply, received_timestamp)); + std::make_unique(reply, received_timestamp)); } ///============================================================================= @@ -447,12 +447,13 @@ rmw_ret_t ClientData::send_request( // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, // capture shared_from_this() instead of this. - z_owned_closure_reply_t callback; - z_closure(&callback, client_data_handler, client_data_drop, this); + num_in_flight_++; + z_owned_closure_reply_t zn_closure_reply; + z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this); z_get( context_impl->session(), z_loan(keyexpr_), "", - z_move(callback), + z_move(zn_closure_reply), &opts); return RMW_RET_OK; @@ -536,7 +537,7 @@ bool ClientData::shutdown_and_query_in_flight() ///============================================================================= void ClientData::decrement_in_flight_and_conditionally_remove() { - std::lock_guard lock(mutex_); + std::unique_lock lock(mutex_); --num_in_flight_; if (is_shutdown_ && num_in_flight_ == 0) { @@ -548,6 +549,8 @@ void ClientData::decrement_in_flight_and_conditionally_remove() if (node_data == nullptr) { return; } + // We have to unlock here since we are about to delete ourself, and thus the unlock would be UB. + lock.unlock(); node_data->delete_client_data(rmw_client_); } } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 2cff857c..94dcd070 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -246,19 +246,6 @@ class rmw_context_impl_s::Data final return ret; } - // Shutdown all the nodes in this context. - for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) { - ret = node_it->second->shutdown(); - if (ret != RMW_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to shutdown node with id %zu. rmw_ret_t code: %zu.", - node_it->second->id(), - ret - ); - } - } - z_undeclare_subscriber(z_move(graph_subscriber_)); if (shm_provider_.has_value()) { z_drop(z_move(shm_provider_.value())); diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 2f7187b6..96d95191 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -114,7 +114,7 @@ NodeData::~NodeData() ///============================================================================= std::size_t NodeData::id() const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); return id_; } @@ -127,7 +127,7 @@ bool NodeData::create_pub_data( const rosidl_message_type_support_t * type_support, const rmw_qos_profile_t * qos_profile) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); if (is_shutdown_) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -168,7 +168,7 @@ bool NodeData::create_pub_data( ///============================================================================= PublisherDataPtr NodeData::get_pub_data(const rmw_publisher_t * const publisher) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); auto it = pubs_.find(publisher); if (it == pubs_.end()) { return nullptr; @@ -180,7 +180,7 @@ PublisherDataPtr NodeData::get_pub_data(const rmw_publisher_t * const publisher) ///============================================================================= void NodeData::delete_pub_data(const rmw_publisher_t * const publisher) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); pubs_.erase(publisher); } @@ -194,7 +194,7 @@ bool NodeData::create_sub_data( const rosidl_message_type_support_t * type_support, const rmw_qos_profile_t * qos_profile) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); if (is_shutdown_) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -236,7 +236,7 @@ bool NodeData::create_sub_data( ///============================================================================= SubscriptionDataPtr NodeData::get_sub_data(const rmw_subscription_t * const subscription) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); auto it = subs_.find(subscription); if (it == subs_.end()) { return nullptr; @@ -248,7 +248,7 @@ SubscriptionDataPtr NodeData::get_sub_data(const rmw_subscription_t * const subs ///============================================================================= void NodeData::delete_sub_data(const rmw_subscription_t * const subscription) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); subs_.erase(subscription); } @@ -261,7 +261,7 @@ bool NodeData::create_service_data( const rosidl_service_type_support_t * type_supports, const rmw_qos_profile_t * qos_profile) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); if (is_shutdown_) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -302,7 +302,7 @@ bool NodeData::create_service_data( ///============================================================================= ServiceDataPtr NodeData::get_service_data(const rmw_service_t * const service) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); auto it = services_.find(service); if (it == services_.end()) { return nullptr; @@ -314,7 +314,7 @@ ServiceDataPtr NodeData::get_service_data(const rmw_service_t * const service) ///============================================================================= void NodeData::delete_service_data(const rmw_service_t * const service) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); services_.erase(service); } @@ -328,7 +328,7 @@ bool NodeData::create_client_data( const rosidl_service_type_support_t * type_supports, const rmw_qos_profile_t * qos_profile) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); if (is_shutdown_) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -370,7 +370,7 @@ bool NodeData::create_client_data( ///============================================================================= ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); auto it = clients_.find(client); if (it == clients_.end()) { return nullptr; @@ -382,7 +382,7 @@ ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client) ///============================================================================= void NodeData::delete_client_data(const rmw_client_t * const client) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); auto client_it = clients_.find(client); if (client_it == clients_.end()) { return; @@ -395,62 +395,12 @@ void NodeData::delete_client_data(const rmw_client_t * const client) ///============================================================================= rmw_ret_t NodeData::shutdown() { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); rmw_ret_t ret = RMW_RET_OK; if (is_shutdown_) { return ret; } - // Shutdown all the entities within this node. - for (auto pub_it = pubs_.begin(); pub_it != pubs_.end(); ++pub_it) { - ret = pub_it->second->shutdown(); - if (ret != RMW_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to shutdown publisher %s within id %zu. rmw_ret_t code: %zu.", - pub_it->second->topic_info().name_.c_str(), - id_, - ret - ); - } - } - for (auto sub_it = subs_.begin(); sub_it != subs_.end(); ++sub_it) { - ret = sub_it->second->shutdown(); - if (ret != RMW_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to shutdown subscription %s within id %zu. rmw_ret_t code: %zu.", - sub_it->second->topic_info().name_.c_str(), - id_, - ret - ); - } - } - for (auto srv_it = services_.begin(); srv_it != services_.end(); ++srv_it) { - ret = srv_it->second->shutdown(); - if (ret != RMW_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to shutdown service %s within id %zu. rmw_ret_t code: %zu.", - srv_it->second->topic_info().name_.c_str(), - id_, - ret - ); - } - } - for (auto cli_it = clients_.begin(); cli_it != clients_.end(); ++cli_it) { - ret = cli_it->second->shutdown(); - if (ret != RMW_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to shutdown client %s within id %zu. rmw_ret_t code: %zu.", - cli_it->second->topic_info().name_.c_str(), - id_, - ret - ); - } - } - // Unregister this node from the ROS graph. zc_liveliness_undeclare_token(z_move(token_)); @@ -462,7 +412,7 @@ rmw_ret_t NodeData::shutdown() // Check if the Node is shutdown. bool NodeData::is_shutdown() const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); return is_shutdown_; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index 0ad3bb6e..917f45e2 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -128,7 +128,7 @@ class NodeData final std::shared_ptr entity, zc_owned_liveliness_token_t token); // Internal mutex. - mutable std::mutex mutex_; + mutable std::recursive_mutex mutex_; // The rmw_node_t associated with this NodeData. const rmw_node_t * node_; // The entity id of this node as generated by get_next_entity_id(). diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index 55f91ca2..b51b3973 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -52,9 +52,10 @@ void service_data_handler(z_loaned_query_t * query, void * data) return; } - z_owned_query_t owned_query; - z_query_clone(&owned_query, query); - service_data->add_new_query(std::make_unique(owned_query)); + std::chrono::nanoseconds::rep received_timestamp = + std::chrono::system_clock::now().time_since_epoch().count(); + + service_data->add_new_query(std::make_unique(query, received_timestamp)); } ///============================================================================= @@ -322,10 +323,7 @@ rmw_ret_t ServiceData::take_request( RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); return RMW_RET_ERROR; } - - auto now = std::chrono::system_clock::now().time_since_epoch(); - auto now_ns = std::chrono::duration_cast(now); - request_header->received_timestamp = now_ns.count(); + request_header->received_timestamp = query->get_received_timestamp(); // Add this query to the map, so that rmw_send_response can quickly look it up later. const size_t hash = rmw_zenoh_cpp::hash_gid(request_header->request_id.writer_guid); diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 80d87c4c..74cf1668 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -34,7 +34,16 @@ void create_map_and_set_sequence_num( } ///============================================================================= -ZenohQuery::ZenohQuery(z_owned_query_t query) {query_ = query;} +ZenohQuery::ZenohQuery(const z_loaned_query_t * query, std::chrono::nanoseconds::rep received_timestamp) { + z_query_clone(&query_, query); + received_timestamp_ = received_timestamp; +} + +///============================================================================= +std::chrono::nanoseconds::rep ZenohQuery::get_received_timestamp() const +{ + return received_timestamp_; +} ///============================================================================= ZenohQuery::~ZenohQuery() {z_drop(z_move(query_));} @@ -44,10 +53,10 @@ const z_loaned_query_t * ZenohQuery::get_query() const {return z_loan(query_);} ///============================================================================= ZenohReply::ZenohReply( - z_owned_reply_t reply, + const z_loaned_reply_t * reply, std::chrono::nanoseconds::rep received_timestamp) { - reply_ = reply; + z_reply_clone(&reply_, reply); received_timestamp_ = received_timestamp; } diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index 97e1446f..569d1ab1 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -35,7 +35,7 @@ create_map_and_set_sequence_num( class ZenohReply final { public: - ZenohReply(z_owned_reply_t reply, std::chrono::nanoseconds::rep received_timestamp); + ZenohReply(const z_loaned_reply_t * reply, std::chrono::nanoseconds::rep received_timestamp); ~ZenohReply(); @@ -53,14 +53,17 @@ class ZenohReply final class ZenohQuery final { public: - ZenohQuery(z_owned_query_t query); + ZenohQuery(const z_loaned_query_t * query, std::chrono::nanoseconds::rep received_timestamp); ~ZenohQuery(); const z_loaned_query_t * get_query() const; + std::chrono::nanoseconds::rep get_received_timestamp() const; + private: z_owned_query_t query_; + std::chrono::nanoseconds::rep received_timestamp_; }; } // namespace rmw_zenoh_cpp