diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index d6080970..7c4a27eb 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -38,8 +38,8 @@ add_library(rmw_zenoh_cpp SHARED src/detail/logging.cpp src/detail/message_type_support.cpp src/detail/qos.cpp + src/detail/rmw_client_data.cpp src/detail/rmw_context_impl_s.cpp - src/detail/rmw_data_types.cpp src/detail/rmw_publisher_data.cpp src/detail/rmw_node_data.cpp src/detail/rmw_service_data.cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 22f1b153..776d343d 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -36,7 +36,6 @@ #include "graph_cache.hpp" #include "logging_macros.hpp" -#include "rmw_data_types.hpp" namespace rmw_zenoh_cpp { @@ -1182,15 +1181,15 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( ///============================================================================= rmw_ret_t GraphCache::service_server_is_available( - const char * service_name, - const char * service_type, + const liveliness::TopicInfo & client_topic_info, bool * is_available) const { *is_available = false; std::lock_guard lock(graph_mutex_); - GraphNode::TopicMap::const_iterator service_it = graph_services_.find(service_name); + GraphNode::TopicMap::const_iterator service_it = graph_services_.find(client_topic_info.name_); if (service_it != graph_services_.end()) { - GraphNode::TopicTypeMap::const_iterator type_it = service_it->second.find(service_type); + GraphNode::TopicTypeMap::const_iterator type_it = + service_it->second.find(client_topic_info.type_); if (type_it != service_it->second.end()) { for (const auto & [_, topic_data] : type_it->second) { if (topic_data->subs_.size() > 0) { diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index f2302efd..8ac2172d 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -172,8 +172,7 @@ class GraphCache final rmw_topic_endpoint_info_array_t * endpoints_info) const; rmw_ret_t service_server_is_available( - const char * service_name, - const char * service_type, + const liveliness::TopicInfo & client_topic_info, bool * is_available) const; /// Set a qos event callback for an entity from the current session. diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp new file mode 100644 index 00000000..cf0fcb4e --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -0,0 +1,588 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_client_data.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "attachment_helpers.hpp" +#include "cdr.hpp" +#include "liveliness_utils.hpp" +#include "logging_macros.hpp" +#include "message_type_support.hpp" +#include "qos.hpp" +#include "rmw_context_impl_s.hpp" + +#include "rcpputils/scope_exit.hpp" + +#include "rmw/error_handling.h" +#include "rmw/get_topic_endpoint_info.h" +#include "rmw/impl/cpp/macros.hpp" + +namespace +{ + +///============================================================================= +void client_data_handler(z_owned_reply_t * reply, void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t from data in client_data_handler." + ); + return; + } + + if (client_data->is_shutdown()) { + return; + } + + if (!z_reply_check(reply)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_check returned False" + ); + return; + } + if (!z_reply_is_ok(reply)) { + z_value_t err = z_reply_err(reply); + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", + client_data->topic_info().topic_keyexpr_.c_str(), + (int)err.payload.len, + err.payload.start); + + return; + } + + std::chrono::nanoseconds::rep received_timestamp = + std::chrono::system_clock::now().time_since_epoch().count(); + + client_data->add_new_reply( + std::make_unique(reply, received_timestamp)); + + // Since we took ownership of the reply, null it out here + *reply = z_reply_null(); +} + +///============================================================================= +void client_data_drop(void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t from data in client_data_drop." + ); + return; + } + + client_data->decrement_in_flight_and_conditionally_remove(); +} + +} // namespace + +namespace rmw_zenoh_cpp +{ +///============================================================================= +std::shared_ptr ClientData::make( + z_session_t session, + const rmw_node_t * const node, + const rmw_client_t * client, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t service_id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile) +{ + // Adapt any 'best available' QoS options + rmw_qos_profile_t adapted_qos_profile = *qos_profile; + rmw_ret_t ret = QoS::get().best_available_qos( + nullptr, nullptr, &adapted_qos_profile, nullptr); + if (RMW_RET_OK != ret) { + RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + return nullptr; + } + + rcutils_allocator_t * allocator = &node->context->options.allocator; + + const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support); + auto service_members = static_cast(type_support->data); + auto request_members = static_cast( + service_members->request_members_->data); + auto response_members = static_cast( + service_members->response_members_->data); + auto request_type_support = std::make_shared(service_members); + auto response_type_support = std::make_shared(service_members); + + // Note: Service request/response types will contain a suffix Request_ or Response_. + // We remove the suffix when appending the type to the liveliness tokens for + // better reusability within GraphCache. + std::string service_type = request_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Request_"); + if (std::string::npos != suffix_substring_position) { + service_type = service_type.substr(0, suffix_substring_position); + } else { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unexpected type %s for client %s. Report this bug", + service_type.c_str(), service_name.c_str()); + return nullptr; + } + + // Convert the type hash to a string so that it can be included in the keyexpr. + char * type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( + type_hash, + *allocator, + &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { + RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); + return nullptr; + } + auto free_type_hash_c_str = rcpputils::make_scope_exit( + [&allocator, &type_hash_c_str]() { + allocator->deallocate(type_hash_c_str, allocator->state); + }); + + std::size_t domain_id = node_info.domain_id_; + auto entity = liveliness::Entity::make( + z_info_zid(session), + std::to_string(node_id), + std::to_string(service_id), + liveliness::EntityType::Client, + std::move(node_info), + liveliness::TopicInfo{ + std::move(domain_id), + service_name, + std::move(service_type), + type_hash_c_str, + std::move(adapted_qos_profile)} + ); + if (entity == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the client %s.", + service_name.c_str()); + return nullptr; + } + + std::shared_ptr client_data = std::shared_ptr( + new ClientData{ + node, + client, + entity, + request_members, + response_members, + request_type_support, + response_type_support + }); + + if (!client_data->init(session)) { + // init() already set the error. + return nullptr; + } + + return client_data; +} + +///============================================================================= +ClientData::ClientData( + const rmw_node_t * rmw_node, + const rmw_client_t * rmw_client, + std::shared_ptr entity, + const void * request_type_support_impl, + const void * response_type_support_impl, + std::shared_ptr request_type_support, + std::shared_ptr response_type_support) +: rmw_node_(rmw_node), + rmw_client_(rmw_client), + entity_(std::move(entity)), + request_type_support_impl_(request_type_support_impl), + response_type_support_impl_(response_type_support_impl), + request_type_support_(request_type_support), + response_type_support_(response_type_support), + wait_set_data_(nullptr), + sequence_number_(1), + is_shutdown_(false), + num_in_flight_(0) +{ + // Do nothing. +} + +///============================================================================= +bool ClientData::init(z_session_t session) +{ + this->keyexpr_ = + z_keyexpr_new(this->entity_->topic_info().value().topic_keyexpr_.c_str()); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [this]() { + z_drop(z_move(this->keyexpr_)); + }); + if (!z_check(z_loan(this->keyexpr_))) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return false; + } + + this->token_ = zc_liveliness_declare_token( + session, + z_keyexpr(this->entity_->liveliness_keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [this]() { + z_drop(z_move(this->token_)); + }); + if (!z_check(this->token_)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the client."); + return false; + } + + free_ros_keyexpr.cancel(); + free_token.cancel(); + + return true; +} + +///============================================================================= +liveliness::TopicInfo ClientData::topic_info() const +{ + std::lock_guard lock(mutex_); + return entity_->topic_info().value(); +} + +///============================================================================= +bool ClientData::liveliness_is_valid() const +{ + std::lock_guard lock(mutex_); + return zc_liveliness_token_check(&token_); +} + +///============================================================================= +void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const +{ + std::lock_guard lock(mutex_); + entity_->copy_gid(out_gid); +} + +///============================================================================= +void ClientData::add_new_reply(std::unique_ptr reply) +{ + std::lock_guard lock(mutex_); + const rmw_qos_profile_t adapted_qos_profile = + entity_->topic_info().value().qos_; + if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && + reply_queue_.size() >= adapted_qos_profile.depth) + { + // Log warning if message is discarded due to hitting the queue depth + z_owned_str_t keystr = z_keyexpr_to_string(z_loan(keyexpr_)); + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Query queue depth of %ld reached, discarding oldest Query " + "for client for %s", + adapted_qos_profile.depth, + z_loan(keystr)); + z_drop(z_move(keystr)); + reply_queue_.pop_front(); + } + reply_queue_.emplace_back(std::move(reply)); + + // Since we added new data, trigger user callback and guard condition if they are available + data_callback_mgr_.trigger_callback(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); + } +} + +///============================================================================= +rmw_ret_t ClientData::take_response( + rmw_service_info_t * request_header, + void * ros_response, + bool * taken) +{ + std::lock_guard lock(mutex_); + *taken = false; + + if (is_shutdown_ || reply_queue_.empty()) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; + } + std::unique_ptr latest_reply = std::move(reply_queue_.front()); + reply_queue_.pop_front(); + + std::optional sample = latest_reply->get_sample(); + if (!sample) { + RMW_SET_ERROR_MSG("invalid reply sample"); + return RMW_RET_ERROR; + } + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(sample->payload.start)), + sample->payload.len); + + // Object that serializes the data + rmw_zenoh_cpp::Cdr deser(fastbuffer); + if (!response_type_support_->deserialize_ros_message( + deser.get_cdr(), + ros_response, + response_type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS response"); + return RMW_RET_ERROR; + } + + // Fill in the request_header + request_header->request_id.sequence_number = + rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "sequence_number"); + if (request_header->request_id.sequence_number < 0) { + RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); + return RMW_RET_ERROR; + } + request_header->source_timestamp = + rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "source_timestamp"); + if (request_header->source_timestamp < 0) { + RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); + return RMW_RET_ERROR; + } + if (!rmw_zenoh_cpp::get_gid_from_attachment( + &sample->attachment, + request_header->request_id.writer_guid)) + { + RMW_SET_ERROR_MSG("Could not get client gid from attachment"); + return RMW_RET_ERROR; + } + + request_header->received_timestamp = latest_reply->get_received_timestamp(); + + *taken = true; + + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t ClientData::send_request( + const void * ros_request, + int64_t * sequence_id) +{ + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return RMW_RET_OK; + } + + rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; + rmw_context_impl_s * context_impl = static_cast(rmw_node_->context->impl); + if (context_impl == nullptr) { + return RMW_RET_INVALID_ARGUMENT; + } + + size_t max_data_length = ( + request_type_support_->get_estimated_serialized_size( + ros_request, request_type_support_impl_)); + + // Init serialized message byte array + char * request_bytes = static_cast(allocator->allocate( + max_data_length, + allocator->state)); + if (!request_bytes) { + RMW_SET_ERROR_MSG("failed allocate request message bytes"); + return RMW_RET_ERROR; + } + auto always_free_request_bytes = rcpputils::make_scope_exit( + [request_bytes, allocator]() { + allocator->deallocate(request_bytes, allocator->state); + }); + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(request_bytes, max_data_length); + // Object that serializes the data + Cdr ser(fastbuffer); + if (!request_type_support_->serialize_ros_message( + ros_request, + ser.get_cdr(), + request_type_support_impl_)) + { + return RMW_RET_ERROR; + } + size_t data_length = ser.get_serialized_data_length(); + *sequence_id = sequence_number_++; + + // Send request + z_get_options_t opts = z_get_options_default(); + z_owned_bytes_map_t map = create_map_and_set_sequence_num( + *sequence_id, + [this](z_owned_bytes_map_t * map, const char * key) + { + uint8_t local_gid[RMW_GID_STORAGE_SIZE]; + entity_->copy_gid(local_gid); + z_bytes_t gid_bytes; + gid_bytes.len = RMW_GID_STORAGE_SIZE; + gid_bytes.start = local_gid; + z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); + }); + if (!z_check(map)) { + // create_map_and_set_sequence_num already set the error + return RMW_RET_ERROR; + } + auto always_free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); + + opts.attachment = z_bytes_map_as_attachment(&map); + opts.target = Z_QUERY_TARGET_ALL_COMPLETE; + // The default timeout for a z_get query is 10 seconds and if a response is not received within + // this window, the queryable will return an invalid reply. However, it is common for actions, + // which are implemented using services, to take an extended duration to complete. Hence, we set + // the timeout_ms to the largest supported value to account for most realistic scenarios. + opts.timeout_ms = std::numeric_limits::max(); + // Latest consolidation guarantees unicity of replies for the same key expression, + // which optimizes bandwidth. The default is "None", which imples replies may come in any order + // and any number. + opts.consolidation = z_query_consolidation_latest(); + opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; + // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, + // capture shared_from_this() instead of this. + z_owned_closure_reply_t zn_closure_reply = + z_closure(client_data_handler, client_data_drop, this); + z_get( + context_impl->session(), + z_loan(keyexpr_), "", + z_move(zn_closure_reply), + &opts); + + return RMW_RET_OK; +} + +///============================================================================= +ClientData::~ClientData() +{ + const rmw_ret_t ret = this->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Error destructing client /%s.", + entity_->topic_info().value().name_.c_str() + ); + } +} + +//============================================================================== +void ClientData::set_on_new_response_callback( + rmw_event_callback_t callback, + const void * user_data) +{ + std::lock_guard lock(mutex_); + data_callback_mgr_.set_callback(user_data, std::move(callback)); +} + +///============================================================================= +bool ClientData::queue_has_data_and_attach_condition_if_not( + rmw_wait_set_data_t * wait_set_data) +{ + std::lock_guard lock(mutex_); + if (!reply_queue_.empty()) { + return true; + } + wait_set_data_ = wait_set_data; + + return false; +} + +///============================================================================= +bool ClientData::detach_condition_and_queue_is_empty() +{ + std::lock_guard lock(mutex_); + wait_set_data_ = nullptr; + + return reply_queue_.empty(); +} + +///============================================================================= +void ClientData::_shutdown() +{ + if (is_shutdown_) { + return; + } + + // Unregister this node from the ROS graph. + if (zc_liveliness_token_check(&token_)) { + zc_liveliness_undeclare_token(z_move(token_)); + } + if (z_check(z_loan(keyexpr_))) { + z_drop(z_move(keyexpr_)); + } + + is_shutdown_ = true; +} + +///============================================================================= +rmw_ret_t ClientData::shutdown() +{ + std::lock_guard lock(mutex_); + _shutdown(); + return RMW_RET_OK; +} + +///============================================================================= +bool ClientData::shutdown_and_query_in_flight() +{ + std::lock_guard lock(mutex_); + _shutdown(); + return num_in_flight_ > 0; +} + +///============================================================================= +void ClientData::decrement_in_flight_and_conditionally_remove() +{ + std::lock_guard lock(mutex_); + --num_in_flight_; + + if (is_shutdown_ && num_in_flight_ == 0) { + rmw_context_impl_s * context_impl = static_cast(rmw_node_->data); + if (context_impl == nullptr) { + return; + } + std::shared_ptr node_data = context_impl->get_node_data(rmw_node_); + if (node_data == nullptr) { + return; + } + node_data->delete_client_data(rmw_client_); + } +} + +///============================================================================= +bool ClientData::is_shutdown() const +{ + std::lock_guard lock(mutex_); + return is_shutdown_; +} +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp new file mode 100644 index 00000000..eda4bf9c --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -0,0 +1,159 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_CLIENT_DATA_HPP_ +#define DETAIL__RMW_CLIENT_DATA_HPP_ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "event.hpp" +#include "liveliness_utils.hpp" +#include "message_type_support.hpp" +#include "service_type_support.hpp" +#include "type_support_common.hpp" +#include "zenoh_utils.hpp" + +#include "rcutils/allocator.h" + +#include "rmw/rmw.h" +#include "rmw/ret_types.h" + +namespace rmw_zenoh_cpp +{ + +///============================================================================= +class ClientData final : public std::enable_shared_from_this +{ +public: + // Make a shared_ptr of ClientData. + static std::shared_ptr make( + z_session_t session, + const rmw_node_t * const node, + const rmw_client_t * client, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t service_id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Get a copy of the TopicInfo of this ClientData. + liveliness::TopicInfo topic_info() const; + + // Returns true if liveliness token is still valid. + bool liveliness_is_valid() const; + + // Copy the GID of this ClientData into an rmw_gid_t. + void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + + // Add a new ZenohReply to the queue. + void add_new_reply(std::unique_ptr reply); + + // Take a ROS service response. + rmw_ret_t take_response( + rmw_service_info_t * request_header, + void * ros_response, + bool * taken); + + // Send a service request. + rmw_ret_t send_request( + const void * ros_request, + int64_t * sequence_id); + + // Set a callback to be called when events happen. + void set_on_new_response_callback( + rmw_event_callback_t callback, + const void * user_data); + + // Check if there is data in the queue, and if not attach the wait set condition variable. + bool queue_has_data_and_attach_condition_if_not( + rmw_wait_set_data_t * wait_set_data); + + // Detach any attached wait set condition variable, and return whether there is data in the queue. + bool detach_condition_and_queue_is_empty(); + + // Shutdown this ClientData. + rmw_ret_t shutdown(); + + // Shutdown this ClientData, and return whether there are any requests currently in flight. + bool shutdown_and_query_in_flight(); + + // Decrement the in flight requests, and if that drops to 0 remove the client from the node. + void decrement_in_flight_and_conditionally_remove(); + + // Check if this ClientData is shutdown. + bool is_shutdown() const; + + // Destructor. + ~ClientData(); + +private: + // Constructor. + ClientData( + const rmw_node_t * rmw_node, + const rmw_client_t * client, + std::shared_ptr entity, + const void * request_type_support_impl, + const void * response_type_support_impl, + std::shared_ptr request_type_support, + std::shared_ptr response_type_support); + + // Initialize the Zenoh objects for this entity. + bool init(z_session_t session); + + // Shutdown this client (the mutex is expected to be held by the caller). + void _shutdown(); + + // Internal mutex. + mutable std::recursive_mutex mutex_; + // The parent node. + const rmw_node_t * rmw_node_; + const rmw_client_t * rmw_client_; + // The Entity generated for the service. + std::shared_ptr entity_; + // An owned keyexpression. + z_owned_keyexpr_t keyexpr_; + // Liveliness token for the service. + zc_owned_liveliness_token_t token_; + // Type support fields. + const void * request_type_support_impl_; + const void * response_type_support_impl_; + std::shared_ptr request_type_support_; + std::shared_ptr response_type_support_; + // Deque to store the replies in the order they arrive. + std::deque> reply_queue_; + // Wait set data. + rmw_wait_set_data_t * wait_set_data_; + // Data callback manager. + DataCallbackManager data_callback_mgr_; + // Sequence number for queries. + size_t sequence_number_; + // Shutdown flag. + bool is_shutdown_; + size_t num_in_flight_; +}; +using ClientDataPtr = std::shared_ptr; +using ClientDataConstPtr = std::shared_ptr; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__RMW_CLIENT_DATA_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index ed7fa87b..374af78e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -14,391 +14,531 @@ #include "rmw_context_impl_s.hpp" +#include +#include +#include +#include #include #include +#include #include #include +#include #include +#include "graph_cache.hpp" #include "guard_condition.hpp" #include "identifier.hpp" -#include "liveliness_utils.hpp" #include "logging_macros.hpp" +#include "rmw_node_data.hpp" #include "zenoh_config.hpp" #include "zenoh_router_check.hpp" #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" -#include "rmw/impl/cpp/macros.hpp" // Megabytes of SHM to reserve. // TODO(clalancette): Make this configurable, or get it from the configuration #define SHM_BUFFER_SIZE_MB 10 -///============================================================================= -void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void * data) +// This global mapping of raw Data pointers to Data shared pointers allows graph_sub_data_handler() +// to lookup the pointer, and gain a reference to a shared_ptr if it exists. +// This guarantees that the Data object will not be destroyed while we are using it. +static std::mutex data_to_data_shared_ptr_map_mutex; +static std::unordered_map> data_to_data_shared_ptr_map; + +static void graph_sub_data_handler(const z_sample_t * sample, void * data); + +// Bundle all class members into a data struct which can be passed as a +// weak ptr to various threads for thread-safe access without capturing +// "this" ptr by reference. +class rmw_context_impl_s::Data final { - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - auto free_keystr = rcpputils::make_scope_exit( - [&keystr]() { - z_drop(z_move(keystr)); - }); +public: + // Constructor. + Data( + std::size_t domain_id, + const std::string & enclave) + : domain_id_(std::move(domain_id)), + enclave_(std::move(enclave)), + is_shutdown_(false), + next_entity_id_(0), + nodes_({}) + { + // Initialize the zenoh configuration. + z_owned_config_t config; + rmw_ret_t ret; + if ((ret = + rmw_zenoh_cpp::get_z_config( + rmw_zenoh_cpp::ConfigurableEntity::Session, + &config)) != RMW_RET_OK) + { + throw std::runtime_error("Error configuring Zenoh session."); + } - auto data_ptr = static_cast(data); - if (data_ptr == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Invalid data_ptr." - ); - return; - } + // Check if shm is enabled. + z_owned_str_t shm_enabled = zc_config_get(z_loan(config), "transport/shared_memory/enabled"); + auto always_free_shm_enabled = rcpputils::make_scope_exit( + [&shm_enabled]() { + z_drop(z_move(shm_enabled)); + }); + + // Initialize the zenoh session. + session_ = z_open(z_move(config)); + if (!z_session_check(&session_)) { + throw std::runtime_error("Error setting up zenoh session."); + } + auto close_session = rcpputils::make_scope_exit( + [this]() { + z_close(z_move(session_)); + }); + + // TODO(Yadunund) Move this check into a separate thread. + // Verify if the zenoh router is running if configured. + const std::optional configured_connection_attempts = + rmw_zenoh_cpp::zenoh_router_check_attempts(); + if (configured_connection_attempts.has_value()) { + ret = RMW_RET_ERROR; + uint64_t connection_attempts = 0; + // Retry until the connection is successful. + while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { + if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session_))) != RMW_RET_OK) { + ++connection_attempts; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + if (ret != RMW_RET_OK) { + throw std::runtime_error( + "Unable to connect to a Zenoh router after " + + std::to_string(configured_connection_attempts.value()) + + " retries."); + } + } - // Update the graph cache. - std::lock_guard lock(data_ptr->mutex_); - if (data_ptr->is_shutdown_) { - return; - } - switch (sample->kind) { - case z_sample_kind_t::Z_SAMPLE_KIND_PUT: - data_ptr->graph_cache_->parse_put(keystr._cstr); - break; - case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: - data_ptr->graph_cache_->parse_del(keystr._cstr); - break; - default: - return; - } + // Initialize the graph cache. + const z_id_t zid = z_info_zid(z_loan(session_)); + graph_cache_ = std::make_shared(zid); + // Setup liveliness subscriptions for discovery. + std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id); + + // Query router/liveliness participants to get graph information before the session was started. + // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive + // replies for the zc_liveliness_get() call. This is necessary as if the `bound` + // is too low, the channel may starve the zenoh executor of its threads which + // would lead to deadlocks when trying to receive replies and block the + // execution here. + // The blocking channel will return when the sender end is closed which is + // the moment the query finishes. + // The non-blocking fifo exists only for the use case where we don't want to + // block the thread between responses (including the request termination response). + // In general, unless we want to cooperatively schedule other tasks on the same + // thread as reading the fifo, the blocking fifo will be more appropriate as + // the code will be simpler, and if we're just going to spin over the non-blocking + // reads until we obtain responses, we'll just be hogging CPU time by convincing + // the OS that we're doing actual work when it could instead park the thread. + z_owned_reply_channel_t channel = zc_reply_fifo_new(0); + zc_liveliness_get( + z_loan(session_), z_keyexpr(liveliness_str.c_str()), + z_move(channel.send), NULL); + z_owned_reply_t reply = z_reply_null(); + for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); + call_success = z_call(channel.recv, &reply)) + { + if (!call_success) { + continue; + } + if (z_reply_is_ok(&reply)) { + z_sample_t sample = z_reply_ok(&reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + // Ignore tokens from the same session to avoid race conditions from this + // query and the liveliness subscription. + graph_cache_->parse_put(z_loan(keystr), true); + z_drop(z_move(keystr)); + } else { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n"); + } + } + z_drop(z_move(reply)); + z_drop(z_move(channel)); - // Trigger the ROS graph guard condition. - rmw_ret_t rmw_ret = rmw_trigger_guard_condition(data_ptr->graph_guard_condition_.get()); - if (RMW_RET_OK != rmw_ret) { - RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Unable to trigger graph guard condition." - ); + // Initialize the shm manager if shared_memory is enabled in the config. + shm_manager_ = std::nullopt; + if (shm_enabled._cstr != nullptr && + strcmp(shm_enabled._cstr, "true") == 0) + { + char idstr[sizeof(zid.id) * 2 + 1]; // 2 bytes for each byte of the id, plus the trailing \0 + static constexpr size_t max_size_of_each = 3; // 2 for each byte, plus the trailing \0 + for (size_t i = 0; i < sizeof(zid.id); ++i) { + snprintf(idstr + 2 * i, max_size_of_each, "%02x", zid.id[i]); + } + idstr[sizeof(zid.id) * 2] = '\0'; + // TODO(yadunund): Can we get the size of the shm from the config even though it's not + // a standard parameter? + shm_manager_ = + zc_shm_manager_new( + z_loan(session_), + idstr, + SHM_BUFFER_SIZE_MB * 1024 * 1024); + if (!shm_manager_.has_value() || + !zc_shm_manager_check(&shm_manager_.value())) + { + throw std::runtime_error("Unable to create shm manager."); + } + } + auto free_shm_manager = rcpputils::make_scope_exit( + [this]() { + if (shm_manager_.has_value()) { + z_drop(z_move(shm_manager_.value())); + } + }); + + graph_guard_condition_ = std::make_unique(); + graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; + graph_guard_condition_->data = &guard_condition_data_; + + // Setup the liveliness subscriber to receives updates from the ROS graph + // and update the graph cache. + auto sub_options = zc_liveliness_subscriber_options_null(); + z_owned_closure_sample_t callback = z_closure( + graph_sub_data_handler, nullptr, this); + graph_subscriber_ = zc_liveliness_declare_subscriber( + z_loan(session_), + z_keyexpr(liveliness_str.c_str()), + z_move(callback), + &sub_options); + zc_liveliness_subscriber_options_drop(z_move(sub_options)); + auto undeclare_z_sub = rcpputils::make_scope_exit( + [this]() { + z_undeclare_subscriber(z_move(this->graph_subscriber_)); + }); + if (!z_check(graph_subscriber_)) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + throw std::runtime_error("Unable to subscribe to ROS graph updates."); + } + + close_session.cancel(); + free_shm_manager.cancel(); + undeclare_z_sub.cancel(); } -} -///============================================================================= -rmw_context_impl_s::Data::Data( - std::size_t domain_id, - const std::string & enclave, - z_owned_session_t session, - std::optional shm_manager, - const std::string & liveliness_str, - std::shared_ptr graph_cache) -: enclave_(std::move(enclave)), - domain_id_(std::move(domain_id)), - session_(std::move(session)), - shm_manager_(std::move(shm_manager)), - liveliness_str_(std::move(liveliness_str)), - graph_cache_(std::move(graph_cache)), - is_shutdown_(false), - next_entity_id_(0), - is_initialized_(false), - nodes_({}) -{ - graph_guard_condition_ = std::make_unique(); - graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - graph_guard_condition_->data = &guard_condition_data_; -} + // Shutdown the Zenoh session. + rmw_ret_t shutdown() + { + { + std::lock_guard lock(mutex_); + rmw_ret_t ret = RMW_RET_OK; + if (is_shutdown_) { + return ret; + } -///============================================================================= -rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph() -{ - std::lock_guard lock(mutex_); - if (is_initialized_) { + // Shutdown all the nodes in this context. + for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) { + ret = node_it->second->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to shutdown node with id %zu. rmw_ret_t code: %zu.", + node_it->second->id(), + ret + ); + } + } + + z_undeclare_subscriber(z_move(graph_subscriber_)); + if (shm_manager_.has_value()) { + z_drop(z_move(shm_manager_.value())); + } + is_shutdown_ = true; + + // We specifically do *not* hold the mutex_ while tearing down the session; this allows us + // to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler(). + } + + // Close the zenoh session + if (z_close(z_move(session_)) < 0) { + RMW_SET_ERROR_MSG("Error while closing zenoh session"); + return RMW_RET_ERROR; + } return RMW_RET_OK; } - // Setup the liveliness subscriber to receives updates from the ROS graph - // and update the graph cache. - // TODO(Yadunund): This closure is still not 100% thread safe as we are - // passing Data* as the type erased argument to z_closure. Thus during - // the execution of graph_sub_data_handler, the rawptr may be freed/reset - // by a different thread. When we switch to zenoh-cpp we can replace z_closure - // with a lambda that captures a weak_ptr by copy. The lambda and caputed - // weak_ptr will have the same lifetime as the subscriber. Then within - // graph_sub_data_handler, we would first lock to weak_ptr to check if the - // shared_ptr exits. If it does, then even if a different thread calls - // rmw_context_fini() to destroy rmw_context_impl_s, the locked - // shared_ptr would live on until the graph_sub_data_handler callback. - auto sub_options = zc_liveliness_subscriber_options_null(); - z_owned_closure_sample_t callback = z_closure( - rmw_context_impl_s::graph_sub_data_handler, nullptr, this); - graph_subscriber_ = zc_liveliness_declare_subscriber( - z_loan(session_), - z_keyexpr(liveliness_str_.c_str()), - z_move(callback), - &sub_options); - zc_liveliness_subscriber_options_drop(z_move(sub_options)); - auto undeclare_z_sub = rcpputils::make_scope_exit( - [this]() { - z_undeclare_subscriber(z_move(this->graph_subscriber_)); - }); - if (!z_check(graph_subscriber_)) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return RMW_RET_ERROR; + + std::string enclave() const + { + std::lock_guard lock(mutex_); + return enclave_; } - undeclare_z_sub.cancel(); - is_initialized_ = true; - return RMW_RET_OK; -} + z_session_t session() const + { + std::lock_guard lock(mutex_); + return z_loan(session_); + } -///============================================================================= -rmw_ret_t rmw_context_impl_s::Data::shutdown() -{ - std::lock_guard lock(mutex_); - rmw_ret_t ret = RMW_RET_OK; - if (is_shutdown_) { - return ret; + std::optional & shm_manager() + { + std::lock_guard lock(mutex_); + return shm_manager_; } - // Shutdown all the nodes in this context. - for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) { - ret = node_it->second->shutdown(); - if (ret != RMW_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to shutdown node with id %zu. rmw_ret_t code: %zu.", - node_it->second->id(), - ret - ); - } + rmw_guard_condition_t * graph_guard_condition() + { + std::lock_guard lock(mutex_); + return graph_guard_condition_.get(); } - z_undeclare_subscriber(z_move(graph_subscriber_)); - if (shm_manager_.has_value()) { - z_drop(z_move(shm_manager_.value())); + std::size_t get_next_entity_id() + { + std::lock_guard lock(mutex_); + return next_entity_id_++; } - // Close the zenoh session - if (z_close(z_move(session_)) < 0) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); - return RMW_RET_ERROR; + + bool is_shutdown() const + { + std::lock_guard lock(mutex_); + return is_shutdown_; } - is_shutdown_ = true; - return RMW_RET_OK; -} -///============================================================================= -rmw_context_impl_s::Data::~Data() -{ - auto ret = this->shutdown(); - nodes_.clear(); - static_cast(ret); -} + bool session_is_valid() const + { + std::lock_guard lock(mutex_); + return z_check(session_); + } -///============================================================================= -rmw_context_impl_s::rmw_context_impl_s( - const std::size_t domain_id, - const std::string & enclave) -{ - // Initialize the zenoh configuration. - z_owned_config_t config; - rmw_ret_t ret; - if ((ret = - rmw_zenoh_cpp::get_z_config( - rmw_zenoh_cpp::ConfigurableEntity::Session, - &config)) != RMW_RET_OK) + std::shared_ptr graph_cache() { - throw std::runtime_error("Error configuring Zenoh session."); + std::lock_guard lock(mutex_); + return graph_cache_; } - // Check if shm is enabled. - z_owned_str_t shm_enabled = zc_config_get(z_loan(config), "transport/shared_memory/enabled"); - auto always_free_shm_enabled = rcpputils::make_scope_exit( - [&shm_enabled]() { - z_drop(z_move(shm_enabled)); - }); + bool create_node_data( + const rmw_node_t * const node, + const std::string & ns, + const std::string & node_name) + { + std::lock_guard lock(mutex_); + if (nodes_.count(node) > 0) { + // Node already exists. + return false; + } - // Initialize the zenoh session. - z_owned_session_t session = z_open(z_move(config)); - if (!z_session_check(&session)) { - throw std::runtime_error("Error setting up zenoh session."); - } - auto close_session = rcpputils::make_scope_exit( - [&session]() { - z_close(z_move(session)); - }); + // Check that the Zenoh session is still valid. + if (!z_check(session_)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create NodeData as Zenoh session is invalid."); + return false; + } - // TODO(Yadunund) Move this check into a separate thread. - // Verify if the zenoh router is running if configured. - const std::optional configured_connection_attempts = - rmw_zenoh_cpp::zenoh_router_check_attempts(); - if (configured_connection_attempts.has_value()) { - ret = RMW_RET_ERROR; - uint64_t connection_attempts = 0; - // Retry until the connection is successful. - while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { - if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session))) != RMW_RET_OK) { - ++connection_attempts; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + auto node_data = rmw_zenoh_cpp::NodeData::make( + node, + this->get_next_entity_id(), + z_loan(session_), + domain_id_, + ns, + node_name, + enclave_); + if (node_data == nullptr) { + // Error already handled. + return false; } - if (ret != RMW_RET_OK) { - throw std::runtime_error( - "Unable to connect to a Zenoh router after " + - std::to_string(configured_connection_attempts.value()) + - " retries."); + + auto node_insertion = nodes_.insert(std::make_pair(node, std::move(node_data))); + if (!node_insertion.second) { + return false; } + + return true; } - // Initialize the graph cache. - const z_id_t zid = z_info_zid(z_loan(session)); - auto graph_cache = std::make_shared(zid); - // Setup liveliness subscriptions for discovery. - std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token( - domain_id); - - // Query router/liveliness participants to get graph information before this session was started. - // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive - // replies for the zc_liveliness_get() call. This is necessary as if the `bound` - // is too low, the channel may starve the zenoh executor of its threads which - // would lead to deadlocks when trying to receive replies and block the - // execution here. - // The blocking channel will return when the sender end is closed which is - // the moment the query finishes. - // The non-blocking fifo exists only for the use case where we don't want to - // block the thread between responses (including the request termination response). - // In general, unless we want to cooperatively schedule other tasks on the same - // thread as reading the fifo, the blocking fifo will be more appropriate as - // the code will be simpler, and if we're just going to spin over the non-blocking - // reads until we obtain responses, we'll just be hogging CPU time by convincing - // the OS that we're doing actual work when it could instead park the thread. - z_owned_reply_channel_t channel = zc_reply_fifo_new(0); - zc_liveliness_get( - z_loan(session), z_keyexpr(liveliness_str.c_str()), - z_move(channel.send), NULL); - z_owned_reply_t reply = z_reply_null(); - for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); - call_success = z_call(channel.recv, &reply)) + std::shared_ptr get_node_data(const rmw_node_t * const node) { - if (!call_success) { - continue; - } - if (z_reply_is_ok(&reply)) { - z_sample_t sample = z_reply_ok(&reply); - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - // Ignore tokens from the same session to avoid race conditions from this - // query and the liveliness subscription. - graph_cache->parse_put(z_loan(keystr), true); - z_drop(z_move(keystr)); - } else { - RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n"); + std::lock_guard lock(mutex_); + auto node_it = nodes_.find(node); + if (node_it == nodes_.end()) { + return nullptr; } + return node_it->second; } - z_drop(z_move(reply)); - z_drop(z_move(channel)); - // Initialize the shm manager if shared_memory is enabled in the config. - std::optional shm_manager = std::nullopt; - if (shm_enabled._cstr != nullptr && - strcmp(shm_enabled._cstr, "true") == 0) + void delete_node_data(const rmw_node_t * const node) { - char idstr[sizeof(zid.id) * 2 + 1]; // 2 bytes for each byte of the id, plus the trailing \0 - static constexpr size_t max_size_of_each = 3; // 2 for each byte, plus the trailing \0 - for (size_t i = 0; i < sizeof(zid.id); ++i) { - snprintf(idstr + 2 * i, max_size_of_each, "%02x", zid.id[i]); + std::lock_guard lock(mutex_); + nodes_.erase(node); + } + + void update_graph_cache(z_sample_kind_t sample_kind, const std::string & keystr) + { + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return; } - idstr[sizeof(zid.id) * 2] = '\0'; - // TODO(yadunund): Can we get the size of the shm from the config even though it's not - // a standard parameter? - shm_manager = - zc_shm_manager_new( - z_loan(session), - idstr, - SHM_BUFFER_SIZE_MB * 1024 * 1024); - if (!shm_manager.has_value() || - !zc_shm_manager_check(&shm_manager.value())) - { - throw std::runtime_error("Unable to create shm manager."); + switch (sample_kind) { + case z_sample_kind_t::Z_SAMPLE_KIND_PUT: + graph_cache_->parse_put(keystr); + break; + case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: + graph_cache_->parse_del(keystr); + break; + default: + return; + } + + // Trigger the ROS graph guard condition. + rmw_ret_t rmw_ret = rmw_trigger_guard_condition(graph_guard_condition_.get()); + if (RMW_RET_OK != rmw_ret) { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Unable to trigger graph guard condition." + ); } } - auto free_shm_manager = rcpputils::make_scope_exit( - [&shm_manager]() { - if (shm_manager.has_value()) { - z_drop(z_move(shm_manager.value())); - } - }); - close_session.cancel(); - free_shm_manager.cancel(); + // Destructor. + ~Data() + { + auto ret = this->shutdown(); + nodes_.clear(); + static_cast(ret); + } + +private: + // Mutex to lock when accessing members. + mutable std::recursive_mutex mutex_; + // The ROS domain id of this context. + std::size_t domain_id_; + // Enclave, name used to find security artifacts in a sros2 keystore. + std::string enclave_; + // An owned session. + z_owned_session_t session_; + // An optional SHM manager that is initialized of SHM is enabled in the + // zenoh session config. + std::optional shm_manager_; + // Graph cache. + std::shared_ptr graph_cache_; + // ROS graph liveliness subscriber. + z_owned_subscriber_t graph_subscriber_; + // Equivalent to rmw_dds_common::Context's guard condition. + // Guard condition that should be triggered when the graph changes. + std::unique_ptr graph_guard_condition_; + // The GuardCondition data structure. + rmw_zenoh_cpp::GuardCondition guard_condition_data_; + // Shutdown flag. + bool is_shutdown_; + // A counter to assign a local id for every entity created in this session. + std::size_t next_entity_id_; + // Nodes created from this context. + std::unordered_map> nodes_; +}; + +///============================================================================= +static void graph_sub_data_handler(const z_sample_t * sample, void * data) +{ + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + auto free_keystr = rcpputils::make_scope_exit( + [&keystr]() { + z_drop(z_move(keystr)); + }); - data_ = std::make_shared( - domain_id, - std::move(enclave), - std::move(session), - std::move(shm_manager), - std::move(liveliness_str), - std::move(graph_cache)); + auto data_ptr = static_cast(data); + if (data_ptr == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Invalid data_ptr." + ); + return; + } - ret = data_->subscribe_to_ros_graph(); - if (ret != RMW_RET_OK) { - throw std::runtime_error("Unable to subscribe to ROS Graph updates."); + // Look up the data shared_ptr in the global map. If it is in there, use it. + // If not, it is being shutdown so we can just ignore this update. + std::shared_ptr data_shared_ptr{nullptr}; + { + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + if (data_to_data_shared_ptr_map.count(data_ptr) == 0) { + return; + } + data_shared_ptr = data_to_data_shared_ptr_map[data_ptr]; } + + // Update the graph cache. + data_shared_ptr->update_graph_cache(sample->kind, keystr._cstr); +} + +///============================================================================= +rmw_context_impl_s::rmw_context_impl_s( + const std::size_t domain_id, + const std::string & enclave) +{ + data_ = std::make_shared(domain_id, std::move(enclave)); + + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + data_to_data_shared_ptr_map.emplace(data_.get(), data_); +} + +///============================================================================= +rmw_context_impl_s::~rmw_context_impl_s() +{ + this->shutdown(); } ///============================================================================= std::string rmw_context_impl_s::enclave() const { - std::lock_guard lock(data_->mutex_); - return data_->enclave_; + return data_->enclave(); } ///============================================================================= z_session_t rmw_context_impl_s::session() const { - std::lock_guard lock(data_->mutex_); - return z_loan(data_->session_); + return data_->session(); } ///============================================================================= std::optional & rmw_context_impl_s::shm_manager() { - std::lock_guard lock(data_->mutex_); - return data_->shm_manager_; + return data_->shm_manager(); } ///============================================================================= rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition() { - std::lock_guard lock(data_->mutex_); - return data_->graph_guard_condition_.get(); + return data_->graph_guard_condition(); } ///============================================================================= std::size_t rmw_context_impl_s::get_next_entity_id() { - std::lock_guard lock(data_->mutex_); - return data_->next_entity_id_++; + return data_->get_next_entity_id(); } ///============================================================================= rmw_ret_t rmw_context_impl_s::shutdown() { + { + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + data_to_data_shared_ptr_map.erase(data_.get()); + } + return data_->shutdown(); } ///============================================================================= bool rmw_context_impl_s::is_shutdown() const { - std::lock_guard lock(data_->mutex_); - return data_->is_shutdown_; + return data_->is_shutdown(); } ///============================================================================= bool rmw_context_impl_s::session_is_valid() const { - std::lock_guard lock(data_->mutex_); - return z_check(data_->session_); + return data_->session_is_valid(); } ///============================================================================= std::shared_ptr rmw_context_impl_s::graph_cache() { - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_; + return data_->graph_cache(); } ///============================================================================= @@ -407,56 +547,18 @@ bool rmw_context_impl_s::create_node_data( const std::string & ns, const std::string & node_name) { - std::lock_guard lock(data_->mutex_); - if (data_->nodes_.count(node) > 0) { - // Node already exists. - return false; - } - - // Check that the Zenoh session is still valid. - if (!z_check(data_->session_)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create NodeData as Zenoh session is invalid."); - return false; - } - - auto node_data = rmw_zenoh_cpp::NodeData::make( - node, - this->get_next_entity_id(), - z_loan(data_->session_), - data_->domain_id_, - ns, - node_name, - data_->enclave_); - if (node_data == nullptr) { - // Error already handled. - return false; - } - - auto node_insertion = data_->nodes_.insert(std::make_pair(node, std::move(node_data))); - if (!node_insertion.second) { - return false; - } - - return true; + return data_->create_node_data(node, ns, node_name); } ///============================================================================= std::shared_ptr rmw_context_impl_s::get_node_data( const rmw_node_t * const node) { - std::lock_guard lock(data_->mutex_); - auto node_it = data_->nodes_.find(node); - if (node_it == data_->nodes_.end()) { - return nullptr; - } - return node_it->second; + return data_->get_node_data(node); } ///============================================================================= void rmw_context_impl_s::delete_node_data(const rmw_node_t * const node) { - std::lock_guard lock(data_->mutex_); - data_->nodes_.erase(node); + data_->delete_node_data(node); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 884056e3..b9367e6f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -17,20 +17,16 @@ #include -# include +#include #include -#include #include #include -#include #include "graph_cache.hpp" -#include "guard_condition.hpp" -#include "liveliness_utils.hpp" #include "rmw_node_data.hpp" -#include "rcutils/types.h" -#include "rmw/rmw.h" +#include "rmw/ret_types.h" +#include "rmw/types.h" ///============================================================================= class rmw_context_impl_s final @@ -46,12 +42,12 @@ class rmw_context_impl_s final const std::size_t domain_id, const std::string & enclave); + ~rmw_context_impl_s(); + // Get a copy of the enclave. std::string enclave() const; // Loan the Zenoh session. - // TODO(Yadunund): Remove this API once rmw_context_impl_s is updated to - // create other Zenoh objects. z_session_t session() const; // Get a reference to the shm_manager. @@ -93,67 +89,11 @@ class rmw_context_impl_s final /// Delete the NodeData for a given rmw_node_t if present. void delete_node_data(const rmw_node_t * const node); -private: - // Bundle all class members into a data struct which can be passed as a - // weak ptr to various threads for thread-safe access without capturing - // "this" ptr by reference. - struct Data : public std::enable_shared_from_this - { - // Constructor. - Data( - std::size_t domain_id, - const std::string & enclave, - z_owned_session_t session, - std::optional shm_manager, - const std::string & liveliness_str, - std::shared_ptr graph_cache); - - // Subscribe to the ROS graph. - rmw_ret_t subscribe_to_ros_graph(); - - // Shutdown the Zenoh session. - rmw_ret_t shutdown(); - - // Destructor. - ~Data(); - - // Mutex to lock when accessing members. - mutable std::recursive_mutex mutex_; - // RMW allocator. - const rcutils_allocator_t * allocator_; - // Enclave, name used to find security artifacts in a sros2 keystore. - std::string enclave_; - // The ROS domain id of this context. - std::size_t domain_id_; - // An owned session. - z_owned_session_t session_; - // An optional SHM manager that is initialized of SHM is enabled in the - // zenoh session config. - std::optional shm_manager_; - // Liveliness keyexpr string to subscribe to for ROS graph changes. - std::string liveliness_str_; - // Graph cache. - std::shared_ptr graph_cache_; - // ROS graph liveliness subscriber. - z_owned_subscriber_t graph_subscriber_; - // Equivalent to rmw_dds_common::Context's guard condition. - // Guard condition that should be triggered when the graph changes. - std::unique_ptr graph_guard_condition_; - // The GuardCondition data structure. - rmw_zenoh_cpp::GuardCondition guard_condition_data_; - // Shutdown flag. - bool is_shutdown_; - // A counter to assign a local id for every entity created in this session. - std::size_t next_entity_id_; - // True once graph subscriber is initialized. - bool is_initialized_; - // Nodes created from this context. - std::unordered_map> nodes_; - }; + // Forward declaration + class Data; +private: std::shared_ptr data_{nullptr}; - - static void graph_sub_data_handler(const z_sample_t * sample, void * data); }; diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp deleted file mode 100644 index acf0bc70..00000000 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ /dev/null @@ -1,220 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include - -#include "logging_macros.hpp" - -#include "rmw/error_handling.h" -#include "rmw/impl/cpp/macros.hpp" - -#include "attachment_helpers.hpp" -#include "rmw_data_types.hpp" - -///============================================================================= -namespace rmw_zenoh_cpp -{ -///============================================================================= -void rmw_client_data_t::notify() -{ - std::lock_guard lock(condition_mutex_); - if (wait_set_data_ != nullptr) { - std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); - wait_set_data_->triggered = true; - wait_set_data_->condition_variable.notify_one(); - } -} - -///============================================================================= -void rmw_client_data_t::add_new_reply(std::unique_ptr reply) -{ - std::lock_guard lock(reply_queue_mutex_); - if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && - reply_queue_.size() >= adapted_qos_profile.depth) - { - // Log warning if message is discarded due to hitting the queue depth - z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr)); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Reply queue depth of %ld reached, discarding oldest reply " - "for client for %s", - adapted_qos_profile.depth, - z_loan(keystr)); - z_drop(z_move(keystr)); - reply_queue_.pop_front(); - } - reply_queue_.emplace_back(std::move(reply)); - - // Since we added new data, trigger user callback and guard condition if they are available - data_callback_mgr.trigger_callback(); - notify(); -} - -///============================================================================= -bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not( - rmw_wait_set_data_t * wait_set_data) -{ - std::lock_guard lock(condition_mutex_); - if (!reply_queue_.empty()) { - return true; - } - wait_set_data_ = wait_set_data; - - return false; -} - -///============================================================================= -bool rmw_client_data_t::detach_condition_and_queue_is_empty() -{ - std::lock_guard lock(condition_mutex_); - wait_set_data_ = nullptr; - - return reply_queue_.empty(); -} - -///============================================================================= -std::unique_ptr rmw_client_data_t::pop_next_reply() -{ - std::lock_guard lock(reply_queue_mutex_); - - if (reply_queue_.empty()) { - return nullptr; - } - - std::unique_ptr latest_reply = std::move(reply_queue_.front()); - reply_queue_.pop_front(); - - return latest_reply; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class -// for the use of this method. -void rmw_client_data_t::increment_in_flight_callbacks() -{ - std::lock_guard lock(in_flight_mutex_); - num_in_flight_++; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class -// for the use of this method. -bool rmw_client_data_t::shutdown_and_query_in_flight() -{ - std::lock_guard lock(in_flight_mutex_); - is_shutdown_ = true; - - return num_in_flight_ > 0; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure -// for the use of this method. -bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight) -{ - std::lock_guard lock(in_flight_mutex_); - queries_in_flight = --num_in_flight_ > 0; - return is_shutdown_; -} - -bool rmw_client_data_t::is_shutdown() const -{ - std::lock_guard lock(in_flight_mutex_); - return is_shutdown_; -} - - -///============================================================================= -size_t rmw_client_data_t::get_next_sequence_number() -{ - std::lock_guard lock(sequence_number_mutex_); - return sequence_number_++; -} - -//============================================================================== -void client_data_handler(z_owned_reply_t * reply, void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t " - ); - return; - } - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - if (client_data->is_shutdown()) { - return; - } - - if (!z_reply_check(reply)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_check returned False" - ); - return; - } - if (!z_reply_is_ok(reply)) { - z_owned_str_t keystr = z_keyexpr_to_string(z_loan(client_data->keyexpr)); - z_value_t err = z_reply_err(reply); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", - z_loan(keystr), - (int)err.payload.len, - err.payload.start); - z_drop(z_move(keystr)); - - return; - } - - std::chrono::nanoseconds::rep received_timestamp = - std::chrono::system_clock::now().time_since_epoch().count(); - - client_data->add_new_reply(std::make_unique(reply, received_timestamp)); - // Since we took ownership of the reply, null it out here - *reply = z_reply_null(); -} - -void client_data_drop(void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t " - ); - return; - } - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - bool queries_in_flight = false; - bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); - - if (is_shutdown) { - if (!queries_in_flight) { - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - client_data->context->options.allocator.deallocate( - client_data, client_data->context->options.allocator.state); - } - } -} - -} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp deleted file mode 100644 index 5605d7f0..00000000 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef DETAIL__RMW_DATA_TYPES_HPP_ -#define DETAIL__RMW_DATA_TYPES_HPP_ - -#include - -#include -#include -#include -#include -#include -#include - -#include "rcutils/allocator.h" - -#include "rmw/rmw.h" - -#include "rosidl_runtime_c/type_hash.h" - -#include "event.hpp" -#include "liveliness_utils.hpp" -#include "message_type_support.hpp" -#include "rmw_wait_set_data.hpp" -#include "service_type_support.hpp" -#include "zenoh_utils.hpp" - -/// Structs for various type erased data fields. -namespace rmw_zenoh_cpp -{ -///============================================================================= -void client_data_handler(z_owned_reply_t * reply, void * client_data); -void client_data_drop(void * data); - -///============================================================================= -class rmw_client_data_t final -{ -public: - // The Entity generated for the client. - std::shared_ptr entity; - - z_owned_keyexpr_t keyexpr; - - // Store the actual QoS profile used to configure this client. - // The QoS is reused for sending requests and getting responses. - rmw_qos_profile_t adapted_qos_profile; - - // Liveliness token for the client. - zc_owned_liveliness_token_t token; - - const void * request_type_support_impl; - const void * response_type_support_impl; - const char * typesupport_identifier; - const rosidl_type_hash_t * type_hash; - RequestTypeSupport * request_type_support; - ResponseTypeSupport * response_type_support; - - rmw_context_t * context; - - size_t get_next_sequence_number(); - - void add_new_reply(std::unique_ptr reply); - - bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); - - bool detach_condition_and_queue_is_empty(); - - std::unique_ptr pop_next_reply(); - - DataCallbackManager data_callback_mgr; - - // See the comment for "num_in_flight" below on the use of this method. - void increment_in_flight_callbacks(); - - // See the comment for "num_in_flight" below on the use of this method. - bool shutdown_and_query_in_flight(); - - // See the comment for "num_in_flight" below on the use of this method. - bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); - - bool is_shutdown() const; - -private: - void notify(); - - size_t sequence_number_{1}; - std::mutex sequence_number_mutex_; - - rmw_wait_set_data_t * wait_set_data_{nullptr}; - std::mutex condition_mutex_; - - std::deque> reply_queue_; - mutable std::mutex reply_queue_mutex_; - - // rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no - // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an - // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the - // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. - // - // The next 3 variables are used to avoid that situation. Any time a query is initiated via - // rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the - // query reply, num_in_flight_ is decremented. When rmw_destroy_client() is called, is_shutdown_ - // is set to true. If num_in_flight_ is 0, the data associated with this structure is freed. - // If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. - // In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query - // callback, the query callback will free up the structure. - // - // There is one case which is not handled by this, which has to do with timeouts. The query - // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never - // returns, the memory in this structure will never be freed. There isn't much we can do about - // that at this time, but we may want to consider changing the timeout so that the memory can - // eventually be freed up. - mutable std::mutex in_flight_mutex_; - bool is_shutdown_{false}; - size_t num_in_flight_{0}; -}; -} // namespace rmw_zenoh_cpp - -#endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 19357c3d..bd3f3f6e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -319,6 +319,80 @@ void NodeData::delete_service_data(const rmw_service_t * const service) services_.erase(service); } + +///============================================================================= +bool NodeData::create_client_data( + const rmw_client_t * const client, + z_session_t session, + std::size_t id, + const std::string & service_name, + const rosidl_service_type_support_t * type_supports, + const rmw_qos_profile_t * qos_profile) +{ + std::lock_guard lock_guard(mutex_); + if (is_shutdown_) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create ClientData as the NodeData has been shutdown."); + return false; + } + + if (clients_.count(client) > 0) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "ClientData already exists."); + return false; + } + + auto client_data = ClientData::make( + std::move(session), + node_, + client, + entity_->node_info(), + id_, + std::move(id), + std::move(service_name), + type_supports, + qos_profile); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to make ClientData."); + return false; + } + + auto insertion = clients_.insert(std::make_pair(client, std::move(client_data))); + if (!insertion.second) { + return false; + } + return true; +} + +///============================================================================= +ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client) +{ + std::lock_guard lock_guard(mutex_); + auto it = clients_.find(client); + if (it == clients_.end()) { + return nullptr; + } + + return it->second; +} + +///============================================================================= +void NodeData::delete_client_data(const rmw_client_t * const client) +{ + std::lock_guard lock_guard(mutex_); + auto client_it = clients_.find(client); + if (client_it == clients_.end()) { + return; + } + if (!client_it->second->shutdown_and_query_in_flight()) { + clients_.erase(client); + } +} + ///============================================================================= rmw_ret_t NodeData::shutdown() { @@ -365,6 +439,18 @@ rmw_ret_t NodeData::shutdown() ); } } + for (auto cli_it = clients_.begin(); cli_it != clients_.end(); ++cli_it) { + ret = cli_it->second->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to shutdown client %s within id %zu. rmw_ret_t code: %zu.", + cli_it->second->topic_info().name_.c_str(), + id_, + ret + ); + } + } // Unregister this node from the ROS graph. zc_liveliness_undeclare_token(z_move(token_)); diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index c6c74e0d..f85b1366 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -25,6 +25,7 @@ #include "graph_cache.hpp" #include "liveliness_utils.hpp" +#include "rmw_client_data.hpp" #include "rmw_publisher_data.hpp" #include "rmw_subscription_data.hpp" #include "rmw_service_data.hpp" @@ -97,6 +98,21 @@ class NodeData final // Delete the ServiceData for a given rmw_service_t if present. void delete_service_data(const rmw_service_t * const service); + // Create a new ClientData for a given rmw_client_t. + bool create_client_data( + const rmw_client_t * const client, + z_session_t session, + std::size_t id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Retrieve the ClientData for a given rmw_client_t if present. + ClientDataPtr get_client_data(const rmw_client_t * const client); + + // Delete the ClientData for a given rmw_client_t if present. + void delete_client_data(const rmw_client_t * const client); + // Shutdown this NodeData. rmw_ret_t shutdown(); @@ -132,6 +148,8 @@ class NodeData final std::unordered_map subs_; // Map of services. std::unordered_map services_; + // Map of clients. + std::unordered_map clients_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 9b2777c8..b2ea8235 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -421,10 +421,10 @@ liveliness::TopicInfo PublisherData::topic_info() const } ///============================================================================= -void PublisherData::copy_gid(rmw_gid_t * gid) const +void PublisherData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const { std::lock_guard lock(mutex_); - entity_->copy_gid(gid->data); + entity_->copy_gid(out_gid); } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 2a5f9240..7b124cc3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -68,7 +68,7 @@ class PublisherData final liveliness::TopicInfo topic_info() const; // Copy the GID of this PublisherData into an rmw_gid_t. - void copy_gid(rmw_gid_t * gid) const; + void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; // Returns true if liveliness token is still valid. bool liveliness_is_valid() const; diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index 4fe298a0..63de9bcb 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -21,7 +21,6 @@ #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/rmw_publisher_data.hpp" #include "detail/rmw_subscription_data.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 29a81bd3..c61a44fa 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -22,7 +22,6 @@ #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/zenoh_config.hpp" #include "rcutils/env.h" diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 761fbdcf..eebe1c4b 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -36,7 +36,6 @@ #include "detail/message_type_support.hpp" #include "detail/qos.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/serialization_format.hpp" #include "detail/type_support_common.hpp" #include "detail/zenoh_utils.hpp" @@ -1363,21 +1362,28 @@ rmw_create_client( const rmw_qos_profile_t * qos_profile) { RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return nullptr); - + RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); RMW_CHECK_ARGUMENT_FOR_NULL(service_name, nullptr); if (strlen(service_name) == 0) { RMW_SET_ERROR_MSG("service name is empty string"); return nullptr; } RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); - RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); - + // Validate service name + int validation_result; + if (rmw_validate_full_topic_name(service_name, &validation_result, nullptr) != RMW_RET_OK) { + RMW_SET_ERROR_MSG("rmw_validate_full_topic_name failed"); + return nullptr; + } + if (validation_result != RMW_TOPIC_VALID && !qos_profile->avoid_ros_namespace_conventions) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service name is malformed: %s", service_name); + return nullptr; + } RMW_CHECK_FOR_NULL_WITH_MSG( node->context, "expected initialized context", @@ -1397,23 +1403,22 @@ rmw_create_client( return nullptr; } - rcutils_allocator_t * allocator = &node->context->options.allocator; - - // Validate service name - int validation_result; - - if (rmw_validate_full_topic_name(service_name, &validation_result, nullptr) != RMW_RET_OK) { - RMW_SET_ERROR_MSG("rmw_validate_full_topic_name failed"); + // Get the service type support. + const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); + if (type_support == nullptr) { + // error was already set by find_service_type_support return nullptr; } - if (validation_result != RMW_TOPIC_VALID && !qos_profile->avoid_ros_namespace_conventions) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service name is malformed: %s", service_name); + rcutils_allocator_t * allocator = &node->context->options.allocator; + if (!rcutils_allocator_is_valid(allocator)) { + RMW_SET_ERROR_MSG("allocator is invalid."); return nullptr; } - // client data - rmw_client_t * rmw_client = static_cast(allocator->zero_allocate( + // Create the rmw_client. + rmw_client_t * rmw_client = + static_cast(allocator->zero_allocate( 1, sizeof(rmw_client_t), allocator->state)); @@ -1421,226 +1426,46 @@ rmw_create_client( rmw_client, "failed to allocate memory for the client", return nullptr); - auto free_rmw_client = rcpputils::make_scope_exit( [rmw_client, allocator]() { allocator->deallocate(rmw_client, allocator->state); }); - auto client_data = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::rmw_client_data_t), allocator->state)); + auto node_data = context_impl->get_node_data(node); RMW_CHECK_FOR_NULL_WITH_MSG( - client_data, - "failed to allocate memory for client data", + node_data, + "NodeData not found.", return nullptr); - auto free_client_data = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - client_data, - client_data, - return nullptr, - rmw_zenoh_cpp::rmw_client_data_t, - ); - auto destruct_client_data = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->~rmw_client_data_t(), - rmw_zenoh_cpp::rmw_client_data_t); - }); - - // Adapt any 'best available' QoS options - client_data->adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_zenoh_cpp::QoS::get().best_available_qos( - nullptr, nullptr, &client_data->adapted_qos_profile, nullptr); - if (RMW_RET_OK != ret) { - RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); - return nullptr; - } - // Obtain the type support - const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); - if (type_support == nullptr) { - // error was already set by find_service_type_support + if (!node_data->create_client_data( + rmw_client, + context_impl->session(), + context_impl->get_next_entity_id(), + service_name, + type_support, + qos_profile)) + { + // Error already handled. return nullptr; } - auto service_members = static_cast(type_support->data); - auto request_members = static_cast( - service_members->request_members_->data); - auto response_members = static_cast( - service_members->response_members_->data); - - client_data->context = node->context; - client_data->typesupport_identifier = type_support->typesupport_identifier; - client_data->type_hash = type_support->get_type_hash_func(type_support); - client_data->request_type_support_impl = request_members; - client_data->response_type_support_impl = response_members; - - // Request type support - client_data->request_type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::RequestTypeSupport), allocator->state)); - - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data->request_type_support, - "Failed to allocate rmw_zenoh_cpp::RequestTypeSupport", - return nullptr); - auto free_request_type_support = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data->request_type_support, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - client_data->request_type_support, - client_data->request_type_support, - return nullptr, - rmw_zenoh_cpp::RequestTypeSupport, service_members); - auto destruct_request_type_support = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->request_type_support->~RequestTypeSupport(), - rmw_zenoh_cpp::RequestTypeSupport); - }); - - // Response type support - client_data->response_type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::ResponseTypeSupport), allocator->state)); - - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data->response_type_support, - "Failed to allocate rmw_zenoh_cpp::ResponseTypeSupport", - return nullptr); - auto free_response_type_support = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data->response_type_support, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - client_data->response_type_support, - client_data->response_type_support, - return nullptr, - rmw_zenoh_cpp::ResponseTypeSupport, service_members); - auto destruct_response_type_support = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->response_type_support->~ResponseTypeSupport(), - rmw_zenoh_cpp::ResponseTypeSupport); - }); - - // Populate the rmw_client. + // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased + // Client handle will be returned in the rmw_clients_t in rmw_wait + // from which we cannot obtain ClientData. + rmw_client->data = static_cast(node_data->get_client_data(rmw_client).get()); rmw_client->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; rmw_client->service_name = rcutils_strdup(service_name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( rmw_client->service_name, - "failed to allocate client name", + "failed to allocate service client name", return nullptr); auto free_service_name = rcpputils::make_scope_exit( [rmw_client, allocator]() { allocator->deallocate(const_cast(rmw_client->service_name), allocator->state); }); - // Note: Service request/response types will contain a suffix Request_ or Response_. - // We remove the suffix when appending the type to the liveliness tokens for - // better reusability within GraphCache. - std::string service_type = client_data->request_type_support->get_name(); - size_t suffix_substring_position = service_type.find("Request_"); - if (std::string::npos != suffix_substring_position) { - service_type = service_type.substr(0, suffix_substring_position); - } else { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unexpected type %s for client %s. Report this bug", - service_type.c_str(), rmw_client->service_name); - return nullptr; - } - - // Convert the type hash to a string so that it can be included in - // the keyexpr. - char * type_hash_c_str = nullptr; - rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( - client_data->type_hash, - *allocator, - &type_hash_c_str); - if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { - RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); - return nullptr; - } - auto free_type_hash_c_str = rcpputils::make_scope_exit( - [&allocator, &type_hash_c_str]() { - allocator->deallocate(type_hash_c_str, allocator->state); - }); - - z_session_t session = context_impl->session(); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_FOR_NULL_WITH_MSG( - node_data, - "NodeData not found.", - return nullptr); - client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(session), - std::to_string(node_data->id()), - std::to_string( - context_impl->get_next_entity_id()), - rmw_zenoh_cpp::liveliness::EntityType::Client, - rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, - rmw_zenoh_cpp::liveliness::TopicInfo{ - node->context->actual_domain_id, - rmw_client->service_name, - std::move(service_type), - type_hash_c_str, - client_data->adapted_qos_profile} - ); - if (client_data->entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the client %s.", - rmw_client->service_name); - return nullptr; - } - - client_data->keyexpr = z_keyexpr_new(client_data->entity->topic_info()->topic_keyexpr_.c_str()); - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [client_data]() { - z_keyexpr_drop(z_move(client_data->keyexpr)); - }); - if (!z_keyexpr_check(&client_data->keyexpr)) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - - client_data->token = zc_liveliness_declare_token( - session, - z_keyexpr(client_data->entity->liveliness_keyexpr().c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [client_data]() { - if (client_data != nullptr) { - z_drop(z_move(client_data->token)); - } - }); - if (!z_check(client_data->token)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the client."); - return nullptr; - } - - rmw_client->data = client_data; - - free_token.cancel(); free_rmw_client.cancel(); - free_client_data.cancel(); - destruct_request_type_support.cancel(); - free_request_type_support.cancel(); - destruct_response_type_support.cancel(); - free_response_type_support.cancel(); - destruct_client_data.cancel(); free_service_name.cancel(); - free_ros_keyexpr.cancel(); return rmw_client; } @@ -1650,10 +1475,12 @@ rmw_create_client( rmw_ret_t rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) { - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -1664,36 +1491,15 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) client->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - rcutils_allocator_t * allocator = &node->context->options.allocator; - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data, - "client implementation pointer is null.", - return RMW_RET_INVALID_ARGUMENT); - - // CLEANUP =================================================================== - z_drop(z_move(client_data->keyexpr)); - zc_liveliness_undeclare_token(z_move(client_data->token)); - - RMW_TRY_DESTRUCTOR( - client_data->request_type_support->~RequestTypeSupport(), rmw_zenoh_cpp::RequestTypeSupport, ); - allocator->deallocate(client_data->request_type_support, allocator->state); - - RMW_TRY_DESTRUCTOR( - client_data->response_type_support->~ResponseTypeSupport(), rmw_zenoh_cpp::ResponseTypeSupport, - ); - allocator->deallocate(client_data->response_type_support, allocator->state); - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - if (!client_data->shutdown_and_query_in_flight()) { - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - allocator->deallocate(client->data, allocator->state); + auto node_data = context_impl->get_node_data(node); + if (node_data == nullptr) { + return RMW_RET_INVALID_ARGUMENT; } + // Remove the ClientData from NodeData. + node_data->delete_client_data(client); + + rcutils_allocator_t * allocator = &node->context->options.allocator; allocator->deallocate(const_cast(client->service_name), allocator->state); allocator->deallocate(client, allocator->state); @@ -1710,115 +1516,23 @@ rmw_send_request( { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_FOR_NULL_WITH_MSG( + client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( client_data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); - if (client_data->is_shutdown()) { - return RMW_RET_ERROR; - } - - rmw_context_impl_s * context_impl = static_cast( - client_data->context->impl); - - // Serialize data - - rcutils_allocator_t * allocator = &(client_data->context->options.allocator); - - size_t max_data_length = ( - client_data->request_type_support->get_estimated_serialized_size( - ros_request, client_data->request_type_support_impl)); - - // Init serialized message byte array - char * request_bytes = static_cast(allocator->allocate( - max_data_length, - allocator->state)); - if (!request_bytes) { - RMW_SET_ERROR_MSG("failed allocate request message bytes"); - return RMW_RET_ERROR; - } - auto free_request_bytes = rcpputils::make_scope_exit( - [request_bytes, allocator]() { - allocator->deallocate(request_bytes, allocator->state); - }); - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(request_bytes, max_data_length); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr ser(fastbuffer); - if (!client_data->request_type_support->serialize_ros_message( - ros_request, - ser.get_cdr(), - client_data->request_type_support_impl)) - { - return RMW_RET_ERROR; - } - - size_t data_length = ser.get_serialized_data_length(); - - *sequence_id = client_data->get_next_sequence_number(); - - // Send request - z_get_options_t opts = z_get_options_default(); - - z_owned_bytes_map_t map = rmw_zenoh_cpp::create_map_and_set_sequence_num( - *sequence_id, - [client_data](z_owned_bytes_map_t * map, const char * key) - { - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - client_data->entity->copy_gid(local_gid); - z_bytes_t gid_bytes; - gid_bytes.len = RMW_GID_STORAGE_SIZE; - gid_bytes.start = local_gid; - z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); - }); - if (!z_check(map)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - auto free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - client_data->increment_in_flight_callbacks(); - - opts.attachment = z_bytes_map_as_attachment(&map); - - opts.target = Z_QUERY_TARGET_ALL_COMPLETE; - // The default timeout for a z_get query is 10 seconds and if a response is not received within - // this window, the queryable will return an invalid reply. However, it is common for actions, - // which are implemented using services, to take an extended duration to complete. Hence, we set - // the timeout_ms to the largest supported value to account for most realistic scenarios. - opts.timeout_ms = std::numeric_limits::max(); - // Latest consolidation guarantees unicity of replies for the same key expression, - // which optimizes bandwidth. The default is "None", which imples replies may come in any order - // and any number. - opts.consolidation = z_query_consolidation_latest(); - opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; - z_owned_closure_reply_t zn_closure_reply = - z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data); - z_get( - context_impl->session(), - z_loan(client_data->keyexpr), "", - z_move(zn_closure_reply), - &opts); - - return RMW_RET_OK; + return client_data->send_request(ros_request, sequence_id); } //============================================================================== @@ -1830,12 +1544,10 @@ rmw_take_response( void * ros_response, bool * taken) { + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); *taken = false; - RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, @@ -1843,69 +1555,13 @@ rmw_take_response( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_FOR_NULL_WITH_MSG( client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); - std::unique_ptr latest_reply = client_data->pop_next_reply(); - if (latest_reply == nullptr) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. - return RMW_RET_OK; - } - - std::optional sample = latest_reply->get_sample(); - if (!sample) { - RMW_SET_ERROR_MSG("invalid reply sample"); - return RMW_RET_ERROR; - } - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(sample->payload.start)), - sample->payload.len); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr deser(fastbuffer); - if (!client_data->response_type_support->deserialize_ros_message( - deser.get_cdr(), - ros_response, - client_data->response_type_support_impl)) - { - RMW_SET_ERROR_MSG("could not deserialize ROS response"); - return RMW_RET_ERROR; - } - - // Fill in the request_header - - request_header->request_id.sequence_number = - rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "sequence_number"); - if (request_header->request_id.sequence_number < 0) { - RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); - return RMW_RET_ERROR; - } - - request_header->source_timestamp = - rmw_zenoh_cpp::get_int64_from_attachment(&sample->attachment, "source_timestamp"); - if (request_header->source_timestamp < 0) { - RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); - return RMW_RET_ERROR; - } - - if (!rmw_zenoh_cpp::get_gid_from_attachment( - &sample->attachment, - request_header->request_id.writer_guid)) - { - RMW_SET_ERROR_MSG("Could not get client gid from attachment"); - return RMW_RET_ERROR; - } - - request_header->received_timestamp = latest_reply->get_received_timestamp(); - - *taken = true; - - return RMW_RET_OK; + return client_data->take_response(request_header, ros_response, taken); } //============================================================================== @@ -1922,12 +1578,11 @@ rmw_client_request_publisher_get_actual_qos( rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); - *qos = client_data->adapted_qos_profile; + *qos = client_data->topic_info().qos_; return RMW_RET_OK; } @@ -1951,7 +1606,6 @@ rmw_create_service( const char * service_name, const rmw_qos_profile_t * qos_profile) { - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, @@ -2004,7 +1658,6 @@ rmw_create_service( return nullptr; } - // SERVICE DATA ============================================================== rcutils_allocator_t * allocator = &node->context->options.allocator; if (!rcutils_allocator_is_valid(allocator)) { RMW_SET_ERROR_MSG("allocator is invalid."); @@ -2045,7 +1698,7 @@ rmw_create_service( } // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased - // Service handle will be returned in the rmw_service_t in rmw_wait + // Service handle will be returned in the rmw_services_t in rmw_wait // from which we cannot obtain ServiceData. rmw_service->data = static_cast(node_data->get_service_data(rmw_service).get()); rmw_service->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; @@ -2436,8 +2089,8 @@ check_and_attach_condition( if (clients) { for (size_t i = 0; i < clients->client_count; ++i) { - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(clients->clients[i]); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(clients->clients[i]); if (client_data == nullptr) { continue; } @@ -2597,8 +2250,8 @@ rmw_wait( if (clients) { for (size_t i = 0; i < clients->client_count; ++i) { - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(clients->clients[i]); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(clients->clients[i]); if (client_data == nullptr) { continue; } @@ -2798,7 +2451,7 @@ rmw_get_gid_for_publisher(const rmw_publisher_t * publisher, rmw_gid_t * gid) RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - pub_data->copy_gid(gid); + pub_data->copy_gid(gid->data); return RMW_RET_OK; } @@ -2810,12 +2463,12 @@ rmw_get_gid_for_client(const rmw_client_t * client, rmw_gid_t * gid) { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(gid, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); + RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - client_data->entity->copy_gid(gid->data); + client_data->copy_gid(gid->data); return RMW_RET_OK; } @@ -2861,29 +2514,16 @@ rmw_service_server_is_available( RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(is_available, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); if (client_data == nullptr) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( "Unable to retreive client_data from client for service %s", client->service_name); return RMW_RET_INVALID_ARGUMENT; } - std::string service_type = client_data->request_type_support->get_name(); - size_t suffix_substring_position = service_type.find("Request_"); - if (std::string::npos != suffix_substring_position) { - service_type = service_type.substr(0, suffix_substring_position); - } else { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unexpected type %s for client %s. Report this bug", - service_type.c_str(), client->service_name); - return RMW_RET_INVALID_ARGUMENT; - } - return node->context->impl->graph_cache()->service_server_is_available( - client->service_name, service_type.c_str(), is_available); + client_data->topic_info(), is_available); } //============================================================================== @@ -2956,11 +2596,11 @@ rmw_client_set_on_new_response_callback( const void * user_data) { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); - client_data->data_callback_mgr.set_callback( - user_data, std::move(callback)); + client_data->set_on_new_response_callback( + std::move(callback), user_data); return RMW_RET_OK; } } // extern "C"