From ca899acfaf66a7abee69f7e656d71a855c38992d Mon Sep 17 00:00:00 2001 From: yadunund Date: Thu, 10 Oct 2024 14:32:41 -0700 Subject: [PATCH] Make ServiceData thread-safe (#292) * Make ServiceData thread-safe Signed-off-by: Yadunund * Feedback Signed-off-by: Yadunund * check for shutdown Signed-off-by: Yadunund * Remove redundant undeclare token Signed-off-by: Yadunund * Fix docstring Signed-off-by: Yadunund --------- Signed-off-by: Yadunund --- rmw_zenoh_cpp/CMakeLists.txt | 1 + rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 197 ------- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 93 +-- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 78 +++ rmw_zenoh_cpp/src/detail/rmw_node_data.hpp | 32 +- .../src/detail/rmw_publisher_data.cpp | 9 +- .../src/detail/rmw_publisher_data.hpp | 3 - rmw_zenoh_cpp/src/detail/rmw_service_data.cpp | 535 ++++++++++++++++++ rmw_zenoh_cpp/src/detail/rmw_service_data.hpp | 140 +++++ .../src/detail/rmw_subscription_data.cpp | 7 +- .../src/detail/rmw_subscription_data.hpp | 2 +- rmw_zenoh_cpp/src/detail/zenoh_utils.cpp | 40 ++ rmw_zenoh_cpp/src/detail/zenoh_utils.hpp | 30 + rmw_zenoh_cpp/src/rmw_zenoh.cpp | 480 +++------------- 14 files changed, 926 insertions(+), 721 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/rmw_service_data.cpp create mode 100644 rmw_zenoh_cpp/src/detail/rmw_service_data.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 508d5415..1442541b 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -42,6 +42,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/rmw_data_types.cpp src/detail/rmw_publisher_data.cpp src/detail/rmw_node_data.cpp + src/detail/rmw_service_data.cpp src/detail/rmw_subscription_data.cpp src/detail/service_type_support.cpp src/detail/type_support.cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 0489c261..c1e56bfd 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -16,9 +16,6 @@ #include #include #include -#include -#include -#include #include #include "liveliness_utils.hpp" @@ -35,137 +32,6 @@ ///============================================================================= namespace rmw_zenoh_cpp { -///============================================================================= -bool rmw_service_data_t::queue_has_data_and_attach_condition_if_not( - rmw_wait_set_data_t * wait_set_data) -{ - std::lock_guard lock(condition_mutex_); - if (!query_queue_.empty()) { - return true; - } - wait_set_data_ = wait_set_data; - - return false; -} - -///============================================================================= -bool rmw_service_data_t::detach_condition_and_queue_is_empty() -{ - std::lock_guard lock(condition_mutex_); - wait_set_data_ = nullptr; - - return query_queue_.empty(); -} - -///============================================================================= -std::unique_ptr rmw_service_data_t::pop_next_query() -{ - std::lock_guard lock(query_queue_mutex_); - if (query_queue_.empty()) { - return nullptr; - } - - std::unique_ptr query = std::move(query_queue_.front()); - query_queue_.pop_front(); - - return query; -} - -///============================================================================= -void rmw_service_data_t::notify() -{ - std::lock_guard lock(condition_mutex_); - if (wait_set_data_ != nullptr) { - std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); - wait_set_data_->triggered = true; - wait_set_data_->condition_variable.notify_one(); - } -} - -///============================================================================= -void rmw_service_data_t::add_new_query(std::unique_ptr query) -{ - std::lock_guard lock(query_queue_mutex_); - if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && - query_queue_.size() >= adapted_qos_profile.depth) - { - // Log warning if message is discarded due to hitting the queue depth - z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr)); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Query queue depth of %ld reached, discarding oldest Query " - "for service for %s", - adapted_qos_profile.depth, - z_loan(keystr)); - z_drop(z_move(keystr)); - query_queue_.pop_front(); - } - query_queue_.emplace_back(std::move(query)); - - // Since we added new data, trigger user callback and guard condition if they are available - data_callback_mgr.trigger_callback(); - notify(); -} - -///============================================================================= -bool rmw_service_data_t::add_to_query_map( - const rmw_request_id_t & request_id, std::unique_ptr query) -{ - const size_t hash = rmw_zenoh_cpp::hash_gid(request_id.writer_guid); - - std::lock_guard lock(sequence_to_query_map_mutex_); - - std::unordered_map::iterator it = sequence_to_query_map_.find(hash); - - if (it == sequence_to_query_map_.end()) { - SequenceToQuery stq; - - sequence_to_query_map_.insert(std::make_pair(hash, std::move(stq))); - - it = sequence_to_query_map_.find(hash); - } else { - // Client already in the map - - if (it->second.find(request_id.sequence_number) != it->second.end()) { - return false; - } - } - - it->second.insert(std::make_pair(request_id.sequence_number, std::move(query))); - - return true; -} - -///============================================================================= -std::unique_ptr rmw_service_data_t::take_from_query_map( - const rmw_request_id_t & request_id) -{ - const size_t hash = rmw_zenoh_cpp::hash_gid(request_id.writer_guid); - - std::lock_guard lock(sequence_to_query_map_mutex_); - - std::unordered_map::iterator it = sequence_to_query_map_.find(hash); - - if (it == sequence_to_query_map_.end()) { - return nullptr; - } - - SequenceToQuery::iterator query_it = it->second.find(request_id.sequence_number); - - if (query_it == it->second.end()) { - return nullptr; - } - - std::unique_ptr query = std::move(query_it->second); - it->second.erase(query_it); - - if (sequence_to_query_map_[hash].size() == 0) { - sequence_to_query_map_.erase(hash); - } - - return query; -} - ///============================================================================= void rmw_client_data_t::notify() { @@ -275,69 +141,6 @@ bool rmw_client_data_t::is_shutdown() const return is_shutdown_; } -///============================================================================= -ZenohQuery::ZenohQuery(const z_query_t * query) -{ - query_ = z_query_clone(query); -} - -///============================================================================= -ZenohQuery::~ZenohQuery() -{ - z_drop(z_move(query_)); -} - -///============================================================================= -const z_query_t ZenohQuery::get_query() const -{ - return z_query_loan(&query_); -} - -//============================================================================== -void service_data_handler(const z_query_t * query, void * data) -{ - z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query)); - auto drop_keystr = rcpputils::make_scope_exit( - [&keystr]() { - z_drop(z_move(keystr)); - }); - - rmw_service_data_t * service_data = - static_cast(data); - if (service_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain rmw_service_data_t from data for " - "service for %s", - z_loan(keystr) - ); - return; - } - - service_data->add_new_query(std::make_unique(query)); -} - -///============================================================================= -ZenohReply::ZenohReply(const z_owned_reply_t * reply) -{ - reply_ = *reply; -} - -///============================================================================= -ZenohReply::~ZenohReply() -{ - z_reply_drop(z_move(reply_)); -} - -///============================================================================= -std::optional ZenohReply::get_sample() const -{ - if (z_reply_is_ok(&reply_)) { - return z_reply_ok(&reply_); - } - - return std::nullopt; -} ///============================================================================= size_t rmw_client_data_t::get_next_sequence_number() diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index c5bfbe4f..d0465a07 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -21,11 +21,8 @@ #include #include #include -#include #include -#include #include -#include #include "rcutils/allocator.h" @@ -34,106 +31,18 @@ #include "rosidl_runtime_c/type_hash.h" #include "event.hpp" -#include "graph_cache.hpp" #include "message_type_support.hpp" #include "rmw_wait_set_data.hpp" #include "service_type_support.hpp" +#include "zenoh_utils.hpp" /// Structs for various type erased data fields. - namespace rmw_zenoh_cpp { -///============================================================================= -void service_data_handler(const z_query_t * query, void * service_data); - ///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * client_data); void client_data_drop(void * data); -///============================================================================= -class ZenohQuery final -{ -public: - ZenohQuery(const z_query_t * query); - - ~ZenohQuery(); - - const z_query_t get_query() const; - -private: - z_owned_query_t query_; -}; - -///============================================================================= -class rmw_service_data_t final -{ -public: - // The Entity generated for the service. - std::shared_ptr entity; - - z_owned_keyexpr_t keyexpr; - z_owned_queryable_t qable; - - // Store the actual QoS profile used to configure this service. - // The QoS is reused for getting requests and sending responses. - rmw_qos_profile_t adapted_qos_profile; - - // Liveliness token for the service. - zc_owned_liveliness_token_t token; - - const void * request_type_support_impl; - const void * response_type_support_impl; - const char * typesupport_identifier; - const rosidl_type_hash_t * type_hash; - RequestTypeSupport * request_type_support; - ResponseTypeSupport * response_type_support; - - rmw_context_t * context; - - bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); - - bool detach_condition_and_queue_is_empty(); - - std::unique_ptr pop_next_query(); - - void add_new_query(std::unique_ptr query); - - bool add_to_query_map(const rmw_request_id_t & request_id, std::unique_ptr query); - - std::unique_ptr take_from_query_map(const rmw_request_id_t & request_id); - - DataCallbackManager data_callback_mgr; - -private: - void notify(); - - // Deque to store the queries in the order they arrive. - std::deque> query_queue_; - mutable std::mutex query_queue_mutex_; - - // Map to store the sequence_number (as given by the client) -> ZenohQuery - using SequenceToQuery = std::unordered_map>; - std::unordered_map sequence_to_query_map_; - std::mutex sequence_to_query_map_mutex_; - - rmw_wait_set_data_t * wait_set_data_{nullptr}; - std::mutex condition_mutex_; -}; - -///============================================================================= -class ZenohReply final -{ -public: - ZenohReply(const z_owned_reply_t * reply); - - ~ZenohReply(); - - std::optional get_sample() const; - -private: - z_owned_reply_t reply_; -}; - ///============================================================================= class rmw_client_data_t final { diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 88cb0d3e..19357c3d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -253,6 +253,72 @@ void NodeData::delete_sub_data(const rmw_subscription_t * const subscription) subs_.erase(subscription); } +///============================================================================= +bool NodeData::create_service_data( + const rmw_service_t * const service, + z_session_t session, + std::size_t id, + const std::string & service_name, + const rosidl_service_type_support_t * type_supports, + const rmw_qos_profile_t * qos_profile) +{ + std::lock_guard lock_guard(mutex_); + if (is_shutdown_) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create ServiceData as the NodeData has been shutdown."); + return false; + } + + if (services_.count(service) > 0) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "ServiceData already exists."); + return false; + } + + auto service_data = ServiceData::make( + std::move(session), + node_, + entity_->node_info(), + id_, + std::move(id), + std::move(service_name), + type_supports, + qos_profile); + if (service_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to make ServiceData."); + return false; + } + + auto insertion = services_.insert(std::make_pair(service, std::move(service_data))); + if (!insertion.second) { + return false; + } + return true; +} + +///============================================================================= +ServiceDataPtr NodeData::get_service_data(const rmw_service_t * const service) +{ + std::lock_guard lock_guard(mutex_); + auto it = services_.find(service); + if (it == services_.end()) { + return nullptr; + } + + return it->second; +} + +///============================================================================= +void NodeData::delete_service_data(const rmw_service_t * const service) +{ + std::lock_guard lock_guard(mutex_); + services_.erase(service); +} + ///============================================================================= rmw_ret_t NodeData::shutdown() { @@ -287,6 +353,18 @@ rmw_ret_t NodeData::shutdown() ); } } + for (auto srv_it = services_.begin(); srv_it != services_.end(); ++srv_it) { + ret = srv_it->second->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to shutdown service %s within id %zu. rmw_ret_t code: %zu.", + srv_it->second->topic_info().name_.c_str(), + id_, + ret + ); + } + } // Unregister this node from the ROS graph. zc_liveliness_undeclare_token(z_move(token_)); diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index cabde5e5..c6c74e0d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -27,6 +27,7 @@ #include "liveliness_utils.hpp" #include "rmw_publisher_data.hpp" #include "rmw_subscription_data.hpp" +#include "rmw_service_data.hpp" #include "rmw/rmw.h" @@ -50,7 +51,7 @@ class NodeData final // Get the id of this node. std::size_t id() const; - // Create a new PublisherData for a publisher in this node. + // Create a new PublisherData for a given rmw_publisher_t. bool create_pub_data( const rmw_publisher_t * const publisher, z_session_t session, @@ -59,15 +60,15 @@ class NodeData final const rosidl_message_type_support_t * type_support, const rmw_qos_profile_t * qos_profile); - /// Retrieve the PublisherData for a given rmw_publisher_t if present. + // Retrieve the PublisherData for a given rmw_publisher_t if present. PublisherDataPtr get_pub_data(const rmw_publisher_t * const publisher); // Delete the PublisherData for a given rmw_publisher_t if present. void delete_pub_data(const rmw_publisher_t * const publisher); - // Create a new SubscriptionData for a publisher in this node. + // Create a new SubscriptionData for a given rmw_subscription_t. bool create_sub_data( - const rmw_subscription_t * const publisher, + const rmw_subscription_t * const subscription, z_session_t session, std::shared_ptr graph_cache, std::size_t id, @@ -75,11 +76,26 @@ class NodeData final const rosidl_message_type_support_t * type_support, const rmw_qos_profile_t * qos_profile); - /// Retrieve the SubscriptionData for a given rmw_subscription_t if present. - SubscriptionDataPtr get_sub_data(const rmw_subscription_t * const publisher); + // Retrieve the SubscriptionData for a given rmw_subscription_t if present. + SubscriptionDataPtr get_sub_data(const rmw_subscription_t * const subscription); // Delete the SubscriptionData for a given rmw_subscription_t if present. - void delete_sub_data(const rmw_subscription_t * const publisher); + void delete_sub_data(const rmw_subscription_t * const subscription); + + // Create a new ServiceData for a given rmw_service_t. + bool create_service_data( + const rmw_service_t * const service, + z_session_t session, + std::size_t id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Retrieve the ServiceData for a given rmw_service_t if present. + ServiceDataPtr get_service_data(const rmw_service_t * const service); + + // Delete the ServiceData for a given rmw_service_t if present. + void delete_service_data(const rmw_service_t * const service); // Shutdown this NodeData. rmw_ret_t shutdown(); @@ -114,6 +130,8 @@ class NodeData final std::unordered_map pubs_; // Map of subscriptions. std::unordered_map subs_; + // Map of services. + std::unordered_map services_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 91aa9127..2ad88f51 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -222,13 +222,6 @@ PublisherData::PublisherData( events_mgr_ = std::make_shared(); } -///============================================================================= -rmw_qos_profile_t PublisherData::adapted_qos_profile() const -{ - std::lock_guard lock(mutex_); - return entity_->topic_info()->qos_; -} - ///============================================================================= rmw_ret_t PublisherData::publish( const void * ros_message, @@ -465,7 +458,7 @@ rmw_ret_t PublisherData::shutdown() return RMW_RET_OK; } - // Unregister this node from the ROS graph. + // Unregister this publisher from the ROS graph. zc_liveliness_undeclare_token(z_move(token_)); if (pub_cache_.has_value()) { z_drop(z_move(pub_cache_.value())); diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 2ca66e51..0faa53c7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -51,9 +51,6 @@ class PublisherData final const rosidl_message_type_support_t * type_support, const rmw_qos_profile_t * qos_profile); - // Get a copy of the actual qos_profile used by this publisher. - rmw_qos_profile_t adapted_qos_profile() const; - // Publish a ROS message. rmw_ret_t publish( const void * ros_message, diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp new file mode 100644 index 00000000..50fe9357 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -0,0 +1,535 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_service_data.hpp" + +#include + +#include +#include +#include +#include +#include +#include + +#include "attachment_helpers.hpp" +#include "cdr.hpp" +#include "rmw_context_impl_s.hpp" +#include "message_type_support.hpp" +#include "logging_macros.hpp" +#include "qos.hpp" + +#include "rcpputils/scope_exit.hpp" + +#include "rmw/error_handling.h" +#include "rmw/get_topic_endpoint_info.h" +#include "rmw/impl/cpp/macros.hpp" + +namespace rmw_zenoh_cpp +{ +///============================================================================== +void service_data_handler(const z_query_t * query, void * data) +{ + z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query)); + auto drop_keystr = rcpputils::make_scope_exit( + [&keystr]() { + z_drop(z_move(keystr)); + }); + + ServiceData * service_data = + static_cast(data); + if (service_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain ServiceData from data for " + "service for %s", + z_loan(keystr) + ); + return; + } + + service_data->add_new_query(std::make_unique(query)); +} + +///============================================================================= +std::shared_ptr ServiceData::make( + z_session_t session, + const rmw_node_t * const node, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t service_id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile) +{ + // Adapt any 'best available' QoS options + rmw_qos_profile_t adapted_qos_profile = *qos_profile; + rmw_ret_t ret = QoS::get().best_available_qos( + nullptr, nullptr, &adapted_qos_profile, nullptr); + if (RMW_RET_OK != ret) { + RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + return nullptr; + } + + rcutils_allocator_t * allocator = &node->context->options.allocator; + + const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support); + auto service_members = static_cast(type_support->data); + auto request_members = static_cast( + service_members->request_members_->data); + auto response_members = static_cast( + service_members->response_members_->data); + auto request_type_support = std::make_unique(service_members); + auto response_type_support = std::make_unique(service_members); + + // Note: Service request/response types will contain a suffix Request_ or Response_. + // We remove the suffix when appending the type to the liveliness tokens for + // better reusability within GraphCache. + std::string service_type = response_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Response_"); + if (std::string::npos != suffix_substring_position) { + service_type = service_type.substr(0, suffix_substring_position); + } else { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unexpected type %s for service %s. Report this bug", + service_type.c_str(), service_name.c_str()); + return nullptr; + } + + // Convert the type hash to a string so that it can be included in the keyexpr. + char * type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( + type_hash, + *allocator, + &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { + RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); + return nullptr; + } + auto free_type_hash_c_str = rcpputils::make_scope_exit( + [&allocator, &type_hash_c_str]() { + allocator->deallocate(type_hash_c_str, allocator->state); + }); + + std::size_t domain_id = node_info.domain_id_; + auto entity = liveliness::Entity::make( + z_info_zid(session), + std::to_string(node_id), + std::to_string(service_id), + liveliness::EntityType::Service, + std::move(node_info), + liveliness::TopicInfo{ + std::move(domain_id), + service_name, + std::move(service_type), + type_hash_c_str, + std::move(adapted_qos_profile)} + ); + if (entity == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the service %s.", + service_name); + return nullptr; + } + + auto service_data = std::shared_ptr( + new ServiceData{ + node, + std::move(entity), + request_members, + response_members, + std::move(request_type_support), + std::move(response_type_support) + }); + + // TODO(Yadunund): Instead of passing a rawptr, rely on capturing weak_ptr + // in the closure callback once we switch to zenoh-cpp. + z_owned_closure_query_t callback = z_closure(service_data_handler, nullptr, service_data.get()); + service_data->keyexpr_ = + z_keyexpr_new(service_data->entity_->topic_info().value().topic_keyexpr_.c_str()); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [service_data]() { + z_drop(z_move(service_data->keyexpr_)); + }); + if (!z_check(z_loan(service_data->keyexpr_))) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return nullptr; + } + // Configure the queryable to process complete queries. + z_queryable_options_t qable_options = z_queryable_options_default(); + qable_options.complete = true; + service_data->qable_ = z_declare_queryable( + session, + z_loan(service_data->keyexpr_), + z_move(callback), + &qable_options); + auto undeclare_z_queryable = rcpputils::make_scope_exit( + [service_data]() { + z_undeclare_queryable(z_move(service_data->qable_)); + }); + if (!z_check(service_data->qable_)) { + RMW_SET_ERROR_MSG("unable to create zenoh queryable"); + return nullptr; + } + + service_data->token_ = zc_liveliness_declare_token( + session, + z_keyexpr(service_data->entity_->liveliness_keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [service_data]() { + z_drop(z_move(service_data->token_)); + }); + if (!z_check(service_data->token_)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the service."); + return nullptr; + } + + free_ros_keyexpr.cancel(); + undeclare_z_queryable.cancel(); + free_token.cancel(); + + return service_data; +} + +///============================================================================= +ServiceData::ServiceData( + const rmw_node_t * rmw_node, + std::shared_ptr entity, + const void * request_type_support_impl, + const void * response_type_support_impl, + std::unique_ptr request_type_support, + std::unique_ptr response_type_support) +: rmw_node_(rmw_node), + entity_(std::move(entity)), + request_type_support_impl_(request_type_support_impl), + response_type_support_impl_(response_type_support_impl), + request_type_support_(std::move(request_type_support)), + response_type_support_(std::move(response_type_support)), + wait_set_data_(nullptr), + is_shutdown_(false) +{ + // Do nothing. +} + +///============================================================================= +liveliness::TopicInfo ServiceData::topic_info() const +{ + std::lock_guard lock(mutex_); + return entity_->topic_info().value(); +} + +///============================================================================= +bool ServiceData::liveliness_is_valid() const +{ + std::lock_guard lock(mutex_); + return zc_liveliness_token_check(&token_); +} + +///============================================================================= +void ServiceData::add_new_query(std::unique_ptr query) +{ + std::lock_guard lock(mutex_); + if (is_shutdown_) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "Request from client will be ignored since the service is shutdown." + ); + return; + } + const rmw_qos_profile_t adapted_qos_profile = + entity_->topic_info().value().qos_; + if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && + query_queue_.size() >= adapted_qos_profile.depth) + { + // Log warning if message is discarded due to hitting the queue depth + z_owned_str_t keystr = z_keyexpr_to_string(z_loan(keyexpr_)); + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Query queue depth of %ld reached, discarding oldest Query " + "for service for %s", + adapted_qos_profile.depth, + z_loan(keystr)); + z_drop(z_move(keystr)); + query_queue_.pop_front(); + } + query_queue_.emplace_back(std::move(query)); + + // Since we added new data, trigger user callback and guard condition if they are available + data_callback_mgr_.trigger_callback(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); + } +} + +///============================================================================= +rmw_ret_t ServiceData::take_request( + rmw_service_info_t * request_header, + void * ros_request, + bool * taken) +{ + std::lock_guard lock(mutex_); + *taken = false; + + if (is_shutdown_ || query_queue_.empty()) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; + } + std::unique_ptr query = std::move(query_queue_.front()); + query_queue_.pop_front(); + const z_query_t loaned_query = query->get_query(); + + // DESERIALIZE MESSAGE ======================================================== + z_value_t payload_value = z_query_value(&loaned_query); + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(payload_value.payload.start)), + payload_value.payload.len); + + // Object that serializes the data + Cdr deser(fastbuffer); + if (!request_type_support_->deserialize_ros_message( + deser.get_cdr(), + ros_request, + request_type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS message"); + return RMW_RET_ERROR; + } + + // Fill in the request header. + // Get the sequence_number out of the attachment + z_attachment_t attachment = z_query_attachment(&loaned_query); + request_header->request_id.sequence_number = + get_int64_from_attachment(&attachment, "sequence_number"); + if (request_header->request_id.sequence_number < 0) { + RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); + return RMW_RET_ERROR; + } + request_header->source_timestamp = get_int64_from_attachment( + &attachment, + "source_timestamp"); + if (request_header->source_timestamp < 0) { + RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); + return RMW_RET_ERROR; + } + if (!get_gid_from_attachment( + &attachment, + request_header->request_id.writer_guid)) + { + RMW_SET_ERROR_MSG("Could not get client GID from attachment"); + return RMW_RET_ERROR; + } + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + request_header->received_timestamp = now_ns.count(); + + // Add this query to the map, so that rmw_send_response can quickly look it up later. + const size_t hash = rmw_zenoh_cpp::hash_gid(request_header->request_id.writer_guid); + std::unordered_map::iterator it = sequence_to_query_map_.find(hash); + if (it == sequence_to_query_map_.end()) { + SequenceToQuery stq; + sequence_to_query_map_.insert(std::make_pair(hash, std::move(stq))); + it = sequence_to_query_map_.find(hash); + } else { + // Client already in the map + if (it->second.find(request_header->request_id.sequence_number) != it->second.end()) { + RMW_SET_ERROR_MSG("duplicate sequence number in the map"); + return RMW_RET_ERROR; + } + } + + it->second.insert(std::make_pair(request_header->request_id.sequence_number, std::move(query))); + *taken = true; + + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t ServiceData::send_response( + rmw_request_id_t * request_id, + void * ros_response) +{ + std::lock_guard lock(mutex_); + if (is_shutdown_) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "Unable to send response as the service is shutdown." + ); + return RMW_RET_OK; + } + // Create the queryable payload + const size_t hash = hash_gid(request_id->writer_guid); + std::unordered_map::iterator it = sequence_to_query_map_.find(hash); + if (it == sequence_to_query_map_.end()) { + // If there is no data associated with this request, the higher layers of + // ROS 2 seem to expect that we just silently return with no work. + return RMW_RET_OK; + } + SequenceToQuery::iterator query_it = it->second.find(request_id->sequence_number); + if (query_it == it->second.end()) { + // If there is no data associated with this request, the higher layers of + // ROS 2 seem to expect that we just silently return with no work. + return RMW_RET_OK; + } + std::unique_ptr query = std::move(query_it->second); + it->second.erase(query_it); + if (sequence_to_query_map_[hash].size() == 0) { + sequence_to_query_map_.erase(hash); + } + + rcutils_allocator_t * allocator = &(rmw_node_->context->options.allocator); + + size_t max_data_length = ( + response_type_support_->get_estimated_serialized_size( + ros_response, response_type_support_impl_)); + + // Init serialized message byte array + char * response_bytes = static_cast(allocator->allocate( + max_data_length, + allocator->state)); + if (!response_bytes) { + RMW_SET_ERROR_MSG("failed to allocate response message bytes"); + return RMW_RET_ERROR; + } + auto free_response_bytes = rcpputils::make_scope_exit( + [response_bytes, allocator]() { + allocator->deallocate(response_bytes, allocator->state); + }); + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(response_bytes, max_data_length); + + // Object that serializes the data + rmw_zenoh_cpp::Cdr ser(fastbuffer); + if (!response_type_support_->serialize_ros_message( + ros_response, + ser.get_cdr(), + response_type_support_impl_)) + { + return RMW_RET_ERROR; + } + + size_t data_length = ser.get_serialized_data_length(); + const z_query_t loaned_query = query->get_query(); + z_query_reply_options_t options = z_query_reply_options_default(); + z_owned_bytes_map_t map = rmw_zenoh_cpp::create_map_and_set_sequence_num( + request_id->sequence_number, + [request_id](z_owned_bytes_map_t * map, const char * key) + { + z_bytes_t gid_bytes; + gid_bytes.len = RMW_GID_STORAGE_SIZE; + gid_bytes.start = request_id->writer_guid; + z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); + }); + if (!z_check(map)) { + // create_map_and_set_sequence_num already set the error + return RMW_RET_ERROR; + } + auto free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); + options.attachment = z_bytes_map_as_attachment(&map); + + z_query_reply( + &loaned_query, z_loan(keyexpr_), reinterpret_cast( + response_bytes), data_length, &options); + + return RMW_RET_OK; +} + +///============================================================================= +ServiceData::~ServiceData() +{ + const rmw_ret_t ret = this->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Error destructing service /%s.", + entity_->topic_info().value().name_.c_str() + ); + } +} + +//============================================================================== +void ServiceData::set_on_new_request_callback( + rmw_event_callback_t callback, + const void * user_data) +{ + std::lock_guard lock(mutex_); + data_callback_mgr_.set_callback(user_data, std::move(callback)); +} + +///============================================================================= +bool ServiceData::queue_has_data_and_attach_condition_if_not( + rmw_wait_set_data_t * wait_set_data) +{ + std::lock_guard lock(mutex_); + if (!query_queue_.empty()) { + return true; + } + wait_set_data_ = wait_set_data; + + return false; +} + +///============================================================================= +bool ServiceData::detach_condition_and_queue_is_empty() +{ + std::lock_guard lock(mutex_); + wait_set_data_ = nullptr; + + return query_queue_.empty(); +} + +///============================================================================= +rmw_ret_t ServiceData::shutdown() +{ + rmw_ret_t ret = RMW_RET_OK; + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return ret; + } + + // Unregister this node from the ROS graph. + if (zc_liveliness_token_check(&token_)) { + zc_liveliness_undeclare_token(z_move(token_)); + } + if (z_check(z_loan(keyexpr_))) { + z_drop(z_move(keyexpr_)); + } + if (z_check(qable_)) { + z_undeclare_queryable(z_move(qable_)); + } + + is_shutdown_ = true; + return RMW_RET_OK; +} + +///============================================================================= +bool ServiceData::is_shutdown() const +{ + std::lock_guard lock(mutex_); + return is_shutdown_; +} +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp new file mode 100644 index 00000000..257cc24c --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp @@ -0,0 +1,140 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_SERVICE_DATA_HPP_ +#define DETAIL__RMW_SERVICE_DATA_HPP_ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "event.hpp" +#include "liveliness_utils.hpp" +#include "message_type_support.hpp" +#include "service_type_support.hpp" +#include "type_support_common.hpp" +#include "zenoh_utils.hpp" + +#include "rcutils/allocator.h" + +#include "rmw/rmw.h" +#include "rmw/ret_types.h" + +namespace rmw_zenoh_cpp +{ + +///============================================================================= +class ServiceData final +{ +public: + // Make a shared_ptr of ServiceData. + static std::shared_ptr make( + z_session_t session, + const rmw_node_t * const node, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t service_id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Get a copy of the TopicInfo of this ServiceData. + liveliness::TopicInfo topic_info() const; + + // Returns true if liveliness token is still valid. + bool liveliness_is_valid() const; + + // Add a new ZenohQuery to the queue. + void add_new_query(std::unique_ptr query); + + // Take a ROS service request. + rmw_ret_t take_request( + rmw_service_info_t * request_header, + void * ros_request, + bool * taken); + + // Send a service response. + rmw_ret_t send_response( + rmw_request_id_t * request_id, + void * ros_response); + + void set_on_new_request_callback( + rmw_event_callback_t callback, + const void * user_data); + + bool queue_has_data_and_attach_condition_if_not( + rmw_wait_set_data_t * wait_set_data); + + bool detach_condition_and_queue_is_empty(); + + // Shutdown this ServiceData. + rmw_ret_t shutdown(); + + // Check if this ServiceData is shutdown. + bool is_shutdown() const; + + // Destructor. + ~ServiceData(); + +private: + // Constructor. + ServiceData( + const rmw_node_t * rmw_node, + std::shared_ptr entity, + const void * request_type_support_impl, + const void * response_type_support_impl, + std::unique_ptr request_type_support, + std::unique_ptr response_type_support); + + // Internal mutex. + mutable std::mutex mutex_; + // The parent node. + const rmw_node_t * rmw_node_; + // The Entity generated for the service. + std::shared_ptr entity_; + // An owned keyexpression. + z_owned_keyexpr_t keyexpr_; + // An owned queryable. + z_owned_queryable_t qable_; + // Liveliness token for the service. + zc_owned_liveliness_token_t token_; + // Type support fields. + const void * request_type_support_impl_; + const void * response_type_support_impl_; + std::unique_ptr request_type_support_; + std::unique_ptr response_type_support_; + // Deque to store the queries in the order they arrive. + std::deque> query_queue_; + // Map to store the sequence_number (as given by the client) -> ZenohQuery + using SequenceToQuery = std::unordered_map>; + std::unordered_map sequence_to_query_map_; + // Wait set data. + rmw_wait_set_data_t * wait_set_data_; + // Data callback manager. + DataCallbackManager data_callback_mgr_; + // Shutdown flag. + bool is_shutdown_; +}; +using ServiceDataPtr = std::shared_ptr; +using ServiceDataConstPtr = std::shared_ptr; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__RMW_SERVICE_DATA_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 7fc4154f..5e6c945d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -141,8 +141,7 @@ std::shared_ptr SubscriptionData::make( auto callbacks = static_cast(type_support->data); auto message_type_support = std::make_unique(callbacks); - // Convert the type hash to a string so that it can be included in - // the keyexpr. + // Convert the type hash to a string so that it can be included in the keyexpr. char * type_hash_c_str = nullptr; rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( type_hash, @@ -408,7 +407,7 @@ rmw_ret_t SubscriptionData::shutdown() return ret; } - // Unregister this node from the ROS graph. + // Unregister this subscription from the ROS graph. if (zc_liveliness_token_check(&token_)) { zc_liveliness_undeclare_token(z_move(token_)); } @@ -621,7 +620,7 @@ void SubscriptionData::set_on_new_message_callback( const void * user_data) { std::lock_guard lock(mutex_); - data_callback_mgr_.set_callback(user_data, callback); + data_callback_mgr_.set_callback(user_data, std::move(callback)); } //============================================================================== diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 0d461e49..2bfc2c93 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -153,7 +153,7 @@ class SubscriptionData final size_t total_messages_lost_; // Wait set data. rmw_wait_set_data_t * wait_set_data_; - // std::mutex condition_mutex_; + // Callback managers. DataCallbackManager data_callback_mgr_; std::shared_ptr events_mgr_; // Shutdown flag. diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 79586033..db07d01e 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -62,4 +62,44 @@ create_map_and_set_sequence_num( return map; } + +///============================================================================= +ZenohQuery::ZenohQuery(const z_query_t * query) +{ + query_ = z_query_clone(query); +} + +///============================================================================= +ZenohQuery::~ZenohQuery() +{ + z_drop(z_move(query_)); +} + +///============================================================================= +const z_query_t ZenohQuery::get_query() const +{ + return z_query_loan(&query_); +} + +///============================================================================= +ZenohReply::ZenohReply(const z_owned_reply_t * reply) +{ + reply_ = *reply; +} + +///============================================================================= +ZenohReply::~ZenohReply() +{ + z_reply_drop(z_move(reply_)); +} + +///============================================================================= +std::optional ZenohReply::get_sample() const +{ + if (z_reply_is_ok(&reply_)) { + return z_reply_ok(&reply_); + } + + return std::nullopt; +} } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index ca0ff2a9..c57fc87e 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -18,6 +18,7 @@ #include #include +#include #include "rmw/types.h" @@ -31,6 +32,35 @@ using GIDCopier = std::function; z_owned_bytes_map_t create_map_and_set_sequence_num(int64_t sequence_number, GIDCopier gid_copier); +///============================================================================= +// A class to store the replies to service requests. +class ZenohReply final +{ +public: + ZenohReply(const z_owned_reply_t * reply); + + ~ZenohReply(); + + std::optional get_sample() const; + +private: + z_owned_reply_t reply_; +}; + +// A class to store the queries made by clients. +///============================================================================= +class ZenohQuery final +{ +public: + ZenohQuery(const z_query_t * query); + + ~ZenohQuery(); + + const z_query_t get_query() const; + +private: + z_owned_query_t query_; +}; } // namespace rmw_zenoh_cpp #endif // DETAIL__ZENOH_UTILS_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index a616ff00..4b2b4f0c 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -678,7 +678,7 @@ rmw_publisher_get_actual_qos( auto pub_data = node_data->get_pub_data(publisher); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); - *qos = pub_data->adapted_qos_profile(); + *qos = pub_data->topic_info().qos_; return RMW_RET_OK; } @@ -2002,10 +2002,23 @@ rmw_create_service( return nullptr; } + // Get the RMW type support. + const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); + if (type_support == nullptr) { + // error was already set by find_service_type_support + return nullptr; + } + // SERVICE DATA ============================================================== rcutils_allocator_t * allocator = &node->context->options.allocator; + if (!rcutils_allocator_is_valid(allocator)) { + RMW_SET_ERROR_MSG("allocator is invalid."); + return nullptr; + } - rmw_service_t * rmw_service = static_cast(allocator->zero_allocate( + // Create the rmw_service. + rmw_service_t * rmw_service = + static_cast(allocator->zero_allocate( 1, sizeof(rmw_service_t), allocator->state)); @@ -2018,102 +2031,28 @@ rmw_create_service( allocator->deallocate(rmw_service, allocator->state); }); - auto service_data = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::rmw_service_data_t), allocator->state)); + auto node_data = context_impl->get_node_data(node); RMW_CHECK_FOR_NULL_WITH_MSG( - service_data, - "failed to allocate memory for service data", + node_data, + "NodeData not found.", return nullptr); - auto free_service_data = rcpputils::make_scope_exit( - [service_data, allocator]() { - allocator->deallocate(service_data, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - service_data, service_data, return nullptr, - rmw_zenoh_cpp::rmw_service_data_t); - auto destruct_service_data = rcpputils::make_scope_exit( - [service_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - service_data->~rmw_service_data_t(), - rmw_zenoh_cpp::rmw_service_data_t); - }); - // Adapt any 'best available' QoS options - service_data->adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_zenoh_cpp::QoS::get().best_available_qos( - nullptr, nullptr, &service_data->adapted_qos_profile, nullptr); - if (RMW_RET_OK != ret) { - RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); - return nullptr; - } - - // Get the RMW type support. - const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); - if (type_support == nullptr) { - // error was already set by find_service_type_support + if (!node_data->create_service_data( + rmw_service, + context_impl->session(), + context_impl->get_next_entity_id(), + service_name, + type_support, + qos_profile)) + { + // Error already handled. return nullptr; } - auto service_members = static_cast(type_support->data); - auto request_members = static_cast( - service_members->request_members_->data); - auto response_members = static_cast( - service_members->response_members_->data); - - service_data->context = node->context; - service_data->typesupport_identifier = type_support->typesupport_identifier; - service_data->type_hash = type_support->get_type_hash_func(type_support); - service_data->request_type_support_impl = request_members; - service_data->response_type_support_impl = response_members; - - // Request type support - service_data->request_type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::RequestTypeSupport), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - service_data->request_type_support, - "Failed to allocate rmw_zenoh_cpp::RequestTypeSupport", - return nullptr); - auto free_request_type_support = rcpputils::make_scope_exit( - [request_type_support = service_data->request_type_support, allocator]() { - allocator->deallocate(request_type_support, allocator->state); - }); - RMW_TRY_PLACEMENT_NEW( - service_data->request_type_support, - service_data->request_type_support, - return nullptr, - rmw_zenoh_cpp::RequestTypeSupport, service_members); - auto destruct_request_type_support = rcpputils::make_scope_exit( - [service_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - service_data->request_type_support->~RequestTypeSupport(), - rmw_zenoh_cpp::RequestTypeSupport); - }); - - // Response type support - service_data->response_type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::ResponseTypeSupport), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - service_data->response_type_support, - "Failed to allocate rmw_zenoh_cpp::ResponseTypeSupport", - return nullptr); - auto free_response_type_support = rcpputils::make_scope_exit( - [response_type_support = service_data->response_type_support, allocator]() { - allocator->deallocate(response_type_support, allocator->state); - }); - RMW_TRY_PLACEMENT_NEW( - service_data->response_type_support, - service_data->response_type_support, - return nullptr, - rmw_zenoh_cpp::ResponseTypeSupport, service_members); - auto destruct_response_type_support = rcpputils::make_scope_exit( - [service_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - service_data->response_type_support->~ResponseTypeSupport(), - rmw_zenoh_cpp::ResponseTypeSupport); - }); - - // Populate the rmw_service. + // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased + // Service handle will be returned in the rmw_service_t in rmw_wait + // from which we cannot obtain ServiceData. + rmw_service->data = static_cast(node_data->get_service_data(rmw_service).get()); rmw_service->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; rmw_service->service_name = rcutils_strdup(service_name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( @@ -2125,129 +2064,8 @@ rmw_create_service( allocator->deallocate(const_cast(rmw_service->service_name), allocator->state); }); - // Note: Service request/response types will contain a suffix Request_ or Response_. - // We remove the suffix when appending the type to the liveliness tokens for - // better reusability within GraphCache. - std::string service_type = service_data->response_type_support->get_name(); - size_t suffix_substring_position = service_type.find("Response_"); - if (std::string::npos != suffix_substring_position) { - service_type = service_type.substr(0, suffix_substring_position); - } else { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unexpected type %s for service %s. Report this bug", - service_type.c_str(), rmw_service->service_name); - return nullptr; - } - - // Convert the type hash to a string so that it can be included in - // the keyexpr. - char * type_hash_c_str = nullptr; - rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( - service_data->type_hash, - *allocator, - &type_hash_c_str); - if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { - RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); - return nullptr; - } - auto free_type_hash_c_str = rcpputils::make_scope_exit( - [&allocator, &type_hash_c_str]() { - allocator->deallocate(type_hash_c_str, allocator->state); - }); - - z_session_t session = context_impl->session(); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_FOR_NULL_WITH_MSG( - node_data, - "NodeData not found.", - return nullptr); - service_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(session), - std::to_string(node_data->id()), - std::to_string( - context_impl->get_next_entity_id()), - rmw_zenoh_cpp::liveliness::EntityType::Service, - rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, - rmw_zenoh_cpp::liveliness::TopicInfo{ - node->context->actual_domain_id, - rmw_service->service_name, - std::move(service_type), - type_hash_c_str, - service_data->adapted_qos_profile} - ); - if (service_data->entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the service %s.", - rmw_service->service_name); - return nullptr; - } - service_data->keyexpr = z_keyexpr_new(service_data->entity->topic_info()->topic_keyexpr_.c_str()); - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [service_data]() { - if (service_data) { - z_drop(z_move(service_data->keyexpr)); - } - }); - if (!z_check(z_loan(service_data->keyexpr))) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - - z_owned_closure_query_t callback = z_closure( - rmw_zenoh_cpp::service_data_handler, nullptr, - service_data); - // Configure the queryable to process complete queries. - z_queryable_options_t qable_options = z_queryable_options_default(); - qable_options.complete = true; - service_data->qable = z_declare_queryable( - session, - z_loan(service_data->keyexpr), - z_move(callback), - &qable_options); - - if (!z_check(service_data->qable)) { - RMW_SET_ERROR_MSG("unable to create zenoh queryable"); - return nullptr; - } - auto undeclare_z_queryable = rcpputils::make_scope_exit( - [service_data]() { - z_undeclare_queryable(z_move(service_data->qable)); - }); - - service_data->token = zc_liveliness_declare_token( - session, - z_keyexpr(service_data->entity->liveliness_keyexpr().c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [service_data]() { - if (service_data != nullptr) { - z_drop(z_move(service_data->token)); - } - }); - if (!z_check(service_data->token)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the service."); - return nullptr; - } - - rmw_service->data = service_data; - free_rmw_service.cancel(); - free_service_data.cancel(); free_service_name.cancel(); - destruct_service_data.cancel(); - destruct_request_type_support.cancel(); - destruct_response_type_support.cancel(); - free_request_type_support.cancel(); - free_response_type_support.cancel(); - free_ros_keyexpr.cancel(); - undeclare_z_queryable.cancel(); - free_token.cancel(); return rmw_service; } @@ -2257,10 +2075,12 @@ rmw_create_service( rmw_ret_t rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) { - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -2271,33 +2091,15 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) service->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + auto node_data = context_impl->get_node_data(node); + if (node_data == nullptr) { + return RMW_RET_INVALID_ARGUMENT; + } - rcutils_allocator_t * allocator = &node->context->options.allocator; - - rmw_zenoh_cpp::rmw_service_data_t * service_data = - static_cast(service->data); - RMW_CHECK_FOR_NULL_WITH_MSG( - service_data, - "Unable to retrieve service_data from service", - return RMW_RET_INVALID_ARGUMENT); - - // CLEANUP ================================================================ - z_drop(z_move(service_data->keyexpr)); - z_undeclare_queryable(z_move(service_data->qable)); - zc_liveliness_undeclare_token(z_move(service_data->token)); - - RMW_TRY_DESTRUCTOR( - service_data->request_type_support->~RequestTypeSupport(), rmw_zenoh_cpp::RequestTypeSupport, ); - allocator->deallocate(service_data->request_type_support, allocator->state); - - RMW_TRY_DESTRUCTOR( - service_data->response_type_support->~ResponseTypeSupport(), rmw_zenoh_cpp::ResponseTypeSupport, - ); - allocator->deallocate(service_data->response_type_support, allocator->state); - - RMW_TRY_DESTRUCTOR(service_data->~rmw_service_data_t(), rmw_zenoh_cpp::rmw_service_data_t, ); - allocator->deallocate(service->data, allocator->state); + // Remove the ServiceData from NodeData. + node_data->delete_service_data(service); + rcutils_allocator_t * allocator = &node->context->options.allocator; allocator->deallocate(const_cast(service->service_name), allocator->state); allocator->deallocate(service, allocator->state); @@ -2313,95 +2115,26 @@ rmw_take_request( void * ros_request, bool * taken) { + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); *taken = false; - RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); - + RMW_CHECK_ARGUMENT_FOR_NULL(service->service_name, RMW_RET_ERROR); + RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_ERROR); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( service, service->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + rmw_zenoh_cpp::ServiceData * service_data = + static_cast(service->data); + RMW_CHECK_ARGUMENT_FOR_NULL(service_data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(request_header, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_FOR_NULL_WITH_MSG( - service->service_name, "service has no service name", RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_service_data_t * service_data = - static_cast(service->data); - RMW_CHECK_FOR_NULL_WITH_MSG( - service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); - - std::unique_ptr query = service_data->pop_next_query(); - if (query == nullptr) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. - return RMW_RET_OK; - } - - const z_query_t loaned_query = query->get_query(); - - // DESERIALIZE MESSAGE ======================================================== - z_value_t payload_value = z_query_value(&loaned_query); - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(payload_value.payload.start)), - payload_value.payload.len); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr deser(fastbuffer); - if (!service_data->request_type_support->deserialize_ros_message( - deser.get_cdr(), - ros_request, - service_data->request_type_support_impl)) - { - RMW_SET_ERROR_MSG("could not deserialize ROS message"); - return RMW_RET_ERROR; - } - - // Fill in the request header. - - // Get the sequence_number out of the attachment - z_attachment_t attachment = z_query_attachment(&loaned_query); - - request_header->request_id.sequence_number = - rmw_zenoh_cpp::get_int64_from_attachment(&attachment, "sequence_number"); - if (request_header->request_id.sequence_number < 0) { - RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); - return RMW_RET_ERROR; - } - - request_header->source_timestamp = rmw_zenoh_cpp::get_int64_from_attachment( - &attachment, - "source_timestamp"); - if (request_header->source_timestamp < 0) { - RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); - return RMW_RET_ERROR; - } - - if (!rmw_zenoh_cpp::get_gid_from_attachment( - &attachment, - request_header->request_id.writer_guid)) - { - RMW_SET_ERROR_MSG("Could not get client GID from attachment"); - return RMW_RET_ERROR; - } - - auto now = std::chrono::system_clock::now().time_since_epoch(); - auto now_ns = std::chrono::duration_cast(now); - request_header->received_timestamp = now_ns.count(); - - // Add this query to the map, so that rmw_send_response can quickly look it up later - if (!service_data->add_to_query_map(request_header->request_id, std::move(query))) { - RMW_SET_ERROR_MSG("duplicate sequence number in the map"); - return RMW_RET_ERROR; - } - - *taken = true; - - return RMW_RET_OK; + return service_data->take_request( + request_header, + ros_request, + taken); } //============================================================================== @@ -2416,92 +2149,22 @@ rmw_send_response( RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(request_header, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( service, service->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - RMW_CHECK_FOR_NULL_WITH_MSG( service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); + rmw_zenoh_cpp::ServiceData * service_data = + static_cast(service->data); + RMW_CHECK_ARGUMENT_FOR_NULL(service_data, RMW_RET_INVALID_ARGUMENT); - rmw_zenoh_cpp::rmw_service_data_t * service_data = - static_cast(service->data); - - // Create the queryable payload - std::unique_ptr query = - service_data->take_from_query_map(*request_header); - if (query == nullptr) { - // If there is no data associated with this request, the higher layers of - // ROS 2 seem to expect that we just silently return with no work. - return RMW_RET_OK; - } - - rcutils_allocator_t * allocator = &(service_data->context->options.allocator); - - size_t max_data_length = ( - service_data->response_type_support->get_estimated_serialized_size( - ros_response, service_data->response_type_support_impl)); - - // Init serialized message byte array - char * response_bytes = static_cast(allocator->allocate( - max_data_length, - allocator->state)); - if (!response_bytes) { - RMW_SET_ERROR_MSG("failed to allocate response message bytes"); - return RMW_RET_ERROR; - } - auto free_response_bytes = rcpputils::make_scope_exit( - [response_bytes, allocator]() { - allocator->deallocate(response_bytes, allocator->state); - }); - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(response_bytes, max_data_length); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr ser(fastbuffer); - if (!service_data->response_type_support->serialize_ros_message( - ros_response, - ser.get_cdr(), - service_data->response_type_support_impl)) - { - return RMW_RET_ERROR; - } - - size_t data_length = ser.get_serialized_data_length(); - - const z_query_t loaned_query = query->get_query(); - z_query_reply_options_t options = z_query_reply_options_default(); - - z_owned_bytes_map_t map = rmw_zenoh_cpp::create_map_and_set_sequence_num( - request_header->sequence_number, - [request_header](z_owned_bytes_map_t * map, const char * key) - { - z_bytes_t gid_bytes; - gid_bytes.len = RMW_GID_STORAGE_SIZE; - gid_bytes.start = request_header->writer_guid; - z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); - }); - if (!z_check(map)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - auto free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); - - options.attachment = z_bytes_map_as_attachment(&map); - - z_query_reply( - &loaned_query, z_loan(service_data->keyexpr), reinterpret_cast( - response_bytes), data_length, &options); - - return RMW_RET_OK; + return service_data->send_response( + request_header, + ros_response); } //============================================================================== @@ -2518,12 +2181,11 @@ rmw_service_request_subscription_get_actual_qos( rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_service_data_t * service_data = - static_cast(service->data); + rmw_zenoh_cpp::ServiceData * service_data = + static_cast(service->data); RMW_CHECK_ARGUMENT_FOR_NULL(service_data, RMW_RET_INVALID_ARGUMENT); - *qos = service_data->adapted_qos_profile; + *qos = service_data->topic_info().qos_; return RMW_RET_OK; } @@ -2767,7 +2429,7 @@ check_and_attach_condition( if (services) { for (size_t i = 0; i < services->service_count; ++i) { - auto serv_data = static_cast(services->services[i]); + auto serv_data = static_cast(services->services[i]); if (serv_data == nullptr) { continue; } @@ -2924,7 +2586,7 @@ rmw_wait( if (services) { for (size_t i = 0; i < services->service_count; ++i) { - auto serv_data = static_cast(services->services[i]); + auto serv_data = static_cast(services->services[i]); if (serv_data == nullptr) { continue; } @@ -3269,7 +2931,7 @@ rmw_subscription_set_on_new_message_callback( static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - sub_data->set_on_new_message_callback(callback, user_data); + sub_data->set_on_new_message_callback(std::move(callback), user_data); return RMW_RET_OK; } @@ -3282,11 +2944,11 @@ rmw_service_set_on_new_request_callback( const void * user_data) { RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); - rmw_zenoh_cpp::rmw_service_data_t * service_data = - static_cast(service->data); + rmw_zenoh_cpp::ServiceData * service_data = + static_cast(service->data); RMW_CHECK_ARGUMENT_FOR_NULL(service_data, RMW_RET_INVALID_ARGUMENT); - service_data->data_callback_mgr.set_callback( - user_data, callback); + service_data->set_on_new_request_callback( + std::move(callback), user_data); return RMW_RET_OK; } @@ -3303,7 +2965,7 @@ rmw_client_set_on_new_response_callback( static_cast(client->data); RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); client_data->data_callback_mgr.set_callback( - user_data, callback); + user_data, std::move(callback)); return RMW_RET_OK; } } // extern "C"