From 474eb97b242bede4481d2ecc77e932de00a4bf1e Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 10 Oct 2024 07:12:05 +0800 Subject: [PATCH 01/10] Make ClientData thread-safe Signed-off-by: Yadunund --- rmw_zenoh_cpp/CMakeLists.txt | 2 +- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 9 +- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 3 +- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 569 ++++++++++++++++++ rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 163 +++++ .../src/detail/rmw_context_impl_s.hpp | 2 - rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 220 ------- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 131 ---- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 79 +++ rmw_zenoh_cpp/src/detail/rmw_node_data.hpp | 18 + rmw_zenoh_cpp/src/rmw_event.cpp | 1 - rmw_zenoh_cpp/src/rmw_init.cpp | 1 - rmw_zenoh_cpp/src/rmw_zenoh.cpp | 520 +++------------- 13 files changed, 915 insertions(+), 803 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/rmw_client_data.cpp create mode 100644 rmw_zenoh_cpp/src/detail/rmw_client_data.hpp delete mode 100644 rmw_zenoh_cpp/src/detail/rmw_data_types.cpp delete mode 100644 rmw_zenoh_cpp/src/detail/rmw_data_types.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index d6080970..7c4a27eb 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -38,8 +38,8 @@ add_library(rmw_zenoh_cpp SHARED src/detail/logging.cpp src/detail/message_type_support.cpp src/detail/qos.cpp + src/detail/rmw_client_data.cpp src/detail/rmw_context_impl_s.cpp - src/detail/rmw_data_types.cpp src/detail/rmw_publisher_data.cpp src/detail/rmw_node_data.cpp src/detail/rmw_service_data.cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 22f1b153..776d343d 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -36,7 +36,6 @@ #include "graph_cache.hpp" #include "logging_macros.hpp" -#include "rmw_data_types.hpp" namespace rmw_zenoh_cpp { @@ -1182,15 +1181,15 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( ///============================================================================= rmw_ret_t GraphCache::service_server_is_available( - const char * service_name, - const char * service_type, + const liveliness::TopicInfo & client_topic_info, bool * is_available) const { *is_available = false; std::lock_guard lock(graph_mutex_); - GraphNode::TopicMap::const_iterator service_it = graph_services_.find(service_name); + GraphNode::TopicMap::const_iterator service_it = graph_services_.find(client_topic_info.name_); if (service_it != graph_services_.end()) { - GraphNode::TopicTypeMap::const_iterator type_it = service_it->second.find(service_type); + GraphNode::TopicTypeMap::const_iterator type_it = + service_it->second.find(client_topic_info.type_); if (type_it != service_it->second.end()) { for (const auto & [_, topic_data] : type_it->second) { if (topic_data->subs_.size() > 0) { diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index f2302efd..8ac2172d 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -172,8 +172,7 @@ class GraphCache final rmw_topic_endpoint_info_array_t * endpoints_info) const; rmw_ret_t service_server_is_available( - const char * service_name, - const char * service_type, + const liveliness::TopicInfo & client_topic_info, bool * is_available) const; /// Set a qos event callback for an entity from the current session. diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp new file mode 100644 index 00000000..4a86b6ea --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -0,0 +1,569 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_client_data.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "attachment_helpers.hpp" +#include "cdr.hpp" +#include "rmw_context_impl_s.hpp" +#include "message_type_support.hpp" +#include "logging_macros.hpp" +#include "qos.hpp" + +#include "rcpputils/scope_exit.hpp" + +#include "rmw/error_handling.h" +#include "rmw/get_topic_endpoint_info.h" +#include "rmw/impl/cpp/macros.hpp" + +namespace rmw_zenoh_cpp +{ +///============================================================================= +void client_data_handler(z_owned_reply_t * reply, void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t " + ); + return; + } + + // See the comment about the "num_in_flight" class variable in the ClientData class for + // why we need to do this. + if (client_data->is_shutdown()) { + return; + } + + if (!z_reply_check(reply)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_check returned False" + ); + return; + } + if (!z_reply_is_ok(reply)) { + z_value_t err = z_reply_err(reply); + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", + client_data->topic_info().topic_keyexpr_.c_str(), + (int)err.payload.len, + err.payload.start); + + return; + } + + std::chrono::nanoseconds::rep received_timestamp = + std::chrono::system_clock::now().time_since_epoch().count(); + + client_data->add_new_reply(std::make_unique(reply, received_timestamp)); + + // Since we took ownership of the reply, null it out here + *reply = z_reply_null(); +} + +///============================================================================= +void client_data_drop(void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t " + ); + return; + } + + // See the comment about the "num_in_flight" class variable in the ClientData class for + // why we need to do this. + bool queries_in_flight = false; + bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); + + if (is_shutdown) { + if (!queries_in_flight) { + // TODO(Yadunund): Do we still need this? + // RMW_TRY_DESTRUCTOR(~ClientData(), ClientData, ); + // context->options.allocator.deallocate( + // client_data, context->options.allocator.state); + } + } +} + +///============================================================================= +std::shared_ptr ClientData::make( + z_session_t session, + const rmw_node_t * const node, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t service_id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile) +{ + // Adapt any 'best available' QoS options + rmw_qos_profile_t adapted_qos_profile = *qos_profile; + rmw_ret_t ret = QoS::get().best_available_qos( + nullptr, nullptr, &adapted_qos_profile, nullptr); + if (RMW_RET_OK != ret) { + RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + return nullptr; + } + + rcutils_allocator_t * allocator = &node->context->options.allocator; + + const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support); + auto service_members = static_cast(type_support->data); + auto request_members = static_cast( + service_members->request_members_->data); + auto response_members = static_cast( + service_members->response_members_->data); + auto request_type_support = std::make_unique(service_members); + auto response_type_support = std::make_unique(service_members); + + // Note: Service request/response types will contain a suffix Request_ or Response_. + // We remove the suffix when appending the type to the liveliness tokens for + // better reusability within GraphCache. + std::string service_type = response_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Response_"); + if (std::string::npos != suffix_substring_position) { + service_type = service_type.substr(0, suffix_substring_position); + } else { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unexpected type %s for client %s. Report this bug", + service_type.c_str(), service_name.c_str()); + return nullptr; + } + + // Convert the type hash to a string so that it can be included in the keyexpr. + char * type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( + type_hash, + *allocator, + &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { + RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); + return nullptr; + } + auto free_type_hash_c_str = rcpputils::make_scope_exit( + [&allocator, &type_hash_c_str]() { + allocator->deallocate(type_hash_c_str, allocator->state); + }); + + std::size_t domain_id = node_info.domain_id_; + auto entity = liveliness::Entity::make( + z_info_zid(session), + std::to_string(node_id), + std::to_string(service_id), + liveliness::EntityType::Client, + std::move(node_info), + liveliness::TopicInfo{ + std::move(domain_id), + service_name, + std::move(service_type), + type_hash_c_str, + std::move(adapted_qos_profile)} + ); + if (entity == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the client %s.", + service_name); + return nullptr; + } + + auto client_data = std::shared_ptr( + new ClientData{ + node, + std::move(entity), + request_members, + response_members, + std::move(request_type_support), + std::move(response_type_support) + }); + + client_data->keyexpr_ = + z_keyexpr_new(client_data->entity_->topic_info().value().topic_keyexpr_.c_str()); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [client_data]() { + z_drop(z_move(client_data->keyexpr_)); + }); + if (!z_check(z_loan(client_data->keyexpr_))) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return nullptr; + } + + client_data->token_ = zc_liveliness_declare_token( + session, + z_keyexpr(client_data->entity_->liveliness_keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [client_data]() { + z_drop(z_move(client_data->token_)); + }); + if (!z_check(client_data->token_)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the client."); + return nullptr; + } + + free_ros_keyexpr.cancel(); + free_token.cancel(); + + return client_data; +} + +///============================================================================= +ClientData::ClientData( + const rmw_node_t * rmw_node, + std::shared_ptr entity, + const void * request_type_support_impl, + const void * response_type_support_impl, + std::unique_ptr request_type_support, + std::unique_ptr response_type_support) +: rmw_node_(rmw_node), + entity_(std::move(entity)), + request_type_support_impl_(request_type_support_impl), + response_type_support_impl_(response_type_support_impl), + request_type_support_(std::move(request_type_support)), + response_type_support_(std::move(response_type_support)), + wait_set_data_(nullptr), + sequence_number_(1), + num_in_flight_(0), + is_shutdown_(false) +{ + // Do nothing. +} + +///============================================================================= +liveliness::TopicInfo ClientData::topic_info() const +{ + std::lock_guard lock(mutex_); + return entity_->topic_info().value(); +} + +///============================================================================= +bool ClientData::liveliness_is_valid() const +{ + std::lock_guard lock(mutex_); + return zc_liveliness_token_check(&token_); +} + +///============================================================================= +void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const +{ + std::lock_guard lock(mutex_); + entity_->copy_gid(out_gid); +} + +///============================================================================= +void ClientData::add_new_reply(std::unique_ptr reply) +{ + std::lock_guard lock(mutex_); + const rmw_qos_profile_t adapted_qos_profile = + entity_->topic_info().value().qos_; + if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && + reply_queue_.size() >= adapted_qos_profile.depth) + { + // Log warning if message is discarded due to hitting the queue depth + z_owned_str_t keystr = z_keyexpr_to_string(z_loan(keyexpr_)); + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Query queue depth of %ld reached, discarding oldest Query " + "for client for %s", + adapted_qos_profile.depth, + z_loan(keystr)); + z_drop(z_move(keystr)); + reply_queue_.pop_front(); + } + reply_queue_.emplace_back(std::move(reply)); + + // Since we added new data, trigger user callback and guard condition if they are available + data_callback_mgr_.trigger_callback(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); + } +} + +///============================================================================= +rmw_ret_t ClientData::take_response( + rmw_service_info_t * request_header, + void * ros_response, + bool * taken) +{ + std::lock_guard lock(mutex_); + *taken = false; + + if (is_shutdown_ || reply_queue_.empty()) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; + } + std::unique_ptr latest_reply = std::move(reply_queue_.front()); + reply_queue_.pop_front(); + + std::optional sample = latest_reply->get_sample(); + if (!sample) { + RMW_SET_ERROR_MSG("invalid reply sample"); + return RMW_RET_ERROR; + } + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(sample->payload.start)), + sample->payload.len); + + // Object that serializes the data + rmw_zenoh_cpp::Cdr deser(fastbuffer); + if (!response_type_support_->deserialize_ros_message( + deser.get_cdr(), + ros_response, + response_type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS response"); + return RMW_RET_ERROR; + } + + // Fill in the request_header + request_header->request_id.sequence_number = + rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "sequence_number"); + if (request_header->request_id.sequence_number < 0) { + RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); + return RMW_RET_ERROR; + } + request_header->source_timestamp = + rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "source_timestamp"); + if (request_header->source_timestamp < 0) { + RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); + return RMW_RET_ERROR; + } + if (!rmw_zenoh_cpp::get_gid_from_attachment( + &sample->attachment, + request_header->request_id.writer_guid)) + { + RMW_SET_ERROR_MSG("Could not get client gid from attachment"); + return RMW_RET_ERROR; + } + + request_header->received_timestamp = latest_reply->get_received_timestamp(); + + *taken = true; + + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t ClientData::send_request( + const void * ros_request, + int64_t * sequence_id) +{ + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return RMW_RET_OK; + } + + rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; + rmw_context_impl_s * context_impl = static_cast(rmw_node_->context->impl); + if (context_impl == nullptr) { + return RMW_RET_INVALID_ARGUMENT; + } + + size_t max_data_length = ( + request_type_support_->get_estimated_serialized_size( + ros_request, request_type_support_impl_)); + + // Init serialized message byte array + char * request_bytes = static_cast(allocator->allocate( + max_data_length, + allocator->state)); + if (!request_bytes) { + RMW_SET_ERROR_MSG("failed allocate request message bytes"); + return RMW_RET_ERROR; + } + auto always_free_request_bytes = rcpputils::make_scope_exit( + [request_bytes, allocator]() { + allocator->deallocate(request_bytes, allocator->state); + }); + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(request_bytes, max_data_length); + // Object that serializes the data + Cdr ser(fastbuffer); + if (!request_type_support_->serialize_ros_message( + ros_request, + ser.get_cdr(), + request_type_support_impl_)) + { + return RMW_RET_ERROR; + } + size_t data_length = ser.get_serialized_data_length(); + *sequence_id = sequence_number_++; + + // Send request + z_get_options_t opts = z_get_options_default(); + z_owned_bytes_map_t map = create_map_and_set_sequence_num( + *sequence_id, + [this](z_owned_bytes_map_t * map, const char * key) + { + uint8_t local_gid[RMW_GID_STORAGE_SIZE]; + entity_->copy_gid(local_gid); + z_bytes_t gid_bytes; + gid_bytes.len = RMW_GID_STORAGE_SIZE; + gid_bytes.start = local_gid; + z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); + }); + if (!z_check(map)) { + // create_map_and_set_sequence_num already set the error + return RMW_RET_ERROR; + } + auto always_free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); + + // See the comment about the "num_in_flight" class variable in the ClientData class for + // why we need to do this. + num_in_flight_++; + opts.attachment = z_bytes_map_as_attachment(&map); + opts.target = Z_QUERY_TARGET_ALL_COMPLETE; + // The default timeout for a z_get query is 10 seconds and if a response is not received within + // this window, the queryable will return an invalid reply. However, it is common for actions, + // which are implemented using services, to take an extended duration to complete. Hence, we set + // the timeout_ms to the largest supported value to account for most realistic scenarios. + opts.timeout_ms = std::numeric_limits::max(); + // Latest consolidation guarantees unicity of replies for the same key expression, + // which optimizes bandwidth. The default is "None", which imples replies may come in any order + // and any number. + opts.consolidation = z_query_consolidation_latest(); + opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; + // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, + // capture shared_from_this() instead of this. + z_owned_closure_reply_t zn_closure_reply = + z_closure(client_data_handler, client_data_drop, this); + z_get( + context_impl->session(), + z_loan(keyexpr_), "", + z_move(zn_closure_reply), + &opts); + + return RMW_RET_OK; +} + +///============================================================================= +ClientData::~ClientData() +{ + const rmw_ret_t ret = this->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Error destructing client /%s.", + entity_->topic_info().value().name_.c_str() + ); + } +} + +//============================================================================== +void ClientData::set_on_new_response_callback( + rmw_event_callback_t callback, + const void * user_data) +{ + std::lock_guard lock(mutex_); + data_callback_mgr_.set_callback(user_data, std::move(callback)); +} + +///============================================================================= +bool ClientData::queue_has_data_and_attach_condition_if_not( + rmw_wait_set_data_t * wait_set_data) +{ + std::lock_guard lock(mutex_); + if (!reply_queue_.empty()) { + return true; + } + wait_set_data_ = wait_set_data; + + return false; +} + +///============================================================================= +bool ClientData::detach_condition_and_queue_is_empty() +{ + std::lock_guard lock(mutex_); + wait_set_data_ = nullptr; + + return reply_queue_.empty(); +} + +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure +// for the use of this method. +bool ClientData::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight) +{ + std::lock_guard lock(mutex_); + queries_in_flight = --num_in_flight_ > 0; + return is_shutdown_; +} + +///============================================================================= +rmw_ret_t ClientData::shutdown() +{ + rmw_ret_t ret = RMW_RET_OK; + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return ret; + } + + if (num_in_flight_ > 0) { + // TODO(Yadunund): Check if we need to do something. + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Client %s is shutting down while there are still queries in flight.", + entity_->topic_info().value().name_.c_str() + ); + } + + // Unregister this node from the ROS graph. + if (zc_liveliness_token_check(&token_)) { + zc_liveliness_undeclare_token(z_move(token_)); + } + if (z_check(z_loan(keyexpr_))) { + z_drop(z_move(keyexpr_)); + } + + is_shutdown_ = true; + return RMW_RET_OK; +} + +///============================================================================= +bool ClientData::is_shutdown() const +{ + std::lock_guard lock(mutex_); + return is_shutdown_; +} +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp new file mode 100644 index 00000000..e592fc40 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -0,0 +1,163 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_CLIENT_DATA_HPP_ +#define DETAIL__RMW_CLIENT_DATA_HPP_ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "event.hpp" +#include "liveliness_utils.hpp" +#include "message_type_support.hpp" +#include "service_type_support.hpp" +#include "type_support_common.hpp" +#include "zenoh_utils.hpp" + +#include "rcutils/allocator.h" + +#include "rmw/rmw.h" +#include "rmw/ret_types.h" + +namespace rmw_zenoh_cpp +{ + +///============================================================================= +class ClientData final +{ +public: + // Make a shared_ptr of ClientData. + static std::shared_ptr make( + z_session_t session, + const rmw_node_t * const node, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t service_id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Get a copy of the TopicInfo of this ClientData. + liveliness::TopicInfo topic_info() const; + + // Returns true if liveliness token is still valid. + bool liveliness_is_valid() const; + + // Copy the GID of this ClientData into an rmw_gid_t. + void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + + // Add a new ZenohReply to the queue. + void add_new_reply(std::unique_ptr reply); + + // Take a ROS service response. + rmw_ret_t take_response( + rmw_service_info_t * request_header, + void * ros_response, + bool * taken); + + // Send a service request. + rmw_ret_t send_request( + const void * ros_request, + int64_t * sequence_id); + + void set_on_new_response_callback( + rmw_event_callback_t callback, + const void * user_data); + + // rmw_wait helpers. + bool queue_has_data_and_attach_condition_if_not( + rmw_wait_set_data_t * wait_set_data); + + bool detach_condition_and_queue_is_empty(); + + // See the comment for "num_in_flight" below on the use of this method. + bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); + + // Shutdown this ClientData. + rmw_ret_t shutdown(); + + // Check if this ClientData is shutdown. + bool is_shutdown() const; + + // Destructor. + ~ClientData(); + +private: + // Constructor. + ClientData( + const rmw_node_t * rmw_node, + std::shared_ptr entity, + const void * request_type_support_impl, + const void * response_type_support_impl, + std::unique_ptr request_type_support, + std::unique_ptr response_type_support); + + // Internal mutex. + mutable std::mutex mutex_; + // The parent node. + const rmw_node_t * rmw_node_; + // The Entity generated for the service. + std::shared_ptr entity_; + // An owned keyexpression. + z_owned_keyexpr_t keyexpr_; + // Liveliness token for the service. + zc_owned_liveliness_token_t token_; + // Type support fields. + const void * request_type_support_impl_; + const void * response_type_support_impl_; + std::unique_ptr request_type_support_; + std::unique_ptr response_type_support_; + // Deque to store the replies in the order they arrive. + std::deque> reply_queue_; + // Wait set data. + rmw_wait_set_data_t * wait_set_data_; + // Data callback manager. + DataCallbackManager data_callback_mgr_; + // Sequence number for queries. + size_t sequence_number_; + // rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no + // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an + // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the + // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. + // The next 2 variables are used to avoid that situation. Any time a query is initiated via + // rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the + // query reply, num_in_flight_ is decremented. + // When shutdown() is called, is_shutdown_ is set to true. + // If num_in_flight_ is 0, the data associated with this structure is freed. + // If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. + // In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query + // callback, the query callback will free up the structure. + // + // There is one case which is not handled by this, which has to do with timeouts. The query + // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never + // returns, the memory in this structure will never be freed. There isn't much we can do about + // that at this time, but we may want to consider changing the timeout so that the memory can + // eventually be freed up. + size_t num_in_flight_; + // Shutdown flag. + bool is_shutdown_; +}; +using ClientDataPtr = std::shared_ptr; +using ClientDataConstPtr = std::shared_ptr; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__RMW_CLIENT_DATA_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index a35b9ffe..b9367e6f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -48,8 +48,6 @@ class rmw_context_impl_s final std::string enclave() const; // Loan the Zenoh session. - // TODO(Yadunund): Remove this API once rmw_context_impl_s is updated to - // create other Zenoh objects. z_session_t session() const; // Get a reference to the shm_manager. diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp deleted file mode 100644 index acf0bc70..00000000 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ /dev/null @@ -1,220 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include - -#include "logging_macros.hpp" - -#include "rmw/error_handling.h" -#include "rmw/impl/cpp/macros.hpp" - -#include "attachment_helpers.hpp" -#include "rmw_data_types.hpp" - -///============================================================================= -namespace rmw_zenoh_cpp -{ -///============================================================================= -void rmw_client_data_t::notify() -{ - std::lock_guard lock(condition_mutex_); - if (wait_set_data_ != nullptr) { - std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); - wait_set_data_->triggered = true; - wait_set_data_->condition_variable.notify_one(); - } -} - -///============================================================================= -void rmw_client_data_t::add_new_reply(std::unique_ptr reply) -{ - std::lock_guard lock(reply_queue_mutex_); - if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && - reply_queue_.size() >= adapted_qos_profile.depth) - { - // Log warning if message is discarded due to hitting the queue depth - z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr)); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Reply queue depth of %ld reached, discarding oldest reply " - "for client for %s", - adapted_qos_profile.depth, - z_loan(keystr)); - z_drop(z_move(keystr)); - reply_queue_.pop_front(); - } - reply_queue_.emplace_back(std::move(reply)); - - // Since we added new data, trigger user callback and guard condition if they are available - data_callback_mgr.trigger_callback(); - notify(); -} - -///============================================================================= -bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not( - rmw_wait_set_data_t * wait_set_data) -{ - std::lock_guard lock(condition_mutex_); - if (!reply_queue_.empty()) { - return true; - } - wait_set_data_ = wait_set_data; - - return false; -} - -///============================================================================= -bool rmw_client_data_t::detach_condition_and_queue_is_empty() -{ - std::lock_guard lock(condition_mutex_); - wait_set_data_ = nullptr; - - return reply_queue_.empty(); -} - -///============================================================================= -std::unique_ptr rmw_client_data_t::pop_next_reply() -{ - std::lock_guard lock(reply_queue_mutex_); - - if (reply_queue_.empty()) { - return nullptr; - } - - std::unique_ptr latest_reply = std::move(reply_queue_.front()); - reply_queue_.pop_front(); - - return latest_reply; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class -// for the use of this method. -void rmw_client_data_t::increment_in_flight_callbacks() -{ - std::lock_guard lock(in_flight_mutex_); - num_in_flight_++; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class -// for the use of this method. -bool rmw_client_data_t::shutdown_and_query_in_flight() -{ - std::lock_guard lock(in_flight_mutex_); - is_shutdown_ = true; - - return num_in_flight_ > 0; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure -// for the use of this method. -bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight) -{ - std::lock_guard lock(in_flight_mutex_); - queries_in_flight = --num_in_flight_ > 0; - return is_shutdown_; -} - -bool rmw_client_data_t::is_shutdown() const -{ - std::lock_guard lock(in_flight_mutex_); - return is_shutdown_; -} - - -///============================================================================= -size_t rmw_client_data_t::get_next_sequence_number() -{ - std::lock_guard lock(sequence_number_mutex_); - return sequence_number_++; -} - -//============================================================================== -void client_data_handler(z_owned_reply_t * reply, void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t " - ); - return; - } - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - if (client_data->is_shutdown()) { - return; - } - - if (!z_reply_check(reply)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_check returned False" - ); - return; - } - if (!z_reply_is_ok(reply)) { - z_owned_str_t keystr = z_keyexpr_to_string(z_loan(client_data->keyexpr)); - z_value_t err = z_reply_err(reply); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", - z_loan(keystr), - (int)err.payload.len, - err.payload.start); - z_drop(z_move(keystr)); - - return; - } - - std::chrono::nanoseconds::rep received_timestamp = - std::chrono::system_clock::now().time_since_epoch().count(); - - client_data->add_new_reply(std::make_unique(reply, received_timestamp)); - // Since we took ownership of the reply, null it out here - *reply = z_reply_null(); -} - -void client_data_drop(void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t " - ); - return; - } - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - bool queries_in_flight = false; - bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); - - if (is_shutdown) { - if (!queries_in_flight) { - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - client_data->context->options.allocator.deallocate( - client_data, client_data->context->options.allocator.state); - } - } -} - -} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp deleted file mode 100644 index 5605d7f0..00000000 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef DETAIL__RMW_DATA_TYPES_HPP_ -#define DETAIL__RMW_DATA_TYPES_HPP_ - -#include - -#include -#include -#include -#include -#include -#include - -#include "rcutils/allocator.h" - -#include "rmw/rmw.h" - -#include "rosidl_runtime_c/type_hash.h" - -#include "event.hpp" -#include "liveliness_utils.hpp" -#include "message_type_support.hpp" -#include "rmw_wait_set_data.hpp" -#include "service_type_support.hpp" -#include "zenoh_utils.hpp" - -/// Structs for various type erased data fields. -namespace rmw_zenoh_cpp -{ -///============================================================================= -void client_data_handler(z_owned_reply_t * reply, void * client_data); -void client_data_drop(void * data); - -///============================================================================= -class rmw_client_data_t final -{ -public: - // The Entity generated for the client. - std::shared_ptr entity; - - z_owned_keyexpr_t keyexpr; - - // Store the actual QoS profile used to configure this client. - // The QoS is reused for sending requests and getting responses. - rmw_qos_profile_t adapted_qos_profile; - - // Liveliness token for the client. - zc_owned_liveliness_token_t token; - - const void * request_type_support_impl; - const void * response_type_support_impl; - const char * typesupport_identifier; - const rosidl_type_hash_t * type_hash; - RequestTypeSupport * request_type_support; - ResponseTypeSupport * response_type_support; - - rmw_context_t * context; - - size_t get_next_sequence_number(); - - void add_new_reply(std::unique_ptr reply); - - bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); - - bool detach_condition_and_queue_is_empty(); - - std::unique_ptr pop_next_reply(); - - DataCallbackManager data_callback_mgr; - - // See the comment for "num_in_flight" below on the use of this method. - void increment_in_flight_callbacks(); - - // See the comment for "num_in_flight" below on the use of this method. - bool shutdown_and_query_in_flight(); - - // See the comment for "num_in_flight" below on the use of this method. - bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); - - bool is_shutdown() const; - -private: - void notify(); - - size_t sequence_number_{1}; - std::mutex sequence_number_mutex_; - - rmw_wait_set_data_t * wait_set_data_{nullptr}; - std::mutex condition_mutex_; - - std::deque> reply_queue_; - mutable std::mutex reply_queue_mutex_; - - // rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no - // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an - // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the - // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. - // - // The next 3 variables are used to avoid that situation. Any time a query is initiated via - // rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the - // query reply, num_in_flight_ is decremented. When rmw_destroy_client() is called, is_shutdown_ - // is set to true. If num_in_flight_ is 0, the data associated with this structure is freed. - // If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. - // In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query - // callback, the query callback will free up the structure. - // - // There is one case which is not handled by this, which has to do with timeouts. The query - // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never - // returns, the memory in this structure will never be freed. There isn't much we can do about - // that at this time, but we may want to consider changing the timeout so that the memory can - // eventually be freed up. - mutable std::mutex in_flight_mutex_; - bool is_shutdown_{false}; - size_t num_in_flight_{0}; -}; -} // namespace rmw_zenoh_cpp - -#endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 19357c3d..1b853a41 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -319,6 +319,73 @@ void NodeData::delete_service_data(const rmw_service_t * const service) services_.erase(service); } + +///============================================================================= +bool NodeData::create_client_data( + const rmw_client_t * const client, + z_session_t session, + std::size_t id, + const std::string & service_name, + const rosidl_service_type_support_t * type_supports, + const rmw_qos_profile_t * qos_profile) +{ + std::lock_guard lock_guard(mutex_); + if (is_shutdown_) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create ClientData as the NodeData has been shutdown."); + return false; + } + + if (clients_.count(client) > 0) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "ClientData already exists."); + return false; + } + + auto client_data = ClientData::make( + std::move(session), + node_, + entity_->node_info(), + id_, + std::move(id), + std::move(service_name), + type_supports, + qos_profile); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to make ClientData."); + return false; + } + + auto insertion = clients_.insert(std::make_pair(client, std::move(client_data))); + if (!insertion.second) { + return false; + } + return true; +} + +///============================================================================= +ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client) +{ + std::lock_guard lock_guard(mutex_); + auto it = clients_.find(client); + if (it == clients_.end()) { + return nullptr; + } + + return it->second; +} + +///============================================================================= +void NodeData::delete_client_data(const rmw_client_t * const client) +{ + std::lock_guard lock_guard(mutex_); + clients_.erase(client); +} + ///============================================================================= rmw_ret_t NodeData::shutdown() { @@ -365,6 +432,18 @@ rmw_ret_t NodeData::shutdown() ); } } + for (auto cli_it = clients_.begin(); cli_it != clients_.end(); ++cli_it) { + ret = cli_it->second->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to shutdown client %s within id %zu. rmw_ret_t code: %zu.", + cli_it->second->topic_info().name_.c_str(), + id_, + ret + ); + } + } // Unregister this node from the ROS graph. zc_liveliness_undeclare_token(z_move(token_)); diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index c6c74e0d..f85b1366 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -25,6 +25,7 @@ #include "graph_cache.hpp" #include "liveliness_utils.hpp" +#include "rmw_client_data.hpp" #include "rmw_publisher_data.hpp" #include "rmw_subscription_data.hpp" #include "rmw_service_data.hpp" @@ -97,6 +98,21 @@ class NodeData final // Delete the ServiceData for a given rmw_service_t if present. void delete_service_data(const rmw_service_t * const service); + // Create a new ClientData for a given rmw_client_t. + bool create_client_data( + const rmw_client_t * const client, + z_session_t session, + std::size_t id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Retrieve the ClientData for a given rmw_client_t if present. + ClientDataPtr get_client_data(const rmw_client_t * const client); + + // Delete the ClientData for a given rmw_client_t if present. + void delete_client_data(const rmw_client_t * const client); + // Shutdown this NodeData. rmw_ret_t shutdown(); @@ -132,6 +148,8 @@ class NodeData final std::unordered_map subs_; // Map of services. std::unordered_map services_; + // Map of clients. + std::unordered_map clients_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index 4fe298a0..63de9bcb 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -21,7 +21,6 @@ #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/rmw_publisher_data.hpp" #include "detail/rmw_subscription_data.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 29a81bd3..c61a44fa 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -22,7 +22,6 @@ #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/zenoh_config.hpp" #include "rcutils/env.h" diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 761fbdcf..51dc4fb3 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -36,7 +36,6 @@ #include "detail/message_type_support.hpp" #include "detail/qos.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/serialization_format.hpp" #include "detail/type_support_common.hpp" #include "detail/zenoh_utils.hpp" @@ -1363,21 +1362,28 @@ rmw_create_client( const rmw_qos_profile_t * qos_profile) { RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return nullptr); - + RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); RMW_CHECK_ARGUMENT_FOR_NULL(service_name, nullptr); if (strlen(service_name) == 0) { RMW_SET_ERROR_MSG("service name is empty string"); return nullptr; } RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); - RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); - + // Validate service name + int validation_result; + if (rmw_validate_full_topic_name(service_name, &validation_result, nullptr) != RMW_RET_OK) { + RMW_SET_ERROR_MSG("rmw_validate_full_topic_name failed"); + return nullptr; + } + if (validation_result != RMW_TOPIC_VALID && !qos_profile->avoid_ros_namespace_conventions) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service name is malformed: %s", service_name); + return nullptr; + } RMW_CHECK_FOR_NULL_WITH_MSG( node->context, "expected initialized context", @@ -1397,23 +1403,22 @@ rmw_create_client( return nullptr; } - rcutils_allocator_t * allocator = &node->context->options.allocator; - - // Validate service name - int validation_result; - - if (rmw_validate_full_topic_name(service_name, &validation_result, nullptr) != RMW_RET_OK) { - RMW_SET_ERROR_MSG("rmw_validate_full_topic_name failed"); + // Get the RMW the type support. + const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); + if (type_support == nullptr) { + // error was already set by find_service_type_support return nullptr; } - if (validation_result != RMW_TOPIC_VALID && !qos_profile->avoid_ros_namespace_conventions) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service name is malformed: %s", service_name); + rcutils_allocator_t * allocator = &node->context->options.allocator; + if (!rcutils_allocator_is_valid(allocator)) { + RMW_SET_ERROR_MSG("allocator is invalid."); return nullptr; } - // client data - rmw_client_t * rmw_client = static_cast(allocator->zero_allocate( + // Create the rmw_client. + rmw_client_t * rmw_client = + static_cast(allocator->zero_allocate( 1, sizeof(rmw_client_t), allocator->state)); @@ -1421,226 +1426,46 @@ rmw_create_client( rmw_client, "failed to allocate memory for the client", return nullptr); - auto free_rmw_client = rcpputils::make_scope_exit( [rmw_client, allocator]() { allocator->deallocate(rmw_client, allocator->state); }); - auto client_data = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::rmw_client_data_t), allocator->state)); + auto node_data = context_impl->get_node_data(node); RMW_CHECK_FOR_NULL_WITH_MSG( - client_data, - "failed to allocate memory for client data", + node_data, + "NodeData not found.", return nullptr); - auto free_client_data = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data, allocator->state); - }); - RMW_TRY_PLACEMENT_NEW( - client_data, - client_data, - return nullptr, - rmw_zenoh_cpp::rmw_client_data_t, - ); - auto destruct_client_data = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->~rmw_client_data_t(), - rmw_zenoh_cpp::rmw_client_data_t); - }); - - // Adapt any 'best available' QoS options - client_data->adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_zenoh_cpp::QoS::get().best_available_qos( - nullptr, nullptr, &client_data->adapted_qos_profile, nullptr); - if (RMW_RET_OK != ret) { - RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); - return nullptr; - } - - // Obtain the type support - const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); - if (type_support == nullptr) { - // error was already set by find_service_type_support + if (!node_data->create_client_data( + rmw_client, + context_impl->session(), + context_impl->get_next_entity_id(), + service_name, + type_support, + qos_profile)) + { + // Error already handled. return nullptr; } - auto service_members = static_cast(type_support->data); - auto request_members = static_cast( - service_members->request_members_->data); - auto response_members = static_cast( - service_members->response_members_->data); - - client_data->context = node->context; - client_data->typesupport_identifier = type_support->typesupport_identifier; - client_data->type_hash = type_support->get_type_hash_func(type_support); - client_data->request_type_support_impl = request_members; - client_data->response_type_support_impl = response_members; - - // Request type support - client_data->request_type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::RequestTypeSupport), allocator->state)); - - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data->request_type_support, - "Failed to allocate rmw_zenoh_cpp::RequestTypeSupport", - return nullptr); - auto free_request_type_support = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data->request_type_support, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - client_data->request_type_support, - client_data->request_type_support, - return nullptr, - rmw_zenoh_cpp::RequestTypeSupport, service_members); - auto destruct_request_type_support = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->request_type_support->~RequestTypeSupport(), - rmw_zenoh_cpp::RequestTypeSupport); - }); - - // Response type support - client_data->response_type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::ResponseTypeSupport), allocator->state)); - - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data->response_type_support, - "Failed to allocate rmw_zenoh_cpp::ResponseTypeSupport", - return nullptr); - auto free_response_type_support = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data->response_type_support, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - client_data->response_type_support, - client_data->response_type_support, - return nullptr, - rmw_zenoh_cpp::ResponseTypeSupport, service_members); - auto destruct_response_type_support = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->response_type_support->~ResponseTypeSupport(), - rmw_zenoh_cpp::ResponseTypeSupport); - }); - - // Populate the rmw_client. + // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased + // Client handle will be returned in the rmw_clients_t in rmw_wait + // from which we cannot obtain ClientData. + rmw_client->data = static_cast(node_data->get_client_data(rmw_client).get()); rmw_client->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; rmw_client->service_name = rcutils_strdup(service_name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( rmw_client->service_name, - "failed to allocate client name", + "failed to allocate service name", return nullptr); auto free_service_name = rcpputils::make_scope_exit( [rmw_client, allocator]() { allocator->deallocate(const_cast(rmw_client->service_name), allocator->state); }); - // Note: Service request/response types will contain a suffix Request_ or Response_. - // We remove the suffix when appending the type to the liveliness tokens for - // better reusability within GraphCache. - std::string service_type = client_data->request_type_support->get_name(); - size_t suffix_substring_position = service_type.find("Request_"); - if (std::string::npos != suffix_substring_position) { - service_type = service_type.substr(0, suffix_substring_position); - } else { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unexpected type %s for client %s. Report this bug", - service_type.c_str(), rmw_client->service_name); - return nullptr; - } - - // Convert the type hash to a string so that it can be included in - // the keyexpr. - char * type_hash_c_str = nullptr; - rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( - client_data->type_hash, - *allocator, - &type_hash_c_str); - if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { - RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); - return nullptr; - } - auto free_type_hash_c_str = rcpputils::make_scope_exit( - [&allocator, &type_hash_c_str]() { - allocator->deallocate(type_hash_c_str, allocator->state); - }); - - z_session_t session = context_impl->session(); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_FOR_NULL_WITH_MSG( - node_data, - "NodeData not found.", - return nullptr); - client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(session), - std::to_string(node_data->id()), - std::to_string( - context_impl->get_next_entity_id()), - rmw_zenoh_cpp::liveliness::EntityType::Client, - rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, - rmw_zenoh_cpp::liveliness::TopicInfo{ - node->context->actual_domain_id, - rmw_client->service_name, - std::move(service_type), - type_hash_c_str, - client_data->adapted_qos_profile} - ); - if (client_data->entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the client %s.", - rmw_client->service_name); - return nullptr; - } - - client_data->keyexpr = z_keyexpr_new(client_data->entity->topic_info()->topic_keyexpr_.c_str()); - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [client_data]() { - z_keyexpr_drop(z_move(client_data->keyexpr)); - }); - if (!z_keyexpr_check(&client_data->keyexpr)) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - - client_data->token = zc_liveliness_declare_token( - session, - z_keyexpr(client_data->entity->liveliness_keyexpr().c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [client_data]() { - if (client_data != nullptr) { - z_drop(z_move(client_data->token)); - } - }); - if (!z_check(client_data->token)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the client."); - return nullptr; - } - - rmw_client->data = client_data; - - free_token.cancel(); free_rmw_client.cancel(); - free_client_data.cancel(); - destruct_request_type_support.cancel(); - free_request_type_support.cancel(); - destruct_response_type_support.cancel(); - free_response_type_support.cancel(); - destruct_client_data.cancel(); free_service_name.cancel(); - free_ros_keyexpr.cancel(); return rmw_client; } @@ -1650,10 +1475,12 @@ rmw_create_client( rmw_ret_t rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) { - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -1664,36 +1491,15 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) client->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - rcutils_allocator_t * allocator = &node->context->options.allocator; - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data, - "client implementation pointer is null.", - return RMW_RET_INVALID_ARGUMENT); - - // CLEANUP =================================================================== - z_drop(z_move(client_data->keyexpr)); - zc_liveliness_undeclare_token(z_move(client_data->token)); - - RMW_TRY_DESTRUCTOR( - client_data->request_type_support->~RequestTypeSupport(), rmw_zenoh_cpp::RequestTypeSupport, ); - allocator->deallocate(client_data->request_type_support, allocator->state); - - RMW_TRY_DESTRUCTOR( - client_data->response_type_support->~ResponseTypeSupport(), rmw_zenoh_cpp::ResponseTypeSupport, - ); - allocator->deallocate(client_data->response_type_support, allocator->state); - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - if (!client_data->shutdown_and_query_in_flight()) { - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - allocator->deallocate(client->data, allocator->state); + auto node_data = context_impl->get_node_data(node); + if (node_data == nullptr) { + return RMW_RET_INVALID_ARGUMENT; } + // Remove the ClientData from NodeData. + node_data->delete_client_data(client); + + rcutils_allocator_t * allocator = &node->context->options.allocator; allocator->deallocate(const_cast(client->service_name), allocator->state); allocator->deallocate(client, allocator->state); @@ -1710,115 +1516,23 @@ rmw_send_request( { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_FOR_NULL_WITH_MSG( + client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( client_data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); - if (client_data->is_shutdown()) { - return RMW_RET_ERROR; - } - - rmw_context_impl_s * context_impl = static_cast( - client_data->context->impl); - - // Serialize data - - rcutils_allocator_t * allocator = &(client_data->context->options.allocator); - - size_t max_data_length = ( - client_data->request_type_support->get_estimated_serialized_size( - ros_request, client_data->request_type_support_impl)); - - // Init serialized message byte array - char * request_bytes = static_cast(allocator->allocate( - max_data_length, - allocator->state)); - if (!request_bytes) { - RMW_SET_ERROR_MSG("failed allocate request message bytes"); - return RMW_RET_ERROR; - } - auto free_request_bytes = rcpputils::make_scope_exit( - [request_bytes, allocator]() { - allocator->deallocate(request_bytes, allocator->state); - }); - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(request_bytes, max_data_length); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr ser(fastbuffer); - if (!client_data->request_type_support->serialize_ros_message( - ros_request, - ser.get_cdr(), - client_data->request_type_support_impl)) - { - return RMW_RET_ERROR; - } - - size_t data_length = ser.get_serialized_data_length(); - - *sequence_id = client_data->get_next_sequence_number(); - - // Send request - z_get_options_t opts = z_get_options_default(); - - z_owned_bytes_map_t map = rmw_zenoh_cpp::create_map_and_set_sequence_num( - *sequence_id, - [client_data](z_owned_bytes_map_t * map, const char * key) - { - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - client_data->entity->copy_gid(local_gid); - z_bytes_t gid_bytes; - gid_bytes.len = RMW_GID_STORAGE_SIZE; - gid_bytes.start = local_gid; - z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); - }); - if (!z_check(map)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - auto free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - client_data->increment_in_flight_callbacks(); - - opts.attachment = z_bytes_map_as_attachment(&map); - - opts.target = Z_QUERY_TARGET_ALL_COMPLETE; - // The default timeout for a z_get query is 10 seconds and if a response is not received within - // this window, the queryable will return an invalid reply. However, it is common for actions, - // which are implemented using services, to take an extended duration to complete. Hence, we set - // the timeout_ms to the largest supported value to account for most realistic scenarios. - opts.timeout_ms = std::numeric_limits::max(); - // Latest consolidation guarantees unicity of replies for the same key expression, - // which optimizes bandwidth. The default is "None", which imples replies may come in any order - // and any number. - opts.consolidation = z_query_consolidation_latest(); - opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; - z_owned_closure_reply_t zn_closure_reply = - z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data); - z_get( - context_impl->session(), - z_loan(client_data->keyexpr), "", - z_move(zn_closure_reply), - &opts); - - return RMW_RET_OK; + return client_data->send_request(ros_request, sequence_id); } //============================================================================== @@ -1830,12 +1544,10 @@ rmw_take_response( void * ros_response, bool * taken) { + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); *taken = false; - RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, @@ -1843,69 +1555,13 @@ rmw_take_response( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_FOR_NULL_WITH_MSG( client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); - std::unique_ptr latest_reply = client_data->pop_next_reply(); - if (latest_reply == nullptr) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. - return RMW_RET_OK; - } - - std::optional sample = latest_reply->get_sample(); - if (!sample) { - RMW_SET_ERROR_MSG("invalid reply sample"); - return RMW_RET_ERROR; - } - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(sample->payload.start)), - sample->payload.len); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr deser(fastbuffer); - if (!client_data->response_type_support->deserialize_ros_message( - deser.get_cdr(), - ros_response, - client_data->response_type_support_impl)) - { - RMW_SET_ERROR_MSG("could not deserialize ROS response"); - return RMW_RET_ERROR; - } - - // Fill in the request_header - - request_header->request_id.sequence_number = - rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "sequence_number"); - if (request_header->request_id.sequence_number < 0) { - RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); - return RMW_RET_ERROR; - } - - request_header->source_timestamp = - rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "source_timestamp"); - if (request_header->source_timestamp < 0) { - RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); - return RMW_RET_ERROR; - } - - if (!rmw_zenoh_cpp::get_gid_from_attachment( - &sample->attachment, - request_header->request_id.writer_guid)) - { - RMW_SET_ERROR_MSG("Could not get client gid from attachment"); - return RMW_RET_ERROR; - } - - request_header->received_timestamp = latest_reply->get_received_timestamp(); - - *taken = true; - - return RMW_RET_OK; + return client_data->take_response(request_header, ros_response, taken); } //============================================================================== @@ -1922,12 +1578,11 @@ rmw_client_request_publisher_get_actual_qos( rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); - *qos = client_data->adapted_qos_profile; + *qos = client_data->topic_info().qos_; return RMW_RET_OK; } @@ -1951,7 +1606,6 @@ rmw_create_service( const char * service_name, const rmw_qos_profile_t * qos_profile) { - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, @@ -2004,7 +1658,6 @@ rmw_create_service( return nullptr; } - // SERVICE DATA ============================================================== rcutils_allocator_t * allocator = &node->context->options.allocator; if (!rcutils_allocator_is_valid(allocator)) { RMW_SET_ERROR_MSG("allocator is invalid."); @@ -2045,7 +1698,7 @@ rmw_create_service( } // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased - // Service handle will be returned in the rmw_service_t in rmw_wait + // Service handle will be returned in the rmw_services_t in rmw_wait // from which we cannot obtain ServiceData. rmw_service->data = static_cast(node_data->get_service_data(rmw_service).get()); rmw_service->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; @@ -2436,8 +2089,8 @@ check_and_attach_condition( if (clients) { for (size_t i = 0; i < clients->client_count; ++i) { - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(clients->clients[i]); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(clients->clients[i]); if (client_data == nullptr) { continue; } @@ -2597,8 +2250,8 @@ rmw_wait( if (clients) { for (size_t i = 0; i < clients->client_count; ++i) { - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(clients->clients[i]); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(clients->clients[i]); if (client_data == nullptr) { continue; } @@ -2810,12 +2463,12 @@ rmw_get_gid_for_client(const rmw_client_t * client, rmw_gid_t * gid) { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(gid, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); + RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - client_data->entity->copy_gid(gid->data); + client_data->copy_gid(gid->data); return RMW_RET_OK; } @@ -2861,29 +2514,16 @@ rmw_service_server_is_available( RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(is_available, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); if (client_data == nullptr) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( "Unable to retreive client_data from client for service %s", client->service_name); return RMW_RET_INVALID_ARGUMENT; } - std::string service_type = client_data->request_type_support->get_name(); - size_t suffix_substring_position = service_type.find("Request_"); - if (std::string::npos != suffix_substring_position) { - service_type = service_type.substr(0, suffix_substring_position); - } else { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unexpected type %s for client %s. Report this bug", - service_type.c_str(), client->service_name); - return RMW_RET_INVALID_ARGUMENT; - } - return node->context->impl->graph_cache()->service_server_is_available( - client->service_name, service_type.c_str(), is_available); + client_data->topic_info(), is_available); } //============================================================================== @@ -2956,11 +2596,11 @@ rmw_client_set_on_new_response_callback( const void * user_data) { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); - client_data->data_callback_mgr.set_callback( - user_data, std::move(callback)); + client_data->set_on_new_response_callback( + std::move(callback), user_data); return RMW_RET_OK; } } // extern "C" From a7a173ce56c55c0143484f93811fecdf6a381f80 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Sat, 12 Oct 2024 08:07:19 +0800 Subject: [PATCH 02/10] Handle queries in flight Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 57 +++++++++++--------- rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 7 ++- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 12 ++++- 3 files changed, 49 insertions(+), 27 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 4a86b6ea..4a8b5387 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -40,6 +40,11 @@ namespace rmw_zenoh_cpp { +// A static mutex whose lifetime will extend beyond that of the internal mutex_. +// This is needed as the decrement_queries_in_flight callback that me be executed +// after ClientData is deallocated will try to lock an invalid mutex and thus lead +// to UB. This only happens in rclcpp_action/test_client. +static std::mutex num_in_flight_mutex; ///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * data) { @@ -54,6 +59,12 @@ void client_data_handler(z_owned_reply_t * reply, void * data) // See the comment about the "num_in_flight" class variable in the ClientData class for // why we need to do this. + // Note: This called could lead to UB since the ClientData * could have been deallocated + // if the query reply is received after NodeData was destructed. ie even though we do not + // erase the ClientData from NodeData's clients_ map when rmw_destroy_client is called, + // the clients_ maps would get erased when the NodeData's destructor is invoked. + // This is an edge case that should be resolved once we switch to zenoh-cpp and capture + // weaK_ptr in this callback instead. if (client_data->is_shutdown()) { return; } @@ -100,17 +111,13 @@ void client_data_drop(void * data) // See the comment about the "num_in_flight" class variable in the ClientData class for // why we need to do this. - bool queries_in_flight = false; - bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); - - if (is_shutdown) { - if (!queries_in_flight) { - // TODO(Yadunund): Do we still need this? - // RMW_TRY_DESTRUCTOR(~ClientData(), ClientData, ); - // context->options.allocator.deallocate( - // client_data, context->options.allocator.state); - } - } + // Note: This called could lead to UB since the ClientData * could have been deallocated + // if the query reply is received after NodeData was destructed. ie even though we do not + // erase the ClientData from NodeData's clients_ map when rmw_destroy_client is called, + // the clients_ maps would get erased when the NodeData's destructor is invoked. + // This is an edge case that should be resolved once we switch to zenoh-cpp and capture + // weaK_ptr in this callback instead. + client_data->decrement_queries_in_flight(); } ///============================================================================= @@ -450,7 +457,10 @@ rmw_ret_t ClientData::send_request( // See the comment about the "num_in_flight" class variable in the ClientData class for // why we need to do this. - num_in_flight_++; + { + std::lock_guard lock(num_in_flight_mutex); + num_in_flight_++; + } opts.attachment = z_bytes_map_as_attachment(&map); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; // The default timeout for a z_get query is 10 seconds and if a response is not received within @@ -523,11 +533,10 @@ bool ClientData::detach_condition_and_queue_is_empty() //============================================================================== // See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure // for the use of this method. -bool ClientData::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight) +void ClientData::decrement_queries_in_flight() { - std::lock_guard lock(mutex_); - queries_in_flight = --num_in_flight_ > 0; - return is_shutdown_; + std::lock_guard lock(num_in_flight_mutex); + --num_in_flight_; } ///============================================================================= @@ -539,15 +548,6 @@ rmw_ret_t ClientData::shutdown() return ret; } - if (num_in_flight_ > 0) { - // TODO(Yadunund): Check if we need to do something. - RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Client %s is shutting down while there are still queries in flight.", - entity_->topic_info().value().name_.c_str() - ); - } - // Unregister this node from the ROS graph. if (zc_liveliness_token_check(&token_)) { zc_liveliness_undeclare_token(z_move(token_)); @@ -560,6 +560,13 @@ rmw_ret_t ClientData::shutdown() return RMW_RET_OK; } +///============================================================================= +bool ClientData::query_in_flight() +{ + std::lock_guard lock(mutex_); + return num_in_flight_ > 0; +} + ///============================================================================= bool ClientData::is_shutdown() const { diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index e592fc40..cd76686f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -90,11 +90,16 @@ class ClientData final bool detach_condition_and_queue_is_empty(); // See the comment for "num_in_flight" below on the use of this method. - bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); + void decrement_queries_in_flight(); // Shutdown this ClientData. rmw_ret_t shutdown(); + // TODO(Yadunund): Remove this API once we are able to capture weak_ptr + // in the client closures to avoid the issue with queries in flight as described + // below. + bool query_in_flight(); + // Check if this ClientData is shutdown. bool is_shutdown() const; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 1b853a41..5f562d20 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -383,7 +383,17 @@ ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client) void NodeData::delete_client_data(const rmw_client_t * const client) { std::lock_guard lock_guard(mutex_); - clients_.erase(client); + auto client_it = clients_.find(client); + if (client_it == clients_.end()) { + return; + } + // We shutdown the client first and only if that is successful, we deallocate + // the ClientData. This is to keep the ClientData alive in cases where + // rmw_destroy_client is invoked while there are still queries in flight. + client_it->second->shutdown(); + if (!client_it->second->query_in_flight()) { + clients_.erase(client); + } } ///============================================================================= From 5b17de6ffe760745f6c248769970d88b26705efc Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 17 Oct 2024 02:35:57 +0800 Subject: [PATCH 03/10] Address some feedback Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 5 +++-- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 4a8b5387..5e128834 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -27,10 +27,11 @@ #include "attachment_helpers.hpp" #include "cdr.hpp" -#include "rmw_context_impl_s.hpp" -#include "message_type_support.hpp" +#include "liveliness_utils.hpp" #include "logging_macros.hpp" +#include "message_type_support.hpp" #include "qos.hpp" +#include "rmw_context_impl_s.hpp" #include "rcpputils/scope_exit.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 51dc4fb3..cf4c4cdd 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1403,7 +1403,7 @@ rmw_create_client( return nullptr; } - // Get the RMW the type support. + // Get the service type support. const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); if (type_support == nullptr) { // error was already set by find_service_type_support @@ -1457,7 +1457,7 @@ rmw_create_client( rmw_client->service_name = rcutils_strdup(service_name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( rmw_client->service_name, - "failed to allocate service name", + "failed to allocate service client name", return nullptr); auto free_service_name = rcpputils::make_scope_exit( [rmw_client, allocator]() { From 5e4a54ff422bb90e858700e01aa783017a97cf49 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 17 Oct 2024 02:46:34 +0800 Subject: [PATCH 04/10] Fix service client type check Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 5e128834..74421380 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -155,8 +155,8 @@ std::shared_ptr ClientData::make( // Note: Service request/response types will contain a suffix Request_ or Response_. // We remove the suffix when appending the type to the liveliness tokens for // better reusability within GraphCache. - std::string service_type = response_type_support->get_name(); - size_t suffix_substring_position = service_type.find("Response_"); + std::string service_type = request_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Request_"); if (std::string::npos != suffix_substring_position) { service_type = service_type.substr(0, suffix_substring_position); } else { From aedffb8382c10ba2959b54fb9f3eb07f635bde96 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Sat, 19 Oct 2024 07:02:49 +0800 Subject: [PATCH 05/10] fix tracking of num in flight without UB Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 80 +++++++++++++++----- rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 21 +---- 2 files changed, 61 insertions(+), 40 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 74421380..bd9a36f2 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include "attachment_helpers.hpp" @@ -41,11 +42,30 @@ namespace rmw_zenoh_cpp { -// A static mutex whose lifetime will extend beyond that of the internal mutex_. -// This is needed as the decrement_queries_in_flight callback that me be executed -// after ClientData is deallocated will try to lock an invalid mutex and thus lead -// to UB. This only happens in rclcpp_action/test_client. +// rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no +// way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an +// rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the +// query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. +// The next 2 variables are used to avoid that situation. Any time a query is initiated via +// rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the +// query reply, num_in_flight_ is decremented. +// When shutdown() is called, is_shutdown_ is set to true. +// If num_in_flight_ is 0, the data associated with this structure is freed. +// If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. +// In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query +// callback, the query callback will free up the structure. +// +// There is one case which is not handled by this, which has to do with timeouts. The query +// timeout is currently set to essentially infinite. Thus, if a query is in-flight but never +// returns, the memory in this structure will never be freed. There isn't much we can do about +// that at this time, but we may want to consider changing the timeout so that the memory can +// eventually be freed up. +// +// TODO(Yadunund): Remove these variables once we switch to zenoh-cpp and can capture +// weak_ptr in zenoh callbacks. static std::mutex num_in_flight_mutex; +static std::unordered_map num_in_flight_map = {}; +static std::unordered_set deleted_clients = {}; ///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * data) { @@ -53,7 +73,18 @@ void client_data_handler(z_owned_reply_t * reply, void * data) if (client_data == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", - "Unable to obtain client_data_t " + "Unable to obtain client_data_t from data in client_data_handler." + ); + return; + } + + // client_data could be a dangling pointer if this callback was triggered after + // the last reference to ClientData::SharedPtr was dropped from NodeData. + std::lock_guard lock(num_in_flight_mutex); + if (deleted_clients.count(client_data) > 0) { + RMW_ZENOH_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "client_data_handler triggered for ClientData that has been deleted. Ignoring..." ); return; } @@ -118,7 +149,10 @@ void client_data_drop(void * data) // the clients_ maps would get erased when the NodeData's destructor is invoked. // This is an edge case that should be resolved once we switch to zenoh-cpp and capture // weaK_ptr in this callback instead. - client_data->decrement_queries_in_flight(); + auto num_in_flight_it = num_in_flight_map.find(client_data); + if (num_in_flight_it != num_in_flight_map.end()) { + --num_in_flight_it->second; + } } ///============================================================================= @@ -244,6 +278,10 @@ std::shared_ptr ClientData::make( free_ros_keyexpr.cancel(); free_token.cancel(); + // Erase from deleted_clients set if the memory address for the client data is reused. + num_in_flight_map.erase(client_data.get()); + deleted_clients.erase(client_data.get()); + return client_data; } @@ -263,7 +301,6 @@ ClientData::ClientData( response_type_support_(std::move(response_type_support)), wait_set_data_(nullptr), sequence_number_(1), - num_in_flight_(0), is_shutdown_(false) { // Do nothing. @@ -460,7 +497,12 @@ rmw_ret_t ClientData::send_request( // why we need to do this. { std::lock_guard lock(num_in_flight_mutex); - num_in_flight_++; + auto num_in_flight_it = num_in_flight_map.find(this); + if (num_in_flight_it == num_in_flight_map.end()) { + num_in_flight_map[this] = 0; + num_in_flight_it = num_in_flight_map.find(this); + } + num_in_flight_it->second++; } opts.attachment = z_bytes_map_as_attachment(&map); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; @@ -498,6 +540,9 @@ ClientData::~ClientData() entity_->topic_info().value().name_.c_str() ); } + std::lock_guard lock(num_in_flight_mutex); + num_in_flight_map.erase(this); + deleted_clients.insert(this); } //============================================================================== @@ -531,15 +576,6 @@ bool ClientData::detach_condition_and_queue_is_empty() return reply_queue_.empty(); } -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure -// for the use of this method. -void ClientData::decrement_queries_in_flight() -{ - std::lock_guard lock(num_in_flight_mutex); - --num_in_flight_; -} - ///============================================================================= rmw_ret_t ClientData::shutdown() { @@ -562,10 +598,14 @@ rmw_ret_t ClientData::shutdown() } ///============================================================================= -bool ClientData::query_in_flight() +bool ClientData::query_in_flight() const { - std::lock_guard lock(mutex_); - return num_in_flight_ > 0; + std::lock_guard lock(num_in_flight_mutex); + auto query_in_flight_it = num_in_flight_map.find(this); + if (query_in_flight_it != num_in_flight_map.end()) { + return query_in_flight_it->second > 0; + } + return false; } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index cd76686f..f4fb0efa 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -98,7 +98,7 @@ class ClientData final // TODO(Yadunund): Remove this API once we are able to capture weak_ptr // in the client closures to avoid the issue with queries in flight as described // below. - bool query_in_flight(); + bool query_in_flight() const; // Check if this ClientData is shutdown. bool is_shutdown() const; @@ -139,25 +139,6 @@ class ClientData final DataCallbackManager data_callback_mgr_; // Sequence number for queries. size_t sequence_number_; - // rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no - // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an - // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the - // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. - // The next 2 variables are used to avoid that situation. Any time a query is initiated via - // rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the - // query reply, num_in_flight_ is decremented. - // When shutdown() is called, is_shutdown_ is set to true. - // If num_in_flight_ is 0, the data associated with this structure is freed. - // If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. - // In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query - // callback, the query callback will free up the structure. - // - // There is one case which is not handled by this, which has to do with timeouts. The query - // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never - // returns, the memory in this structure will never be freed. There isn't much we can do about - // that at this time, but we may want to consider changing the timeout so that the memory can - // eventually be freed up. - size_t num_in_flight_; // Shutdown flag. bool is_shutdown_; }; From 00a9424ace886573993b27ba4104ddcef24c3e4c Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 31 Oct 2024 11:51:43 -0400 Subject: [PATCH 06/10] Additional fixes for yadu/raii-client. (#304) * Additional fixes for yadu/raii-client. Signed-off-by: Chris Lalancette * Apply suggestions from code review Co-authored-by: yadunund Signed-off-by: Chris Lalancette * Fixups. Signed-off-by: Chris Lalancette --------- Signed-off-by: Chris Lalancette Co-authored-by: yadunund --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 116 +++++++++++-------- rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 8 +- 2 files changed, 74 insertions(+), 50 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index bd9a36f2..668a2b46 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include "attachment_helpers.hpp" #include "cdr.hpp" @@ -46,14 +47,15 @@ namespace rmw_zenoh_cpp // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. -// The next 2 variables are used to avoid that situation. Any time a query is initiated via -// rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the -// query reply, num_in_flight_ is decremented. -// When shutdown() is called, is_shutdown_ is set to true. -// If num_in_flight_ is 0, the data associated with this structure is freed. -// If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. -// In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query -// callback, the query callback will free up the structure. +// The next 3 global variables are used to avoid that situation. Any time a query is initiated via +// rmw_send_request(), num_in_flight_ is incremented. When Zenoh calls the callback for the +// query drop, num_in_flight_map->second is decremented. +// When ClientData is destroyed, it checks to see if there are things in flight. If there are, +// it leaves this ClientData pointer both in the num_in_flight_map and the deleted_clients map. +// When the client_data_handler() is called on these destroyed objects, it knows that it cannot +// dereference the data anymore, and it gets out early. When client_data_drop() is called, it +// decrements num_in_flight_map->second, and if that drops to zero, drops the pointer address +// completely from deleted_clients. // // There is one case which is not handled by this, which has to do with timeouts. The query // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never @@ -66,6 +68,7 @@ namespace rmw_zenoh_cpp static std::mutex num_in_flight_mutex; static std::unordered_map num_in_flight_map = {}; static std::unordered_set deleted_clients = {}; + ///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * data) { @@ -78,8 +81,6 @@ void client_data_handler(z_owned_reply_t * reply, void * data) return; } - // client_data could be a dangling pointer if this callback was triggered after - // the last reference to ClientData::SharedPtr was dropped from NodeData. std::lock_guard lock(num_in_flight_mutex); if (deleted_clients.count(client_data) > 0) { RMW_ZENOH_LOG_INFO_NAMED( @@ -91,12 +92,6 @@ void client_data_handler(z_owned_reply_t * reply, void * data) // See the comment about the "num_in_flight" class variable in the ClientData class for // why we need to do this. - // Note: This called could lead to UB since the ClientData * could have been deallocated - // if the query reply is received after NodeData was destructed. ie even though we do not - // erase the ClientData from NodeData's clients_ map when rmw_destroy_client is called, - // the clients_ maps would get erased when the NodeData's destructor is invoked. - // This is an edge case that should be resolved once we switch to zenoh-cpp and capture - // weaK_ptr in this callback instead. if (client_data->is_shutdown()) { return; } @@ -143,15 +138,20 @@ void client_data_drop(void * data) // See the comment about the "num_in_flight" class variable in the ClientData class for // why we need to do this. - // Note: This called could lead to UB since the ClientData * could have been deallocated - // if the query reply is received after NodeData was destructed. ie even though we do not - // erase the ClientData from NodeData's clients_ map when rmw_destroy_client is called, - // the clients_ maps would get erased when the NodeData's destructor is invoked. - // This is an edge case that should be resolved once we switch to zenoh-cpp and capture - // weaK_ptr in this callback instead. + std::lock_guard lock(num_in_flight_mutex); auto num_in_flight_it = num_in_flight_map.find(client_data); - if (num_in_flight_it != num_in_flight_map.end()) { - --num_in_flight_it->second; + if (num_in_flight_it == num_in_flight_map.end()) { + // This should never happen + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to find object in num_in_flight_map. Report this bug." + ); + return; + } + + --num_in_flight_it->second; + if (num_in_flight_it->second == 0 && deleted_clients.count(client_data) > 0) { + deleted_clients.erase(client_data); } } @@ -183,8 +183,8 @@ std::shared_ptr ClientData::make( service_members->request_members_->data); auto response_members = static_cast( service_members->response_members_->data); - auto request_type_support = std::make_unique(service_members); - auto response_type_support = std::make_unique(service_members); + auto request_type_support = std::make_shared(service_members); + auto response_type_support = std::make_shared(service_members); // Note: Service request/response types will contain a suffix Request_ or Response_. // We remove the suffix when appending the type to the liveliness tokens for @@ -238,15 +238,21 @@ std::shared_ptr ClientData::make( return nullptr; } - auto client_data = std::shared_ptr( - new ClientData{ - node, - std::move(entity), - request_members, - response_members, - std::move(request_type_support), - std::move(response_type_support) - }); + std::lock_guard lock(num_in_flight_mutex); + std::vector> duplicate_pointers; + std::shared_ptr client_data; + do { + client_data = std::shared_ptr( + new ClientData{ + node, + entity, + request_members, + response_members, + request_type_support, + response_type_support + }); + duplicate_pointers.push_back(client_data); + } while (deleted_clients.count(client_data.get()) > 0); client_data->keyexpr_ = z_keyexpr_new(client_data->entity_->topic_info().value().topic_keyexpr_.c_str()); @@ -278,9 +284,7 @@ std::shared_ptr ClientData::make( free_ros_keyexpr.cancel(); free_token.cancel(); - // Erase from deleted_clients set if the memory address for the client data is reused. - num_in_flight_map.erase(client_data.get()); - deleted_clients.erase(client_data.get()); + num_in_flight_map[client_data.get()] = 0; return client_data; } @@ -291,14 +295,14 @@ ClientData::ClientData( std::shared_ptr entity, const void * request_type_support_impl, const void * response_type_support_impl, - std::unique_ptr request_type_support, - std::unique_ptr response_type_support) + std::shared_ptr request_type_support, + std::shared_ptr response_type_support) : rmw_node_(rmw_node), entity_(std::move(entity)), request_type_support_impl_(request_type_support_impl), response_type_support_impl_(response_type_support_impl), - request_type_support_(std::move(request_type_support)), - response_type_support_(std::move(response_type_support)), + request_type_support_(request_type_support), + response_type_support_(response_type_support), wait_set_data_(nullptr), sequence_number_(1), is_shutdown_(false) @@ -499,8 +503,9 @@ rmw_ret_t ClientData::send_request( std::lock_guard lock(num_in_flight_mutex); auto num_in_flight_it = num_in_flight_map.find(this); if (num_in_flight_it == num_in_flight_map.end()) { - num_in_flight_map[this] = 0; - num_in_flight_it = num_in_flight_map.find(this); + // This should never happen + RMW_SET_ERROR_MSG("failed to find object in num_in_flight_map"); + return RMW_RET_ERROR; } num_in_flight_it->second++; } @@ -540,9 +545,28 @@ ClientData::~ClientData() entity_->topic_info().value().name_.c_str() ); } + std::lock_guard lock(num_in_flight_mutex); - num_in_flight_map.erase(this); - deleted_clients.insert(this); + auto num_in_flight_it = num_in_flight_map.find(this); + if (num_in_flight_it != num_in_flight_map.end()) { + if (num_in_flight_it->second == 0) { + // If there is nothing in flight, we can remove this from the map + // with no further considerations. + num_in_flight_map.erase(this); + } else { + // Since there is still something in flight, we need to just add + // it to the deleted_clients; it will be deleted when the last + // outstanding query finishes. + deleted_clients.insert(this); + } + } else { + // This should never happen + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Error finding client /%s in num_in_flight_map. Report this bug.", + entity_->topic_info().value().name_.c_str() + ); + } } //============================================================================== diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index f4fb0efa..0eb95a61 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -113,8 +113,8 @@ class ClientData final std::shared_ptr entity, const void * request_type_support_impl, const void * response_type_support_impl, - std::unique_ptr request_type_support, - std::unique_ptr response_type_support); + std::shared_ptr request_type_support, + std::shared_ptr response_type_support); // Internal mutex. mutable std::mutex mutex_; @@ -129,8 +129,8 @@ class ClientData final // Type support fields. const void * request_type_support_impl_; const void * response_type_support_impl_; - std::unique_ptr request_type_support_; - std::unique_ptr response_type_support_; + std::shared_ptr request_type_support_; + std::shared_ptr response_type_support_; // Deque to store the replies in the order they arrive. std::deque> reply_queue_; // Wait set data. From 5873d703c14dd28face5a53a7f0099afb9772e83 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Sun, 3 Nov 2024 21:57:29 -0800 Subject: [PATCH 07/10] Fix -Wnon-pod-varargs Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 668a2b46..5e11872c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -234,7 +234,7 @@ std::shared_ptr ClientData::make( RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to generate keyexpr for liveliness token for the client %s.", - service_name); + service_name.c_str()); return nullptr; } From 8a4ea55369d83ced12551feb6045ebd81a3f80ca Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 8 Nov 2024 15:23:07 +0000 Subject: [PATCH 08/10] Make PublisherData::copy_gid have the same signatures as others. This just makes it more consistent with the rest of the APIs. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp | 4 ++-- rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp | 2 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 9b2777c8..b2ea8235 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -421,10 +421,10 @@ liveliness::TopicInfo PublisherData::topic_info() const } ///============================================================================= -void PublisherData::copy_gid(rmw_gid_t * gid) const +void PublisherData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const { std::lock_guard lock(mutex_); - entity_->copy_gid(gid->data); + entity_->copy_gid(out_gid); } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 2a5f9240..7b124cc3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -68,7 +68,7 @@ class PublisherData final liveliness::TopicInfo topic_info() const; // Copy the GID of this PublisherData into an rmw_gid_t. - void copy_gid(rmw_gid_t * gid) const; + void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; // Returns true if liveliness token is still valid. bool liveliness_is_valid() const; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index cf4c4cdd..eebe1c4b 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2451,7 +2451,7 @@ rmw_get_gid_for_publisher(const rmw_publisher_t * publisher, rmw_gid_t * gid) RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - pub_data->copy_gid(gid); + pub_data->copy_gid(gid->data); return RMW_RET_OK; } From bb699414e132a28ef04623411dbeb325b3832360 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Tue, 12 Nov 2024 05:53:06 +0800 Subject: [PATCH 09/10] Add api to init client Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 68 ++++++++++++-------- rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 7 +- 2 files changed, 46 insertions(+), 29 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 5e11872c..974458ca 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -254,36 +254,11 @@ std::shared_ptr ClientData::make( duplicate_pointers.push_back(client_data); } while (deleted_clients.count(client_data.get()) > 0); - client_data->keyexpr_ = - z_keyexpr_new(client_data->entity_->topic_info().value().topic_keyexpr_.c_str()); - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [client_data]() { - z_drop(z_move(client_data->keyexpr_)); - }); - if (!z_check(z_loan(client_data->keyexpr_))) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - - client_data->token_ = zc_liveliness_declare_token( - session, - z_keyexpr(client_data->entity_->liveliness_keyexpr().c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [client_data]() { - z_drop(z_move(client_data->token_)); - }); - if (!z_check(client_data->token_)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the client."); + if (!client_data->init(session)) { + // init() already set the error. return nullptr; } - free_ros_keyexpr.cancel(); - free_token.cancel(); - num_in_flight_map[client_data.get()] = 0; return client_data; @@ -310,6 +285,42 @@ ClientData::ClientData( // Do nothing. } +///============================================================================= +bool ClientData::init(z_session_t session) +{ + this->keyexpr_ = + z_keyexpr_new(this->entity_->topic_info().value().topic_keyexpr_.c_str()); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [this]() { + z_drop(z_move(this->keyexpr_)); + }); + if (!z_check(z_loan(this->keyexpr_))) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return false; + } + + this->token_ = zc_liveliness_declare_token( + session, + z_keyexpr(this->entity_->liveliness_keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [this]() { + z_drop(z_move(this->token_)); + }); + if (!z_check(this->token_)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the client."); + return false; + } + + free_ros_keyexpr.cancel(); + free_token.cancel(); + + return true; +} + ///============================================================================= liveliness::TopicInfo ClientData::topic_info() const { @@ -605,7 +616,7 @@ rmw_ret_t ClientData::shutdown() { rmw_ret_t ret = RMW_RET_OK; std::lock_guard lock(mutex_); - if (is_shutdown_) { + if (is_shutdown_ || !initialized_) { return ret; } @@ -618,6 +629,7 @@ rmw_ret_t ClientData::shutdown() } is_shutdown_ = true; + initialized_ = false; return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index 0eb95a61..4357dec2 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -42,7 +42,7 @@ namespace rmw_zenoh_cpp { ///============================================================================= -class ClientData final +class ClientData final : public std::enable_shared_from_this { public: // Make a shared_ptr of ClientData. @@ -116,6 +116,9 @@ class ClientData final std::shared_ptr request_type_support, std::shared_ptr response_type_support); + // Initialize the Zenoh objects for this entity. + bool init(z_session_t session); + // Internal mutex. mutable std::mutex mutex_; // The parent node. @@ -141,6 +144,8 @@ class ClientData final size_t sequence_number_; // Shutdown flag. bool is_shutdown_; + // Whether the object has ever successfully been initialized. + bool initialized_; }; using ClientDataPtr = std::shared_ptr; using ClientDataConstPtr = std::shared_ptr; From 821f4e9f9fbd6226a83b8cd6c93a9936c47d5639 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 12 Nov 2024 13:42:50 +0000 Subject: [PATCH 10/10] Vastly simplify handling of in-flight ClientData. We basically use the pointer as a key to lookup the shared_ptr, and then always use the shared_ptr. That ensures that even if the pointer was dropped from the node while the client callback is operating, it will still be valid and won't be destroyed until that method returns. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 167 ++++--------------- rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 5 - rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 9 +- 3 files changed, 39 insertions(+), 142 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 974458ca..8c851a38 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -25,7 +25,6 @@ #include #include #include -#include #include "attachment_helpers.hpp" #include "cdr.hpp" @@ -41,38 +40,17 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/impl/cpp/macros.hpp" -namespace rmw_zenoh_cpp +namespace { -// rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no -// way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an -// rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the -// query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. -// The next 3 global variables are used to avoid that situation. Any time a query is initiated via -// rmw_send_request(), num_in_flight_ is incremented. When Zenoh calls the callback for the -// query drop, num_in_flight_map->second is decremented. -// When ClientData is destroyed, it checks to see if there are things in flight. If there are, -// it leaves this ClientData pointer both in the num_in_flight_map and the deleted_clients map. -// When the client_data_handler() is called on these destroyed objects, it knows that it cannot -// dereference the data anymore, and it gets out early. When client_data_drop() is called, it -// decrements num_in_flight_map->second, and if that drops to zero, drops the pointer address -// completely from deleted_clients. -// -// There is one case which is not handled by this, which has to do with timeouts. The query -// timeout is currently set to essentially infinite. Thus, if a query is in-flight but never -// returns, the memory in this structure will never be freed. There isn't much we can do about -// that at this time, but we may want to consider changing the timeout so that the memory can -// eventually be freed up. -// -// TODO(Yadunund): Remove these variables once we switch to zenoh-cpp and can capture -// weak_ptr in zenoh callbacks. -static std::mutex num_in_flight_mutex; -static std::unordered_map num_in_flight_map = {}; -static std::unordered_set deleted_clients = {}; + +std::mutex client_data_ptr_to_shared_ptr_map_mutex; +std::unordered_map> client_data_ptr_to_shared_ptr_map; ///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * data) { - auto client_data = static_cast(data); + auto client_data = static_cast(data); if (client_data == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -81,18 +59,16 @@ void client_data_handler(z_owned_reply_t * reply, void * data) return; } - std::lock_guard lock(num_in_flight_mutex); - if (deleted_clients.count(client_data) > 0) { - RMW_ZENOH_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "client_data_handler triggered for ClientData that has been deleted. Ignoring..." - ); - return; + std::shared_ptr client_data_shared_ptr{nullptr}; + { + std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); + if (client_data_ptr_to_shared_ptr_map.count(client_data) == 0) { + return; + } + client_data_shared_ptr = client_data_ptr_to_shared_ptr_map[client_data]; } - // See the comment about the "num_in_flight" class variable in the ClientData class for - // why we need to do this. - if (client_data->is_shutdown()) { + if (client_data_shared_ptr->is_shutdown()) { return; } @@ -118,43 +94,17 @@ void client_data_handler(z_owned_reply_t * reply, void * data) std::chrono::nanoseconds::rep received_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); - client_data->add_new_reply(std::make_unique(reply, received_timestamp)); + client_data_shared_ptr->add_new_reply( + std::make_unique(reply, received_timestamp)); // Since we took ownership of the reply, null it out here *reply = z_reply_null(); } -///============================================================================= -void client_data_drop(void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t " - ); - return; - } - - // See the comment about the "num_in_flight" class variable in the ClientData class for - // why we need to do this. - std::lock_guard lock(num_in_flight_mutex); - auto num_in_flight_it = num_in_flight_map.find(client_data); - if (num_in_flight_it == num_in_flight_map.end()) { - // This should never happen - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to find object in num_in_flight_map. Report this bug." - ); - return; - } - - --num_in_flight_it->second; - if (num_in_flight_it->second == 0 && deleted_clients.count(client_data) > 0) { - deleted_clients.erase(client_data); - } -} +} // namespace +namespace rmw_zenoh_cpp +{ ///============================================================================= std::shared_ptr ClientData::make( z_session_t session, @@ -238,28 +188,23 @@ std::shared_ptr ClientData::make( return nullptr; } - std::lock_guard lock(num_in_flight_mutex); - std::vector> duplicate_pointers; - std::shared_ptr client_data; - do { - client_data = std::shared_ptr( - new ClientData{ - node, - entity, - request_members, - response_members, - request_type_support, - response_type_support - }); - duplicate_pointers.push_back(client_data); - } while (deleted_clients.count(client_data.get()) > 0); + std::shared_ptr client_data = std::shared_ptr( + new ClientData{ + node, + entity, + request_members, + response_members, + request_type_support, + response_type_support + }); if (!client_data->init(session)) { // init() already set the error. return nullptr; } - num_in_flight_map[client_data.get()] = 0; + std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); + client_data_ptr_to_shared_ptr_map.emplace(client_data.get(), client_data); return client_data; } @@ -508,18 +453,6 @@ rmw_ret_t ClientData::send_request( z_bytes_map_drop(z_move(map)); }); - // See the comment about the "num_in_flight" class variable in the ClientData class for - // why we need to do this. - { - std::lock_guard lock(num_in_flight_mutex); - auto num_in_flight_it = num_in_flight_map.find(this); - if (num_in_flight_it == num_in_flight_map.end()) { - // This should never happen - RMW_SET_ERROR_MSG("failed to find object in num_in_flight_map"); - return RMW_RET_ERROR; - } - num_in_flight_it->second++; - } opts.attachment = z_bytes_map_as_attachment(&map); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; // The default timeout for a z_get query is 10 seconds and if a response is not received within @@ -535,7 +468,7 @@ rmw_ret_t ClientData::send_request( // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, // capture shared_from_this() instead of this. z_owned_closure_reply_t zn_closure_reply = - z_closure(client_data_handler, client_data_drop, this); + z_closure(client_data_handler, nullptr, this); z_get( context_impl->session(), z_loan(keyexpr_), "", @@ -548,6 +481,11 @@ rmw_ret_t ClientData::send_request( ///============================================================================= ClientData::~ClientData() { + { + std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); + client_data_ptr_to_shared_ptr_map.erase(this); + } + const rmw_ret_t ret = this->shutdown(); if (ret != RMW_RET_OK) { RMW_ZENOH_LOG_ERROR_NAMED( @@ -556,28 +494,6 @@ ClientData::~ClientData() entity_->topic_info().value().name_.c_str() ); } - - std::lock_guard lock(num_in_flight_mutex); - auto num_in_flight_it = num_in_flight_map.find(this); - if (num_in_flight_it != num_in_flight_map.end()) { - if (num_in_flight_it->second == 0) { - // If there is nothing in flight, we can remove this from the map - // with no further considerations. - num_in_flight_map.erase(this); - } else { - // Since there is still something in flight, we need to just add - // it to the deleted_clients; it will be deleted when the last - // outstanding query finishes. - deleted_clients.insert(this); - } - } else { - // This should never happen - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Error finding client /%s in num_in_flight_map. Report this bug.", - entity_->topic_info().value().name_.c_str() - ); - } } //============================================================================== @@ -633,17 +549,6 @@ rmw_ret_t ClientData::shutdown() return RMW_RET_OK; } -///============================================================================= -bool ClientData::query_in_flight() const -{ - std::lock_guard lock(num_in_flight_mutex); - auto query_in_flight_it = num_in_flight_map.find(this); - if (query_in_flight_it != num_in_flight_map.end()) { - return query_in_flight_it->second > 0; - } - return false; -} - ///============================================================================= bool ClientData::is_shutdown() const { diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index 4357dec2..b4c56775 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -95,11 +95,6 @@ class ClientData final : public std::enable_shared_from_this // Shutdown this ClientData. rmw_ret_t shutdown(); - // TODO(Yadunund): Remove this API once we are able to capture weak_ptr - // in the client closures to avoid the issue with queries in flight as described - // below. - bool query_in_flight() const; - // Check if this ClientData is shutdown. bool is_shutdown() const; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 5f562d20..f3430280 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -387,13 +387,10 @@ void NodeData::delete_client_data(const rmw_client_t * const client) if (client_it == clients_.end()) { return; } - // We shutdown the client first and only if that is successful, we deallocate - // the ClientData. This is to keep the ClientData alive in cases where - // rmw_destroy_client is invoked while there are still queries in flight. + // Shutdown the client, then erase it. The code in rmw_client_data.cpp is careful about keeping + // it alive as long as necessary. client_it->second->shutdown(); - if (!client_it->second->query_in_flight()) { - clients_.erase(client); - } + clients_.erase(client); } ///=============================================================================