From 61dafb0f09e2d71382b8ef8c5d119c703dddaeac Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 10 Oct 2024 06:16:41 +0800 Subject: [PATCH] Make ClientData thread-safe Signed-off-by: Yadunund --- rmw_zenoh_cpp/CMakeLists.txt | 2 +- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 9 +- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 3 +- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 563 ++++++++++++++++++ rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 165 +++++ .../src/detail/rmw_context_impl_s.hpp | 2 - rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 221 ------- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 132 ---- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 79 +++ rmw_zenoh_cpp/src/detail/rmw_node_data.hpp | 18 + rmw_zenoh_cpp/src/rmw_event.cpp | 1 - rmw_zenoh_cpp/src/rmw_init.cpp | 1 - rmw_zenoh_cpp/src/rmw_zenoh.cpp | 517 +++------------- 13 files changed, 911 insertions(+), 802 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/rmw_client_data.cpp create mode 100644 rmw_zenoh_cpp/src/detail/rmw_client_data.hpp delete mode 100644 rmw_zenoh_cpp/src/detail/rmw_data_types.cpp delete mode 100644 rmw_zenoh_cpp/src/detail/rmw_data_types.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 1442541b..cb7fe64f 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -38,8 +38,8 @@ add_library(rmw_zenoh_cpp SHARED src/detail/logging.cpp src/detail/message_type_support.cpp src/detail/qos.cpp + src/detail/rmw_client_data.cpp src/detail/rmw_context_impl_s.cpp - src/detail/rmw_data_types.cpp src/detail/rmw_publisher_data.cpp src/detail/rmw_node_data.cpp src/detail/rmw_service_data.cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index f21de3d1..8f446692 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -36,7 +36,6 @@ #include "graph_cache.hpp" #include "logging_macros.hpp" -#include "rmw_data_types.hpp" namespace rmw_zenoh_cpp { @@ -1216,15 +1215,15 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( ///============================================================================= rmw_ret_t GraphCache::service_server_is_available( - const char * service_name, - const char * service_type, + const liveliness::TopicInfo & client_topic_info, bool * is_available) const { *is_available = false; std::lock_guard lock(graph_mutex_); - GraphNode::TopicMap::const_iterator service_it = graph_services_.find(service_name); + GraphNode::TopicMap::const_iterator service_it = graph_services_.find(client_topic_info.name_); if (service_it != graph_services_.end()) { - GraphNode::TopicTypeMap::const_iterator type_it = service_it->second.find(service_type); + GraphNode::TopicTypeMap::const_iterator type_it = + service_it->second.find(client_topic_info.type_); if (type_it != service_it->second.end()) { for (const auto & [_, topic_data] : type_it->second) { if (topic_data->subs_.size() > 0) { diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 4864d678..9385548e 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -172,8 +172,7 @@ class GraphCache final rmw_topic_endpoint_info_array_t * endpoints_info) const; rmw_ret_t service_server_is_available( - const char * service_name, - const char * service_type, + const liveliness::TopicInfo & client_topic_info, bool * is_available) const; /// Set a qos event callback for an entity from the current session. diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp new file mode 100644 index 00000000..83f5d503 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -0,0 +1,563 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_client_data.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "attachment_helpers.hpp" +#include "cdr.hpp" +#include "rmw_context_impl_s.hpp" +#include "message_type_support.hpp" +#include "logging_macros.hpp" +#include "qos.hpp" + +#include "rcpputils/scope_exit.hpp" + +#include "rmw/error_handling.h" +#include "rmw/get_topic_endpoint_info.h" +#include "rmw/impl/cpp/macros.hpp" + +namespace rmw_zenoh_cpp +{ +///============================================================================= +void client_data_handler(z_owned_reply_t * reply, void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t " + ); + return; + } + + // See the comment about the "num_in_flight" class variable in the ClientData class for + // why we need to do this. + if (client_data->is_shutdown()) { + return; + } + + if (!z_reply_check(reply)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_check returned False" + ); + return; + } + if (!z_reply_is_ok(reply)) { + z_value_t err = z_reply_err(reply); + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", + client_data->topic_info().topic_keyexpr_.c_str(), + (int)err.payload.len, + err.payload.start); + + return; + } + + client_data->add_new_reply(std::make_unique(reply)); + // Since we took ownership of the reply, null it out here + *reply = z_reply_null(); +} + +///============================================================================= +void client_data_drop(void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t " + ); + return; + } + + // See the comment about the "num_in_flight" class variable in the ClientData class for + // why we need to do this. + bool queries_in_flight = false; + bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); + + if (is_shutdown) { + if (!queries_in_flight) { + // TODO(Yadunund): Do we still need this? + // RMW_TRY_DESTRUCTOR(~ClientData(), ClientData, ); + // context->options.allocator.deallocate( + // client_data, context->options.allocator.state); + } + } +} + +///============================================================================= +std::shared_ptr ClientData::make( + z_session_t session, + const rmw_node_t * const node, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t service_id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile) +{ + // Adapt any 'best available' QoS options + rmw_qos_profile_t adapted_qos_profile = *qos_profile; + rmw_ret_t ret = QoS::get().best_available_qos( + nullptr, nullptr, &adapted_qos_profile, nullptr); + if (RMW_RET_OK != ret) { + RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + return nullptr; + } + + rcutils_allocator_t * allocator = &node->context->options.allocator; + + const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support); + auto service_members = static_cast(type_support->data); + auto request_members = static_cast( + service_members->request_members_->data); + auto response_members = static_cast( + service_members->response_members_->data); + auto request_type_support = std::make_unique(service_members); + auto response_type_support = std::make_unique(service_members); + + // Note: Service request/response types will contain a suffix Request_ or Response_. + // We remove the suffix when appending the type to the liveliness tokens for + // better reusability within GraphCache. + std::string service_type = response_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Response_"); + if (std::string::npos != suffix_substring_position) { + service_type = service_type.substr(0, suffix_substring_position); + } else { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unexpected type %s for client %s. Report this bug", + service_type.c_str(), service_name.c_str()); + return nullptr; + } + + // Convert the type hash to a string so that it can be included in the keyexpr. + char * type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( + type_hash, + *allocator, + &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { + RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); + return nullptr; + } + auto free_type_hash_c_str = rcpputils::make_scope_exit( + [&allocator, &type_hash_c_str]() { + allocator->deallocate(type_hash_c_str, allocator->state); + }); + + std::size_t domain_id = node_info.domain_id_; + auto entity = liveliness::Entity::make( + z_info_zid(session), + std::to_string(node_id), + std::to_string(service_id), + liveliness::EntityType::Client, + std::move(node_info), + liveliness::TopicInfo{ + std::move(domain_id), + service_name, + std::move(service_type), + type_hash_c_str, + std::move(adapted_qos_profile)} + ); + if (entity == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the client %s.", + service_name); + return nullptr; + } + + auto client_data = std::shared_ptr( + new ClientData{ + node, + std::move(entity), + request_members, + response_members, + std::move(request_type_support), + std::move(response_type_support) + }); + + client_data->keyexpr_ = + z_keyexpr_new(client_data->entity_->topic_info().value().topic_keyexpr_.c_str()); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [client_data]() { + z_drop(z_move(client_data->keyexpr_)); + }); + if (!z_check(z_loan(client_data->keyexpr_))) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return nullptr; + } + + client_data->token_ = zc_liveliness_declare_token( + session, + z_keyexpr(client_data->entity_->liveliness_keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [client_data]() { + z_drop(z_move(client_data->token_)); + }); + if (!z_check(client_data->token_)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the client."); + return nullptr; + } + + free_ros_keyexpr.cancel(); + free_token.cancel(); + + return client_data; +} + +///============================================================================= +ClientData::ClientData( + const rmw_node_t * rmw_node, + std::shared_ptr entity, + const void * request_type_support_impl, + const void * response_type_support_impl, + std::unique_ptr request_type_support, + std::unique_ptr response_type_support) +: rmw_node_(rmw_node), + entity_(std::move(entity)), + request_type_support_impl_(request_type_support_impl), + response_type_support_impl_(response_type_support_impl), + request_type_support_(std::move(request_type_support)), + response_type_support_(std::move(response_type_support)), + wait_set_data_(nullptr), + sequence_number_(1), + num_in_flight_(0), + is_shutdown_(false) +{ + generate_random_gid(gid_); +} + +///============================================================================= +liveliness::TopicInfo ClientData::topic_info() const +{ + std::lock_guard lock(mutex_); + return entity_->topic_info().value(); +} + +///============================================================================= +bool ClientData::liveliness_is_valid() const +{ + std::lock_guard lock(mutex_); + return zc_liveliness_token_check(&token_); +} + +///============================================================================= +void ClientData::copy_gid(rmw_gid_t * gid) const +{ + std::lock_guard lock(mutex_); + memcpy(gid->data, gid_, RMW_GID_STORAGE_SIZE); +} + +///============================================================================= +void ClientData::add_new_reply(std::unique_ptr reply) +{ + std::lock_guard lock(mutex_); + const rmw_qos_profile_t adapted_qos_profile = + entity_->topic_info().value().qos_; + if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && + reply_queue_.size() >= adapted_qos_profile.depth) + { + // Log warning if message is discarded due to hitting the queue depth + z_owned_str_t keystr = z_keyexpr_to_string(z_loan(keyexpr_)); + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Query queue depth of %ld reached, discarding oldest Query " + "for client for %s", + adapted_qos_profile.depth, + z_loan(keystr)); + z_drop(z_move(keystr)); + reply_queue_.pop_front(); + } + reply_queue_.emplace_back(std::move(reply)); + + // Since we added new data, trigger user callback and guard condition if they are available + data_callback_mgr_.trigger_callback(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); + } +} + +///============================================================================= +rmw_ret_t ClientData::take_response( + rmw_service_info_t * request_header, + void * ros_response, + bool * taken) +{ + std::lock_guard lock(mutex_); + *taken = false; + + if (is_shutdown_ || reply_queue_.empty()) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; + } + std::unique_ptr latest_reply = std::move(reply_queue_.front()); + reply_queue_.pop_front(); + + std::optional sample = latest_reply->get_sample(); + if (!sample) { + RMW_SET_ERROR_MSG("invalid reply sample"); + return RMW_RET_ERROR; + } + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(sample->payload.start)), + sample->payload.len); + + // Object that serializes the data + rmw_zenoh_cpp::Cdr deser(fastbuffer); + if (!response_type_support_->deserialize_ros_message( + deser.get_cdr(), + ros_response, + response_type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS response"); + return RMW_RET_ERROR; + } + + // Fill in the request_header + request_header->request_id.sequence_number = + rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "sequence_number"); + if (request_header->request_id.sequence_number < 0) { + RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); + return RMW_RET_ERROR; + } + request_header->source_timestamp = + rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "source_timestamp"); + if (request_header->source_timestamp < 0) { + RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); + return RMW_RET_ERROR; + } + if (!rmw_zenoh_cpp::get_gid_from_attachment( + &sample->attachment, + request_header->request_id.writer_guid)) + { + RMW_SET_ERROR_MSG("Could not get client gid from attachment"); + return RMW_RET_ERROR; + } + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + request_header->received_timestamp = now_ns.count(); + + *taken = true; + + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t ClientData::send_request( + const void * ros_request, + int64_t * sequence_id) +{ + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return RMW_RET_OK; + } + + rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; + rmw_context_impl_s * context_impl = static_cast(rmw_node_->context->impl); + if (context_impl == nullptr) { + return RMW_RET_INVALID_ARGUMENT; + } + + size_t max_data_length = ( + request_type_support_->get_estimated_serialized_size( + ros_request, request_type_support_impl_)); + + // Init serialized message byte array + char * request_bytes = static_cast(allocator->allocate( + max_data_length, + allocator->state)); + if (!request_bytes) { + RMW_SET_ERROR_MSG("failed allocate request message bytes"); + return RMW_RET_ERROR; + } + auto always_free_request_bytes = rcpputils::make_scope_exit( + [request_bytes, allocator]() { + allocator->deallocate(request_bytes, allocator->state); + }); + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(request_bytes, max_data_length); + // Object that serializes the data + Cdr ser(fastbuffer); + if (!request_type_support_->serialize_ros_message( + ros_request, + ser.get_cdr(), + request_type_support_impl_)) + { + return RMW_RET_ERROR; + } + size_t data_length = ser.get_serialized_data_length(); + *sequence_id = sequence_number_++; + + // Send request + z_get_options_t opts = z_get_options_default(); + z_owned_bytes_map_t map = create_map_and_set_sequence_num( + *sequence_id, + [this](z_owned_bytes_map_t * map, const char * key) + { + z_bytes_t gid_bytes; + gid_bytes.len = RMW_GID_STORAGE_SIZE; + gid_bytes.start = gid_; + z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); + }); + if (!z_check(map)) { + // create_map_and_set_sequence_num already set the error + return RMW_RET_ERROR; + } + auto always_free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); + + // See the comment about the "num_in_flight" class variable in the ClientData class for + // why we need to do this. + num_in_flight_++; + opts.attachment = z_bytes_map_as_attachment(&map); + opts.target = Z_QUERY_TARGET_ALL_COMPLETE; + // The default timeout for a z_get query is 10 seconds and if a response is not received within + // this window, the queryable will return an invalid reply. However, it is common for actions, + // which are implemented using services, to take an extended duration to complete. Hence, we set + // the timeout_ms to the largest supported value to account for most realistic scenarios. + opts.timeout_ms = std::numeric_limits::max(); + // Latest consolidation guarantees unicity of replies for the same key expression, + // which optimizes bandwidth. The default is "None", which imples replies may come in any order + // and any number. + opts.consolidation = z_query_consolidation_latest(); + opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; + // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, + // capture shared_from_this() instead of this. + z_owned_closure_reply_t zn_closure_reply = + z_closure(client_data_handler, client_data_drop, this); + z_get( + context_impl->session(), + z_loan(keyexpr_), "", + z_move(zn_closure_reply), + &opts); + + return RMW_RET_OK; +} + +///============================================================================= +ClientData::~ClientData() +{ + const rmw_ret_t ret = this->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Error destructing client /%s.", + entity_->topic_info().value().name_.c_str() + ); + } +} + +//============================================================================== +void ClientData::set_on_new_response_callback( + rmw_event_callback_t callback, + const void * user_data) +{ + std::lock_guard lock(mutex_); + data_callback_mgr_.set_callback(user_data, std::move(callback)); +} + +///============================================================================= +bool ClientData::queue_has_data_and_attach_condition_if_not( + rmw_wait_set_data_t * wait_set_data) +{ + std::lock_guard lock(mutex_); + if (!reply_queue_.empty()) { + return true; + } + wait_set_data_ = wait_set_data; + + return false; +} + +///============================================================================= +bool ClientData::detach_condition_and_queue_is_empty() +{ + std::lock_guard lock(mutex_); + wait_set_data_ = nullptr; + + return reply_queue_.empty(); +} + +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure +// for the use of this method. +bool ClientData::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight) +{ + std::lock_guard lock(mutex_); + queries_in_flight = --num_in_flight_ > 0; + return is_shutdown_; +} + +///============================================================================= +rmw_ret_t ClientData::shutdown() +{ + rmw_ret_t ret = RMW_RET_OK; + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return ret; + } + + if (num_in_flight_ > 0) { + // TODO(Yadunund): Check if we need to do something. + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Client %s is shutting down while there are still queries in flight.", + entity_->topic_info().value().name_.c_str() + ); + } + + // Unregister this node from the ROS graph. + if (zc_liveliness_token_check(&token_)) { + zc_liveliness_undeclare_token(z_move(token_)); + } + if (z_check(z_loan(keyexpr_))) { + z_drop(z_move(keyexpr_)); + } + + is_shutdown_ = true; + return RMW_RET_OK; +} + +///============================================================================= +bool ClientData::is_shutdown() const +{ + std::lock_guard lock(mutex_); + return is_shutdown_; +} +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp new file mode 100644 index 00000000..86a0394e --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -0,0 +1,165 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_CLIENT_DATA_HPP_ +#define DETAIL__RMW_CLIENT_DATA_HPP_ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "event.hpp" +#include "liveliness_utils.hpp" +#include "message_type_support.hpp" +#include "service_type_support.hpp" +#include "type_support_common.hpp" +#include "zenoh_utils.hpp" + +#include "rcutils/allocator.h" + +#include "rmw/rmw.h" +#include "rmw/ret_types.h" + +namespace rmw_zenoh_cpp +{ + +///============================================================================= +class ClientData final +{ +public: + // Make a shared_ptr of ClientData. + static std::shared_ptr make( + z_session_t session, + const rmw_node_t * const node, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t service_id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Get a copy of the TopicInfo of this ClientData. + liveliness::TopicInfo topic_info() const; + + // Returns true if liveliness token is still valid. + bool liveliness_is_valid() const; + + // Copy the GID of this ClientData into an rmw_gid_t. + void copy_gid(rmw_gid_t * gid) const; + + // Add a new ZenohReply to the queue. + void add_new_reply(std::unique_ptr reply); + + // Take a ROS service response. + rmw_ret_t take_response( + rmw_service_info_t * request_header, + void * ros_response, + bool * taken); + + // Send a service request. + rmw_ret_t send_request( + const void * ros_request, + int64_t * sequence_id); + + void set_on_new_response_callback( + rmw_event_callback_t callback, + const void * user_data); + + // rmw_wait helpers. + bool queue_has_data_and_attach_condition_if_not( + rmw_wait_set_data_t * wait_set_data); + + bool detach_condition_and_queue_is_empty(); + + // See the comment for "num_in_flight" below on the use of this method. + bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); + + // Shutdown this ClientData. + rmw_ret_t shutdown(); + + // Check if this ClientData is shutdown. + bool is_shutdown() const; + + // Destructor. + ~ClientData(); + +private: + // Constructor. + ClientData( + const rmw_node_t * rmw_node, + std::shared_ptr entity, + const void * request_type_support_impl, + const void * response_type_support_impl, + std::unique_ptr request_type_support, + std::unique_ptr response_type_support); + + // Internal mutex. + mutable std::mutex mutex_; + // The parent node. + const rmw_node_t * rmw_node_; + // The Entity generated for the service. + std::shared_ptr entity_; + // The GID for this client. + uint8_t gid_[RMW_GID_STORAGE_SIZE]; + // An owned keyexpression. + z_owned_keyexpr_t keyexpr_; + // Liveliness token for the service. + zc_owned_liveliness_token_t token_; + // Type support fields. + const void * request_type_support_impl_; + const void * response_type_support_impl_; + std::unique_ptr request_type_support_; + std::unique_ptr response_type_support_; + // Deque to store the replies in the order they arrive. + std::deque> reply_queue_; + // Wait set data. + rmw_wait_set_data_t * wait_set_data_; + // Data callback manager. + DataCallbackManager data_callback_mgr_; + // Sequence number for queries. + size_t sequence_number_; + // rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no + // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an + // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the + // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. + // The next 2 variables are used to avoid that situation. Any time a query is initiated via + // rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the + // query reply, num_in_flight_ is decremented. + // When shutdown() is called, is_shutdown_ is set to true. + // If num_in_flight_ is 0, the data associated with this structure is freed. + // If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. + // In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query + // callback, the query callback will free up the structure. + // + // There is one case which is not handled by this, which has to do with timeouts. The query + // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never + // returns, the memory in this structure will never be freed. There isn't much we can do about + // that at this time, but we may want to consider changing the timeout so that the memory can + // eventually be freed up. + size_t num_in_flight_; + // Shutdown flag. + bool is_shutdown_; +}; +using ClientDataPtr = std::shared_ptr; +using ClientDataConstPtr = std::shared_ptr; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__RMW_CLIENT_DATA_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 884056e3..32551a82 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -50,8 +50,6 @@ class rmw_context_impl_s final std::string enclave() const; // Loan the Zenoh session. - // TODO(Yadunund): Remove this API once rmw_context_impl_s is updated to - // create other Zenoh objects. z_session_t session() const; // Get a reference to the shm_manager. diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp deleted file mode 100644 index c1e56bfd..00000000 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ /dev/null @@ -1,221 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include - -#include "liveliness_utils.hpp" -#include "logging_macros.hpp" - -#include "rcpputils/scope_exit.hpp" - -#include "rmw/error_handling.h" -#include "rmw/impl/cpp/macros.hpp" - -#include "attachment_helpers.hpp" -#include "rmw_data_types.hpp" - -///============================================================================= -namespace rmw_zenoh_cpp -{ -///============================================================================= -void rmw_client_data_t::notify() -{ - std::lock_guard lock(condition_mutex_); - if (wait_set_data_ != nullptr) { - std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); - wait_set_data_->triggered = true; - wait_set_data_->condition_variable.notify_one(); - } -} - -///============================================================================= -void rmw_client_data_t::add_new_reply(std::unique_ptr reply) -{ - std::lock_guard lock(reply_queue_mutex_); - if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && - reply_queue_.size() >= adapted_qos_profile.depth) - { - // Log warning if message is discarded due to hitting the queue depth - z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr)); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Reply queue depth of %ld reached, discarding oldest reply " - "for client for %s", - adapted_qos_profile.depth, - z_loan(keystr)); - z_drop(z_move(keystr)); - reply_queue_.pop_front(); - } - reply_queue_.emplace_back(std::move(reply)); - - // Since we added new data, trigger user callback and guard condition if they are available - data_callback_mgr.trigger_callback(); - notify(); -} - -///============================================================================= -bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not( - rmw_wait_set_data_t * wait_set_data) -{ - std::lock_guard lock(condition_mutex_); - if (!reply_queue_.empty()) { - return true; - } - wait_set_data_ = wait_set_data; - - return false; -} - -///============================================================================= -bool rmw_client_data_t::detach_condition_and_queue_is_empty() -{ - std::lock_guard lock(condition_mutex_); - wait_set_data_ = nullptr; - - return reply_queue_.empty(); -} - -///============================================================================= -std::unique_ptr rmw_client_data_t::pop_next_reply() -{ - std::lock_guard lock(reply_queue_mutex_); - - if (reply_queue_.empty()) { - return nullptr; - } - - std::unique_ptr latest_reply = std::move(reply_queue_.front()); - reply_queue_.pop_front(); - - return latest_reply; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class -// for the use of this method. -void rmw_client_data_t::increment_in_flight_callbacks() -{ - std::lock_guard lock(in_flight_mutex_); - num_in_flight_++; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class -// for the use of this method. -bool rmw_client_data_t::shutdown_and_query_in_flight() -{ - std::lock_guard lock(in_flight_mutex_); - is_shutdown_ = true; - - return num_in_flight_ > 0; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure -// for the use of this method. -bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight) -{ - std::lock_guard lock(in_flight_mutex_); - queries_in_flight = --num_in_flight_ > 0; - return is_shutdown_; -} - -bool rmw_client_data_t::is_shutdown() const -{ - std::lock_guard lock(in_flight_mutex_); - return is_shutdown_; -} - - -///============================================================================= -size_t rmw_client_data_t::get_next_sequence_number() -{ - std::lock_guard lock(sequence_number_mutex_); - return sequence_number_++; -} - -//============================================================================== -void client_data_handler(z_owned_reply_t * reply, void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t " - ); - return; - } - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - if (client_data->is_shutdown()) { - return; - } - - if (!z_reply_check(reply)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_check returned False" - ); - return; - } - if (!z_reply_is_ok(reply)) { - z_owned_str_t keystr = z_keyexpr_to_string(z_loan(client_data->keyexpr)); - z_value_t err = z_reply_err(reply); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", - z_loan(keystr), - (int)err.payload.len, - err.payload.start); - z_drop(z_move(keystr)); - - return; - } - - client_data->add_new_reply(std::make_unique(reply)); - // Since we took ownership of the reply, null it out here - *reply = z_reply_null(); -} - -void client_data_drop(void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t " - ); - return; - } - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - bool queries_in_flight = false; - bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); - - if (is_shutdown) { - if (!queries_in_flight) { - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - client_data->context->options.allocator.deallocate( - client_data, client_data->context->options.allocator.state); - } - } -} - -} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp deleted file mode 100644 index d0465a07..00000000 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef DETAIL__RMW_DATA_TYPES_HPP_ -#define DETAIL__RMW_DATA_TYPES_HPP_ - -#include - -#include -#include -#include -#include -#include -#include - -#include "rcutils/allocator.h" - -#include "rmw/rmw.h" - -#include "rosidl_runtime_c/type_hash.h" - -#include "event.hpp" -#include "message_type_support.hpp" -#include "rmw_wait_set_data.hpp" -#include "service_type_support.hpp" -#include "zenoh_utils.hpp" - -/// Structs for various type erased data fields. -namespace rmw_zenoh_cpp -{ -///============================================================================= -void client_data_handler(z_owned_reply_t * reply, void * client_data); -void client_data_drop(void * data); - -///============================================================================= -class rmw_client_data_t final -{ -public: - // The Entity generated for the client. - std::shared_ptr entity; - - z_owned_keyexpr_t keyexpr; - - // Store the actual QoS profile used to configure this client. - // The QoS is reused for sending requests and getting responses. - rmw_qos_profile_t adapted_qos_profile; - - // Liveliness token for the client. - zc_owned_liveliness_token_t token; - - const void * request_type_support_impl; - const void * response_type_support_impl; - const char * typesupport_identifier; - const rosidl_type_hash_t * type_hash; - RequestTypeSupport * request_type_support; - ResponseTypeSupport * response_type_support; - - rmw_context_t * context; - - uint8_t client_gid[RMW_GID_STORAGE_SIZE]; - - size_t get_next_sequence_number(); - - void add_new_reply(std::unique_ptr reply); - - bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); - - bool detach_condition_and_queue_is_empty(); - - std::unique_ptr pop_next_reply(); - - DataCallbackManager data_callback_mgr; - - // See the comment for "num_in_flight" below on the use of this method. - void increment_in_flight_callbacks(); - - // See the comment for "num_in_flight" below on the use of this method. - bool shutdown_and_query_in_flight(); - - // See the comment for "num_in_flight" below on the use of this method. - bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); - - bool is_shutdown() const; - -private: - void notify(); - - size_t sequence_number_{1}; - std::mutex sequence_number_mutex_; - - rmw_wait_set_data_t * wait_set_data_{nullptr}; - std::mutex condition_mutex_; - - std::deque> reply_queue_; - mutable std::mutex reply_queue_mutex_; - - // rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no - // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an - // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the - // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. - // - // The next 3 variables are used to avoid that situation. Any time a query is initiated via - // rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the - // query reply, num_in_flight_ is decremented. When rmw_destroy_client() is called, is_shutdown_ - // is set to true. If num_in_flight_ is 0, the data associated with this structure is freed. - // If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. - // In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query - // callback, the query callback will free up the structure. - // - // There is one case which is not handled by this, which has to do with timeouts. The query - // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never - // returns, the memory in this structure will never be freed. There isn't much we can do about - // that at this time, but we may want to consider changing the timeout so that the memory can - // eventually be freed up. - mutable std::mutex in_flight_mutex_; - bool is_shutdown_{false}; - size_t num_in_flight_{0}; -}; -} // namespace rmw_zenoh_cpp - -#endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 19357c3d..1b853a41 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -319,6 +319,73 @@ void NodeData::delete_service_data(const rmw_service_t * const service) services_.erase(service); } + +///============================================================================= +bool NodeData::create_client_data( + const rmw_client_t * const client, + z_session_t session, + std::size_t id, + const std::string & service_name, + const rosidl_service_type_support_t * type_supports, + const rmw_qos_profile_t * qos_profile) +{ + std::lock_guard lock_guard(mutex_); + if (is_shutdown_) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create ClientData as the NodeData has been shutdown."); + return false; + } + + if (clients_.count(client) > 0) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "ClientData already exists."); + return false; + } + + auto client_data = ClientData::make( + std::move(session), + node_, + entity_->node_info(), + id_, + std::move(id), + std::move(service_name), + type_supports, + qos_profile); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to make ClientData."); + return false; + } + + auto insertion = clients_.insert(std::make_pair(client, std::move(client_data))); + if (!insertion.second) { + return false; + } + return true; +} + +///============================================================================= +ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client) +{ + std::lock_guard lock_guard(mutex_); + auto it = clients_.find(client); + if (it == clients_.end()) { + return nullptr; + } + + return it->second; +} + +///============================================================================= +void NodeData::delete_client_data(const rmw_client_t * const client) +{ + std::lock_guard lock_guard(mutex_); + clients_.erase(client); +} + ///============================================================================= rmw_ret_t NodeData::shutdown() { @@ -365,6 +432,18 @@ rmw_ret_t NodeData::shutdown() ); } } + for (auto cli_it = clients_.begin(); cli_it != clients_.end(); ++cli_it) { + ret = cli_it->second->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to shutdown client %s within id %zu. rmw_ret_t code: %zu.", + cli_it->second->topic_info().name_.c_str(), + id_, + ret + ); + } + } // Unregister this node from the ROS graph. zc_liveliness_undeclare_token(z_move(token_)); diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index c6c74e0d..f85b1366 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -25,6 +25,7 @@ #include "graph_cache.hpp" #include "liveliness_utils.hpp" +#include "rmw_client_data.hpp" #include "rmw_publisher_data.hpp" #include "rmw_subscription_data.hpp" #include "rmw_service_data.hpp" @@ -97,6 +98,21 @@ class NodeData final // Delete the ServiceData for a given rmw_service_t if present. void delete_service_data(const rmw_service_t * const service); + // Create a new ClientData for a given rmw_client_t. + bool create_client_data( + const rmw_client_t * const client, + z_session_t session, + std::size_t id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Retrieve the ClientData for a given rmw_client_t if present. + ClientDataPtr get_client_data(const rmw_client_t * const client); + + // Delete the ClientData for a given rmw_client_t if present. + void delete_client_data(const rmw_client_t * const client); + // Shutdown this NodeData. rmw_ret_t shutdown(); @@ -132,6 +148,8 @@ class NodeData final std::unordered_map subs_; // Map of services. std::unordered_map services_; + // Map of clients. + std::unordered_map clients_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index 3a209ce8..44ece7c2 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -21,7 +21,6 @@ #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/rmw_publisher_data.hpp" #include "detail/rmw_subscription_data.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 4c6b2f20..5beb1b4e 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -22,7 +22,6 @@ #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/zenoh_config.hpp" #include "rcutils/env.h" diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 4b2b4f0c..f28e5f8e 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -36,7 +36,6 @@ #include "detail/message_type_support.hpp" #include "detail/qos.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/serialization_format.hpp" #include "detail/type_support_common.hpp" #include "detail/zenoh_utils.hpp" @@ -1371,21 +1370,28 @@ rmw_create_client( const rmw_qos_profile_t * qos_profile) { RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return nullptr); - + RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); RMW_CHECK_ARGUMENT_FOR_NULL(service_name, nullptr); if (strlen(service_name) == 0) { RMW_SET_ERROR_MSG("service name is empty string"); return nullptr; } RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); - RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); - + // Validate service name + int validation_result; + if (rmw_validate_full_topic_name(service_name, &validation_result, nullptr) != RMW_RET_OK) { + RMW_SET_ERROR_MSG("rmw_validate_full_topic_name failed"); + return nullptr; + } + if (validation_result != RMW_TOPIC_VALID && !qos_profile->avoid_ros_namespace_conventions) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service name is malformed: %s", service_name); + return nullptr; + } RMW_CHECK_FOR_NULL_WITH_MSG( node->context, "expected initialized context", @@ -1405,23 +1411,22 @@ rmw_create_client( return nullptr; } - rcutils_allocator_t * allocator = &node->context->options.allocator; - - // Validate service name - int validation_result; - - if (rmw_validate_full_topic_name(service_name, &validation_result, nullptr) != RMW_RET_OK) { - RMW_SET_ERROR_MSG("rmw_validate_full_topic_name failed"); + // Get the RMW the type support. + const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); + if (type_support == nullptr) { + // error was already set by find_service_type_support return nullptr; } - if (validation_result != RMW_TOPIC_VALID && !qos_profile->avoid_ros_namespace_conventions) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service name is malformed: %s", service_name); + rcutils_allocator_t * allocator = &node->context->options.allocator; + if (!rcutils_allocator_is_valid(allocator)) { + RMW_SET_ERROR_MSG("allocator is invalid."); return nullptr; } - // client data - rmw_client_t * rmw_client = static_cast(allocator->zero_allocate( + // Create the rmw_client. + rmw_client_t * rmw_client = + static_cast(allocator->zero_allocate( 1, sizeof(rmw_client_t), allocator->state)); @@ -1429,223 +1434,46 @@ rmw_create_client( rmw_client, "failed to allocate memory for the client", return nullptr); - auto free_rmw_client = rcpputils::make_scope_exit( [rmw_client, allocator]() { allocator->deallocate(rmw_client, allocator->state); }); - auto client_data = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::rmw_client_data_t), allocator->state)); + auto node_data = context_impl->get_node_data(node); RMW_CHECK_FOR_NULL_WITH_MSG( - client_data, - "failed to allocate memory for client data", + node_data, + "NodeData not found.", return nullptr); - auto free_client_data = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW(client_data, client_data, return nullptr, rmw_zenoh_cpp::rmw_client_data_t); - auto destruct_client_data = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->~rmw_client_data_t(), - rmw_zenoh_cpp::rmw_client_data_t); - }); - rmw_zenoh_cpp::generate_random_gid(client_data->client_gid); - - // Adapt any 'best available' QoS options - client_data->adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_zenoh_cpp::QoS::get().best_available_qos( - nullptr, nullptr, &client_data->adapted_qos_profile, nullptr); - if (RMW_RET_OK != ret) { - RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); - return nullptr; - } - - // Obtain the type support - const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); - if (type_support == nullptr) { - // error was already set by find_service_type_support + if (!node_data->create_client_data( + rmw_client, + context_impl->session(), + context_impl->get_next_entity_id(), + service_name, + type_support, + qos_profile)) + { + // Error already handled. return nullptr; } - auto service_members = static_cast(type_support->data); - auto request_members = static_cast( - service_members->request_members_->data); - auto response_members = static_cast( - service_members->response_members_->data); - - client_data->context = node->context; - client_data->typesupport_identifier = type_support->typesupport_identifier; - client_data->type_hash = type_support->get_type_hash_func(type_support); - client_data->request_type_support_impl = request_members; - client_data->response_type_support_impl = response_members; - - // Request type support - client_data->request_type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::RequestTypeSupport), allocator->state)); - - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data->request_type_support, - "Failed to allocate rmw_zenoh_cpp::RequestTypeSupport", - return nullptr); - auto free_request_type_support = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data->request_type_support, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - client_data->request_type_support, - client_data->request_type_support, - return nullptr, - rmw_zenoh_cpp::RequestTypeSupport, service_members); - auto destruct_request_type_support = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->request_type_support->~RequestTypeSupport(), - rmw_zenoh_cpp::RequestTypeSupport); - }); - - // Response type support - client_data->response_type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::ResponseTypeSupport), allocator->state)); - - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data->response_type_support, - "Failed to allocate rmw_zenoh_cpp::ResponseTypeSupport", - return nullptr); - auto free_response_type_support = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data->response_type_support, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - client_data->response_type_support, - client_data->response_type_support, - return nullptr, - rmw_zenoh_cpp::ResponseTypeSupport, service_members); - auto destruct_response_type_support = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->response_type_support->~ResponseTypeSupport(), - rmw_zenoh_cpp::ResponseTypeSupport); - }); - - // Populate the rmw_client. + // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased + // Client handle will be returned in the rmw_clients_t in rmw_wait + // from which we cannot obtain ClientData. + rmw_client->data = static_cast(node_data->get_client_data(rmw_client).get()); rmw_client->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; rmw_client->service_name = rcutils_strdup(service_name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( rmw_client->service_name, - "failed to allocate client name", + "failed to allocate service name", return nullptr); auto free_service_name = rcpputils::make_scope_exit( [rmw_client, allocator]() { allocator->deallocate(const_cast(rmw_client->service_name), allocator->state); }); - // Note: Service request/response types will contain a suffix Request_ or Response_. - // We remove the suffix when appending the type to the liveliness tokens for - // better reusability within GraphCache. - std::string service_type = client_data->request_type_support->get_name(); - size_t suffix_substring_position = service_type.find("Request_"); - if (std::string::npos != suffix_substring_position) { - service_type = service_type.substr(0, suffix_substring_position); - } else { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unexpected type %s for client %s. Report this bug", - service_type.c_str(), rmw_client->service_name); - return nullptr; - } - - // Convert the type hash to a string so that it can be included in - // the keyexpr. - char * type_hash_c_str = nullptr; - rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( - client_data->type_hash, - *allocator, - &type_hash_c_str); - if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { - RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); - return nullptr; - } - auto free_type_hash_c_str = rcpputils::make_scope_exit( - [&allocator, &type_hash_c_str]() { - allocator->deallocate(type_hash_c_str, allocator->state); - }); - - z_session_t session = context_impl->session(); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_FOR_NULL_WITH_MSG( - node_data, - "NodeData not found.", - return nullptr); - client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(session), - std::to_string(node_data->id()), - std::to_string( - context_impl->get_next_entity_id()), - rmw_zenoh_cpp::liveliness::EntityType::Client, - rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, - rmw_zenoh_cpp::liveliness::TopicInfo{ - node->context->actual_domain_id, - rmw_client->service_name, - std::move(service_type), - type_hash_c_str, - client_data->adapted_qos_profile} - ); - if (client_data->entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the client %s.", - rmw_client->service_name); - return nullptr; - } - - client_data->keyexpr = z_keyexpr_new(client_data->entity->topic_info()->topic_keyexpr_.c_str()); - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [client_data]() { - z_keyexpr_drop(z_move(client_data->keyexpr)); - }); - if (!z_keyexpr_check(&client_data->keyexpr)) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - - client_data->token = zc_liveliness_declare_token( - session, - z_keyexpr(client_data->entity->liveliness_keyexpr().c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [client_data]() { - if (client_data != nullptr) { - z_drop(z_move(client_data->token)); - } - }); - if (!z_check(client_data->token)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the client."); - return nullptr; - } - - rmw_client->data = client_data; - - free_token.cancel(); free_rmw_client.cancel(); - free_client_data.cancel(); - destruct_request_type_support.cancel(); - free_request_type_support.cancel(); - destruct_response_type_support.cancel(); - free_response_type_support.cancel(); - destruct_client_data.cancel(); free_service_name.cancel(); - free_ros_keyexpr.cancel(); return rmw_client; } @@ -1655,10 +1483,12 @@ rmw_create_client( rmw_ret_t rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) { - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -1669,36 +1499,15 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) client->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - rcutils_allocator_t * allocator = &node->context->options.allocator; - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data, - "client implementation pointer is null.", - return RMW_RET_INVALID_ARGUMENT); - - // CLEANUP =================================================================== - z_drop(z_move(client_data->keyexpr)); - zc_liveliness_undeclare_token(z_move(client_data->token)); - - RMW_TRY_DESTRUCTOR( - client_data->request_type_support->~RequestTypeSupport(), rmw_zenoh_cpp::RequestTypeSupport, ); - allocator->deallocate(client_data->request_type_support, allocator->state); - - RMW_TRY_DESTRUCTOR( - client_data->response_type_support->~ResponseTypeSupport(), rmw_zenoh_cpp::ResponseTypeSupport, - ); - allocator->deallocate(client_data->response_type_support, allocator->state); - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - if (!client_data->shutdown_and_query_in_flight()) { - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - allocator->deallocate(client->data, allocator->state); + auto node_data = context_impl->get_node_data(node); + if (node_data == nullptr) { + return RMW_RET_INVALID_ARGUMENT; } + // Remove the ClientData from NodeData. + node_data->delete_client_data(client); + + rcutils_allocator_t * allocator = &node->context->options.allocator; allocator->deallocate(const_cast(client->service_name), allocator->state); allocator->deallocate(client, allocator->state); @@ -1715,113 +1524,23 @@ rmw_send_request( { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_FOR_NULL_WITH_MSG( + client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( client_data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); - if (client_data->is_shutdown()) { - return RMW_RET_ERROR; - } - - rmw_context_impl_s * context_impl = static_cast( - client_data->context->impl); - - // Serialize data - - rcutils_allocator_t * allocator = &(client_data->context->options.allocator); - - size_t max_data_length = ( - client_data->request_type_support->get_estimated_serialized_size( - ros_request, client_data->request_type_support_impl)); - - // Init serialized message byte array - char * request_bytes = static_cast(allocator->allocate( - max_data_length, - allocator->state)); - if (!request_bytes) { - RMW_SET_ERROR_MSG("failed allocate request message bytes"); - return RMW_RET_ERROR; - } - auto free_request_bytes = rcpputils::make_scope_exit( - [request_bytes, allocator]() { - allocator->deallocate(request_bytes, allocator->state); - }); - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(request_bytes, max_data_length); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr ser(fastbuffer); - if (!client_data->request_type_support->serialize_ros_message( - ros_request, - ser.get_cdr(), - client_data->request_type_support_impl)) - { - return RMW_RET_ERROR; - } - - size_t data_length = ser.get_serialized_data_length(); - - *sequence_id = client_data->get_next_sequence_number(); - - // Send request - z_get_options_t opts = z_get_options_default(); - - z_owned_bytes_map_t map = rmw_zenoh_cpp::create_map_and_set_sequence_num( - *sequence_id, - [client_data](z_owned_bytes_map_t * map, const char * key) - { - z_bytes_t gid_bytes; - gid_bytes.len = RMW_GID_STORAGE_SIZE; - gid_bytes.start = client_data->client_gid; - z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); - }); - if (!z_check(map)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - auto free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - client_data->increment_in_flight_callbacks(); - - opts.attachment = z_bytes_map_as_attachment(&map); - - opts.target = Z_QUERY_TARGET_ALL_COMPLETE; - // The default timeout for a z_get query is 10 seconds and if a response is not received within - // this window, the queryable will return an invalid reply. However, it is common for actions, - // which are implemented using services, to take an extended duration to complete. Hence, we set - // the timeout_ms to the largest supported value to account for most realistic scenarios. - opts.timeout_ms = std::numeric_limits::max(); - // Latest consolidation guarantees unicity of replies for the same key expression, - // which optimizes bandwidth. The default is "None", which imples replies may come in any order - // and any number. - opts.consolidation = z_query_consolidation_latest(); - opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; - z_owned_closure_reply_t zn_closure_reply = - z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data); - z_get( - context_impl->session(), - z_loan(client_data->keyexpr), "", - z_move(zn_closure_reply), - &opts); - - return RMW_RET_OK; + return client_data->send_request(ros_request, sequence_id); } //============================================================================== @@ -1833,12 +1552,10 @@ rmw_take_response( void * ros_response, bool * taken) { + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); *taken = false; - RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, @@ -1846,71 +1563,13 @@ rmw_take_response( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_FOR_NULL_WITH_MSG( client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); - std::unique_ptr latest_reply = client_data->pop_next_reply(); - if (latest_reply == nullptr) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. - return RMW_RET_OK; - } - - std::optional sample = latest_reply->get_sample(); - if (!sample) { - RMW_SET_ERROR_MSG("invalid reply sample"); - return RMW_RET_ERROR; - } - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(sample->payload.start)), - sample->payload.len); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr deser(fastbuffer); - if (!client_data->response_type_support->deserialize_ros_message( - deser.get_cdr(), - ros_response, - client_data->response_type_support_impl)) - { - RMW_SET_ERROR_MSG("could not deserialize ROS response"); - return RMW_RET_ERROR; - } - - // Fill in the request_header - - request_header->request_id.sequence_number = - rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "sequence_number"); - if (request_header->request_id.sequence_number < 0) { - RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); - return RMW_RET_ERROR; - } - - request_header->source_timestamp = - rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "source_timestamp"); - if (request_header->source_timestamp < 0) { - RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); - return RMW_RET_ERROR; - } - - if (!rmw_zenoh_cpp::get_gid_from_attachment( - &sample->attachment, - request_header->request_id.writer_guid)) - { - RMW_SET_ERROR_MSG("Could not get client gid from attachment"); - return RMW_RET_ERROR; - } - - auto now = std::chrono::system_clock::now().time_since_epoch(); - auto now_ns = std::chrono::duration_cast(now); - request_header->received_timestamp = now_ns.count(); - - *taken = true; - - return RMW_RET_OK; + return client_data->take_response(request_header, ros_response, taken); } //============================================================================== @@ -1927,12 +1586,11 @@ rmw_client_request_publisher_get_actual_qos( rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); - *qos = client_data->adapted_qos_profile; + *qos = client_data->topic_info().qos_; return RMW_RET_OK; } @@ -1956,7 +1614,6 @@ rmw_create_service( const char * service_name, const rmw_qos_profile_t * qos_profile) { - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, @@ -2009,7 +1666,6 @@ rmw_create_service( return nullptr; } - // SERVICE DATA ============================================================== rcutils_allocator_t * allocator = &node->context->options.allocator; if (!rcutils_allocator_is_valid(allocator)) { RMW_SET_ERROR_MSG("allocator is invalid."); @@ -2050,7 +1706,7 @@ rmw_create_service( } // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased - // Service handle will be returned in the rmw_service_t in rmw_wait + // Service handle will be returned in the rmw_services_t in rmw_wait // from which we cannot obtain ServiceData. rmw_service->data = static_cast(node_data->get_service_data(rmw_service).get()); rmw_service->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; @@ -2441,8 +2097,8 @@ check_and_attach_condition( if (clients) { for (size_t i = 0; i < clients->client_count; ++i) { - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(clients->clients[i]); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(clients->clients[i]); if (client_data == nullptr) { continue; } @@ -2602,8 +2258,8 @@ rmw_wait( if (clients) { for (size_t i = 0; i < clients->client_count; ++i) { - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(clients->clients[i]); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(clients->clients[i]); if (client_data == nullptr) { continue; } @@ -2815,12 +2471,12 @@ rmw_get_gid_for_client(const rmw_client_t * client, rmw_gid_t * gid) { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(gid, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); + RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(gid->data, client_data->client_gid, RMW_GID_STORAGE_SIZE); + client_data->copy_gid(gid); return RMW_RET_OK; } @@ -2866,29 +2522,16 @@ rmw_service_server_is_available( RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(is_available, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); if (client_data == nullptr) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( "Unable to retreive client_data from client for service %s", client->service_name); return RMW_RET_INVALID_ARGUMENT; } - std::string service_type = client_data->request_type_support->get_name(); - size_t suffix_substring_position = service_type.find("Request_"); - if (std::string::npos != suffix_substring_position) { - service_type = service_type.substr(0, suffix_substring_position); - } else { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unexpected type %s for client %s. Report this bug", - service_type.c_str(), client->service_name); - return RMW_RET_INVALID_ARGUMENT; - } - return node->context->impl->graph_cache()->service_server_is_available( - client->service_name, service_type.c_str(), is_available); + client_data->topic_info(), is_available); } //============================================================================== @@ -2961,11 +2604,11 @@ rmw_client_set_on_new_response_callback( const void * user_data) { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); - client_data->data_callback_mgr.set_callback( - user_data, std::move(callback)); + client_data->set_on_new_response_callback( + std::move(callback), user_data); return RMW_RET_OK; } } // extern "C"