From 821f4e9f9fbd6226a83b8cd6c93a9936c47d5639 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 12 Nov 2024 13:42:50 +0000 Subject: [PATCH] Vastly simplify handling of in-flight ClientData. We basically use the pointer as a key to lookup the shared_ptr, and then always use the shared_ptr. That ensures that even if the pointer was dropped from the node while the client callback is operating, it will still be valid and won't be destroyed until that method returns. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 167 ++++--------------- rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 5 - rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 9 +- 3 files changed, 39 insertions(+), 142 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 974458ca..8c851a38 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -25,7 +25,6 @@ #include #include #include -#include #include "attachment_helpers.hpp" #include "cdr.hpp" @@ -41,38 +40,17 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/impl/cpp/macros.hpp" -namespace rmw_zenoh_cpp +namespace { -// rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no -// way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an -// rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the -// query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. -// The next 3 global variables are used to avoid that situation. Any time a query is initiated via -// rmw_send_request(), num_in_flight_ is incremented. When Zenoh calls the callback for the -// query drop, num_in_flight_map->second is decremented. -// When ClientData is destroyed, it checks to see if there are things in flight. If there are, -// it leaves this ClientData pointer both in the num_in_flight_map and the deleted_clients map. -// When the client_data_handler() is called on these destroyed objects, it knows that it cannot -// dereference the data anymore, and it gets out early. When client_data_drop() is called, it -// decrements num_in_flight_map->second, and if that drops to zero, drops the pointer address -// completely from deleted_clients. -// -// There is one case which is not handled by this, which has to do with timeouts. The query -// timeout is currently set to essentially infinite. Thus, if a query is in-flight but never -// returns, the memory in this structure will never be freed. There isn't much we can do about -// that at this time, but we may want to consider changing the timeout so that the memory can -// eventually be freed up. -// -// TODO(Yadunund): Remove these variables once we switch to zenoh-cpp and can capture -// weak_ptr in zenoh callbacks. -static std::mutex num_in_flight_mutex; -static std::unordered_map num_in_flight_map = {}; -static std::unordered_set deleted_clients = {}; + +std::mutex client_data_ptr_to_shared_ptr_map_mutex; +std::unordered_map> client_data_ptr_to_shared_ptr_map; ///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * data) { - auto client_data = static_cast(data); + auto client_data = static_cast(data); if (client_data == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -81,18 +59,16 @@ void client_data_handler(z_owned_reply_t * reply, void * data) return; } - std::lock_guard lock(num_in_flight_mutex); - if (deleted_clients.count(client_data) > 0) { - RMW_ZENOH_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "client_data_handler triggered for ClientData that has been deleted. Ignoring..." - ); - return; + std::shared_ptr client_data_shared_ptr{nullptr}; + { + std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); + if (client_data_ptr_to_shared_ptr_map.count(client_data) == 0) { + return; + } + client_data_shared_ptr = client_data_ptr_to_shared_ptr_map[client_data]; } - // See the comment about the "num_in_flight" class variable in the ClientData class for - // why we need to do this. - if (client_data->is_shutdown()) { + if (client_data_shared_ptr->is_shutdown()) { return; } @@ -118,43 +94,17 @@ void client_data_handler(z_owned_reply_t * reply, void * data) std::chrono::nanoseconds::rep received_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); - client_data->add_new_reply(std::make_unique(reply, received_timestamp)); + client_data_shared_ptr->add_new_reply( + std::make_unique(reply, received_timestamp)); // Since we took ownership of the reply, null it out here *reply = z_reply_null(); } -///============================================================================= -void client_data_drop(void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t " - ); - return; - } - - // See the comment about the "num_in_flight" class variable in the ClientData class for - // why we need to do this. - std::lock_guard lock(num_in_flight_mutex); - auto num_in_flight_it = num_in_flight_map.find(client_data); - if (num_in_flight_it == num_in_flight_map.end()) { - // This should never happen - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to find object in num_in_flight_map. Report this bug." - ); - return; - } - - --num_in_flight_it->second; - if (num_in_flight_it->second == 0 && deleted_clients.count(client_data) > 0) { - deleted_clients.erase(client_data); - } -} +} // namespace +namespace rmw_zenoh_cpp +{ ///============================================================================= std::shared_ptr ClientData::make( z_session_t session, @@ -238,28 +188,23 @@ std::shared_ptr ClientData::make( return nullptr; } - std::lock_guard lock(num_in_flight_mutex); - std::vector> duplicate_pointers; - std::shared_ptr client_data; - do { - client_data = std::shared_ptr( - new ClientData{ - node, - entity, - request_members, - response_members, - request_type_support, - response_type_support - }); - duplicate_pointers.push_back(client_data); - } while (deleted_clients.count(client_data.get()) > 0); + std::shared_ptr client_data = std::shared_ptr( + new ClientData{ + node, + entity, + request_members, + response_members, + request_type_support, + response_type_support + }); if (!client_data->init(session)) { // init() already set the error. return nullptr; } - num_in_flight_map[client_data.get()] = 0; + std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); + client_data_ptr_to_shared_ptr_map.emplace(client_data.get(), client_data); return client_data; } @@ -508,18 +453,6 @@ rmw_ret_t ClientData::send_request( z_bytes_map_drop(z_move(map)); }); - // See the comment about the "num_in_flight" class variable in the ClientData class for - // why we need to do this. - { - std::lock_guard lock(num_in_flight_mutex); - auto num_in_flight_it = num_in_flight_map.find(this); - if (num_in_flight_it == num_in_flight_map.end()) { - // This should never happen - RMW_SET_ERROR_MSG("failed to find object in num_in_flight_map"); - return RMW_RET_ERROR; - } - num_in_flight_it->second++; - } opts.attachment = z_bytes_map_as_attachment(&map); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; // The default timeout for a z_get query is 10 seconds and if a response is not received within @@ -535,7 +468,7 @@ rmw_ret_t ClientData::send_request( // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, // capture shared_from_this() instead of this. z_owned_closure_reply_t zn_closure_reply = - z_closure(client_data_handler, client_data_drop, this); + z_closure(client_data_handler, nullptr, this); z_get( context_impl->session(), z_loan(keyexpr_), "", @@ -548,6 +481,11 @@ rmw_ret_t ClientData::send_request( ///============================================================================= ClientData::~ClientData() { + { + std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); + client_data_ptr_to_shared_ptr_map.erase(this); + } + const rmw_ret_t ret = this->shutdown(); if (ret != RMW_RET_OK) { RMW_ZENOH_LOG_ERROR_NAMED( @@ -556,28 +494,6 @@ ClientData::~ClientData() entity_->topic_info().value().name_.c_str() ); } - - std::lock_guard lock(num_in_flight_mutex); - auto num_in_flight_it = num_in_flight_map.find(this); - if (num_in_flight_it != num_in_flight_map.end()) { - if (num_in_flight_it->second == 0) { - // If there is nothing in flight, we can remove this from the map - // with no further considerations. - num_in_flight_map.erase(this); - } else { - // Since there is still something in flight, we need to just add - // it to the deleted_clients; it will be deleted when the last - // outstanding query finishes. - deleted_clients.insert(this); - } - } else { - // This should never happen - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Error finding client /%s in num_in_flight_map. Report this bug.", - entity_->topic_info().value().name_.c_str() - ); - } } //============================================================================== @@ -633,17 +549,6 @@ rmw_ret_t ClientData::shutdown() return RMW_RET_OK; } -///============================================================================= -bool ClientData::query_in_flight() const -{ - std::lock_guard lock(num_in_flight_mutex); - auto query_in_flight_it = num_in_flight_map.find(this); - if (query_in_flight_it != num_in_flight_map.end()) { - return query_in_flight_it->second > 0; - } - return false; -} - ///============================================================================= bool ClientData::is_shutdown() const { diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index 4357dec2..b4c56775 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -95,11 +95,6 @@ class ClientData final : public std::enable_shared_from_this // Shutdown this ClientData. rmw_ret_t shutdown(); - // TODO(Yadunund): Remove this API once we are able to capture weak_ptr - // in the client closures to avoid the issue with queries in flight as described - // below. - bool query_in_flight() const; - // Check if this ClientData is shutdown. bool is_shutdown() const; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 5f562d20..f3430280 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -387,13 +387,10 @@ void NodeData::delete_client_data(const rmw_client_t * const client) if (client_it == clients_.end()) { return; } - // We shutdown the client first and only if that is successful, we deallocate - // the ClientData. This is to keep the ClientData alive in cases where - // rmw_destroy_client is invoked while there are still queries in flight. + // Shutdown the client, then erase it. The code in rmw_client_data.cpp is careful about keeping + // it alive as long as necessary. client_it->second->shutdown(); - if (!client_it->second->query_in_flight()) { - clients_.erase(client); - } + clients_.erase(client); } ///=============================================================================