From e29ac6de6f22852741d777318577906e5e5378a1 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 14 Nov 2024 00:44:36 -0500 Subject: [PATCH] Redo the per-client-data in-flight monitoring. (#310) * Redo the per-client-data in-flight monitoring. The previous approach with using a global map of pointers to shared_ptr does not work for one simple reason; on shutdown, C++ does not define the order in which statics are destroyed. Thus it is possible (and even likely in some cases) that on Ctrl-C, the global map would be deleted, and then we would try to access it, leading to crashes. I was seeing some of these while running the rcl tests. Instead, go back to the previous solution where we were storing num_in_flight completely within the ClientData structure. This requires doing a bit of gymnastics with the data types, and reintrodues the possible UB during shutdown if the structure was destructed but there is still queries in flight. But it otherwise fixes all of the tests for me locally. Signed-off-by: Chris Lalancette * Fixes from review. Signed-off-by: Chris Lalancette --------- Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 109 ++++++++++++------- rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 22 +++- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 8 +- 3 files changed, 91 insertions(+), 48 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 84ae0328..cf0fcb4e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -43,10 +43,6 @@ namespace { -std::mutex client_data_ptr_to_shared_ptr_map_mutex; -std::unordered_map> client_data_ptr_to_shared_ptr_map; - ///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * data) { @@ -59,16 +55,7 @@ void client_data_handler(z_owned_reply_t * reply, void * data) return; } - std::shared_ptr client_data_shared_ptr{nullptr}; - { - std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); - if (client_data_ptr_to_shared_ptr_map.count(client_data) == 0) { - return; - } - client_data_shared_ptr = client_data_ptr_to_shared_ptr_map[client_data]; - } - - if (client_data_shared_ptr->is_shutdown()) { + if (client_data->is_shutdown()) { return; } @@ -94,13 +81,28 @@ void client_data_handler(z_owned_reply_t * reply, void * data) std::chrono::nanoseconds::rep received_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); - client_data_shared_ptr->add_new_reply( + client_data->add_new_reply( std::make_unique(reply, received_timestamp)); // Since we took ownership of the reply, null it out here *reply = z_reply_null(); } +///============================================================================= +void client_data_drop(void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t from data in client_data_drop." + ); + return; + } + + client_data->decrement_in_flight_and_conditionally_remove(); +} + } // namespace namespace rmw_zenoh_cpp @@ -109,6 +111,7 @@ namespace rmw_zenoh_cpp std::shared_ptr ClientData::make( z_session_t session, const rmw_node_t * const node, + const rmw_client_t * client, liveliness::NodeInfo node_info, std::size_t node_id, std::size_t service_id, @@ -191,6 +194,7 @@ std::shared_ptr ClientData::make( std::shared_ptr client_data = std::shared_ptr( new ClientData{ node, + client, entity, request_members, response_members, @@ -203,21 +207,20 @@ std::shared_ptr ClientData::make( return nullptr; } - std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); - client_data_ptr_to_shared_ptr_map.emplace(client_data.get(), client_data); - return client_data; } ///============================================================================= ClientData::ClientData( const rmw_node_t * rmw_node, + const rmw_client_t * rmw_client, std::shared_ptr entity, const void * request_type_support_impl, const void * response_type_support_impl, std::shared_ptr request_type_support, std::shared_ptr response_type_support) : rmw_node_(rmw_node), + rmw_client_(rmw_client), entity_(std::move(entity)), request_type_support_impl_(request_type_support_impl), response_type_support_impl_(response_type_support_impl), @@ -225,7 +228,8 @@ ClientData::ClientData( response_type_support_(response_type_support), wait_set_data_(nullptr), sequence_number_(1), - is_shutdown_(false) + is_shutdown_(false), + num_in_flight_(0) { // Do nothing. } @@ -269,28 +273,28 @@ bool ClientData::init(z_session_t session) ///============================================================================= liveliness::TopicInfo ClientData::topic_info() const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); return entity_->topic_info().value(); } ///============================================================================= bool ClientData::liveliness_is_valid() const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); return zc_liveliness_token_check(&token_); } ///============================================================================= void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); entity_->copy_gid(out_gid); } ///============================================================================= void ClientData::add_new_reply(std::unique_ptr reply) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); const rmw_qos_profile_t adapted_qos_profile = entity_->topic_info().value().qos_; if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && @@ -324,7 +328,7 @@ rmw_ret_t ClientData::take_response( void * ros_response, bool * taken) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); *taken = false; if (is_shutdown_ || reply_queue_.empty()) { @@ -389,7 +393,7 @@ rmw_ret_t ClientData::send_request( const void * ros_request, int64_t * sequence_id) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); if (is_shutdown_) { return RMW_RET_OK; } @@ -468,7 +472,7 @@ rmw_ret_t ClientData::send_request( // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, // capture shared_from_this() instead of this. z_owned_closure_reply_t zn_closure_reply = - z_closure(client_data_handler, nullptr, this); + z_closure(client_data_handler, client_data_drop, this); z_get( context_impl->session(), z_loan(keyexpr_), "", @@ -481,11 +485,6 @@ rmw_ret_t ClientData::send_request( ///============================================================================= ClientData::~ClientData() { - { - std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); - client_data_ptr_to_shared_ptr_map.erase(this); - } - const rmw_ret_t ret = this->shutdown(); if (ret != RMW_RET_OK) { RMW_ZENOH_LOG_ERROR_NAMED( @@ -501,7 +500,7 @@ void ClientData::set_on_new_response_callback( rmw_event_callback_t callback, const void * user_data) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); data_callback_mgr_.set_callback(user_data, std::move(callback)); } @@ -509,7 +508,7 @@ void ClientData::set_on_new_response_callback( bool ClientData::queue_has_data_and_attach_condition_if_not( rmw_wait_set_data_t * wait_set_data) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); if (!reply_queue_.empty()) { return true; } @@ -521,19 +520,17 @@ bool ClientData::queue_has_data_and_attach_condition_if_not( ///============================================================================= bool ClientData::detach_condition_and_queue_is_empty() { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); wait_set_data_ = nullptr; return reply_queue_.empty(); } ///============================================================================= -rmw_ret_t ClientData::shutdown() +void ClientData::_shutdown() { - rmw_ret_t ret = RMW_RET_OK; - std::lock_guard lock(mutex_); if (is_shutdown_) { - return ret; + return; } // Unregister this node from the ROS graph. @@ -545,13 +542,47 @@ rmw_ret_t ClientData::shutdown() } is_shutdown_ = true; +} + +///============================================================================= +rmw_ret_t ClientData::shutdown() +{ + std::lock_guard lock(mutex_); + _shutdown(); return RMW_RET_OK; } +///============================================================================= +bool ClientData::shutdown_and_query_in_flight() +{ + std::lock_guard lock(mutex_); + _shutdown(); + return num_in_flight_ > 0; +} + +///============================================================================= +void ClientData::decrement_in_flight_and_conditionally_remove() +{ + std::lock_guard lock(mutex_); + --num_in_flight_; + + if (is_shutdown_ && num_in_flight_ == 0) { + rmw_context_impl_s * context_impl = static_cast(rmw_node_->data); + if (context_impl == nullptr) { + return; + } + std::shared_ptr node_data = context_impl->get_node_data(rmw_node_); + if (node_data == nullptr) { + return; + } + node_data->delete_client_data(rmw_client_); + } +} + ///============================================================================= bool ClientData::is_shutdown() const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); return is_shutdown_; } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index cee312b1..eda4bf9c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -49,6 +49,7 @@ class ClientData final : public std::enable_shared_from_this static std::shared_ptr make( z_session_t session, const rmw_node_t * const node, + const rmw_client_t * client, liveliness::NodeInfo node_info, std::size_t node_id, std::size_t service_id, @@ -79,22 +80,27 @@ class ClientData final : public std::enable_shared_from_this const void * ros_request, int64_t * sequence_id); + // Set a callback to be called when events happen. void set_on_new_response_callback( rmw_event_callback_t callback, const void * user_data); - // rmw_wait helpers. + // Check if there is data in the queue, and if not attach the wait set condition variable. bool queue_has_data_and_attach_condition_if_not( rmw_wait_set_data_t * wait_set_data); + // Detach any attached wait set condition variable, and return whether there is data in the queue. bool detach_condition_and_queue_is_empty(); - // See the comment for "num_in_flight" below on the use of this method. - void decrement_queries_in_flight(); - // Shutdown this ClientData. rmw_ret_t shutdown(); + // Shutdown this ClientData, and return whether there are any requests currently in flight. + bool shutdown_and_query_in_flight(); + + // Decrement the in flight requests, and if that drops to 0 remove the client from the node. + void decrement_in_flight_and_conditionally_remove(); + // Check if this ClientData is shutdown. bool is_shutdown() const; @@ -105,6 +111,7 @@ class ClientData final : public std::enable_shared_from_this // Constructor. ClientData( const rmw_node_t * rmw_node, + const rmw_client_t * client, std::shared_ptr entity, const void * request_type_support_impl, const void * response_type_support_impl, @@ -114,10 +121,14 @@ class ClientData final : public std::enable_shared_from_this // Initialize the Zenoh objects for this entity. bool init(z_session_t session); + // Shutdown this client (the mutex is expected to be held by the caller). + void _shutdown(); + // Internal mutex. - mutable std::mutex mutex_; + mutable std::recursive_mutex mutex_; // The parent node. const rmw_node_t * rmw_node_; + const rmw_client_t * rmw_client_; // The Entity generated for the service. std::shared_ptr entity_; // An owned keyexpression. @@ -139,6 +150,7 @@ class ClientData final : public std::enable_shared_from_this size_t sequence_number_; // Shutdown flag. bool is_shutdown_; + size_t num_in_flight_; }; using ClientDataPtr = std::shared_ptr; using ClientDataConstPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index f3430280..bd3f3f6e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -347,6 +347,7 @@ bool NodeData::create_client_data( auto client_data = ClientData::make( std::move(session), node_, + client, entity_->node_info(), id_, std::move(id), @@ -387,10 +388,9 @@ void NodeData::delete_client_data(const rmw_client_t * const client) if (client_it == clients_.end()) { return; } - // Shutdown the client, then erase it. The code in rmw_client_data.cpp is careful about keeping - // it alive as long as necessary. - client_it->second->shutdown(); - clients_.erase(client); + if (!client_it->second->shutdown_and_query_in_flight()) { + clients_.erase(client); + } } ///=============================================================================