diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 722d5e71..00000000 --- a/.gitignore +++ /dev/null @@ -1 +0,0 @@ -.vscode diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 7d8db9dd..89b5a598 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -11,8 +11,6 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() -set(RMW_ZENOH_BUILD_WITH_SHARED_MEMORY ON CACHE BOOL "Compile Zenoh RMW with Shared Memory support") - # find dependencies find_package(ament_cmake REQUIRED) @@ -36,8 +34,8 @@ add_library(rmw_zenoh_cpp SHARED src/detail/logging.cpp src/detail/message_type_support.cpp src/detail/qos.cpp + src/detail/rmw_client_data.cpp src/detail/rmw_context_impl_s.cpp - src/detail/rmw_data_types.cpp src/detail/rmw_publisher_data.cpp src/detail/rmw_node_data.cpp src/detail/rmw_service_data.cpp @@ -82,13 +80,6 @@ target_compile_definitions(rmw_zenoh_cpp RMW_VERSION_PATCH=${rmw_VERSION_PATCH} ) -if(${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY}) - target_compile_definitions(rmw_zenoh_cpp - PRIVATE - RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - ) -endif() - ament_export_targets(export_rmw_zenoh_cpp) register_rmw_implementation( diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index 25e065dc..f55c8434 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -457,6 +457,8 @@ /// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate /// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the /// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + /// + /// ROS setting: disabled by default until fully tested enabled: false, }, auth: { diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 index eaa570af..daffd790 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 @@ -462,6 +462,8 @@ /// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate /// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the /// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + /// + /// ROS setting: disabled by default until fully tested enabled: false, }, auth: { diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp index cbdcfa1b..35019ca6 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp @@ -27,7 +27,7 @@ namespace rmw_zenoh_cpp { -attachement_data_t::attachement_data_t( +attachment_data_t::attachment_data_t( const int64_t _sequence_number, const int64_t _source_timestamp, const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]) @@ -37,14 +37,14 @@ attachement_data_t::attachement_data_t( memcpy(source_gid, _source_gid, RMW_GID_STORAGE_SIZE); } -attachement_data_t::attachement_data_t(attachement_data_t && data) +attachment_data_t::attachment_data_t(attachment_data_t && data) { sequence_number = std::move(data.sequence_number); source_timestamp = std::move(data.source_timestamp); memcpy(source_gid, data.source_gid, RMW_GID_STORAGE_SIZE); } -void attachement_data_t::serialize_to_zbytes(z_owned_bytes_t * attachment) +void attachment_data_t::serialize_to_zbytes(z_owned_bytes_t * attachment) { ze_owned_serializer_t serializer; ze_serializer_empty(&serializer); @@ -57,7 +57,7 @@ void attachement_data_t::serialize_to_zbytes(z_owned_bytes_t * attachment) ze_serializer_finish(z_move(serializer), attachment); } -attachement_data_t::attachement_data_t(const z_loaned_bytes_t * attachment) +attachment_data_t::attachment_data_t(const z_loaned_bytes_t * attachment) { ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment); z_owned_string_t key; diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp index 2648b667..7d7af015 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp @@ -22,15 +22,15 @@ namespace rmw_zenoh_cpp { -class attachement_data_t final +class attachment_data_t final { public: - explicit attachement_data_t( + explicit attachment_data_t( const int64_t _sequence_number, const int64_t _source_timestamp, const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]); - explicit attachement_data_t(const z_loaned_bytes_t *); - explicit attachement_data_t(attachement_data_t && data); + explicit attachment_data_t(const z_loaned_bytes_t *); + explicit attachment_data_t(attachment_data_t && data); int64_t sequence_number; int64_t source_timestamp; diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 24cbc23e..1e3e5517 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -35,7 +36,6 @@ #include "graph_cache.hpp" #include "logging_macros.hpp" -#include "rmw_data_types.hpp" namespace rmw_zenoh_cpp { @@ -612,10 +612,10 @@ void GraphCache::parse_del( if (entity->type() == EntityType::Node) { // Node - // In case the remote Node closed abruptly or was disconnected, Zenoh could deliver the - // liveliness tokens unregistration events in any order. - // If the event for Node unregistration comes before the unregistration of its - // pubs/subs/services, we should update the count in graph_topics_ and graph_services_. + // When destroying a node, Zenoh does not guarantee that liveliness tokens to remove pub/subs + // arrive before the one to remove the node from the graph despite un-registering those entities + // earlier. In such scenarios, if we find any pub/subs present in the node, we reduce their + // counts in graph_topics_. const GraphNodePtr graph_node = node_it->second; if (!graph_node->pubs_.empty() || !graph_node->subs_.empty() || @@ -1182,15 +1182,15 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( ///============================================================================= rmw_ret_t GraphCache::service_server_is_available( - const char * service_name, - const char * service_type, + const liveliness::TopicInfo & client_topic_info, bool * is_available) const { *is_available = false; std::lock_guard lock(graph_mutex_); - GraphNode::TopicMap::const_iterator service_it = graph_services_.find(service_name); + GraphNode::TopicMap::const_iterator service_it = graph_services_.find(client_topic_info.name_); if (service_it != graph_services_.end()) { - GraphNode::TopicTypeMap::const_iterator type_it = service_it->second.find(service_type); + GraphNode::TopicTypeMap::const_iterator type_it = + service_it->second.find(client_topic_info.type_); if (type_it != service_it->second.end()) { for (const auto & [_, topic_data] : type_it->second) { if (topic_data->subs_.size() > 0) { diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 84ad6196..8ac2172d 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -23,14 +23,17 @@ #include #include #include +#include #include "event.hpp" #include "liveliness_utils.hpp" #include "ordered_map.hpp" #include "rcutils/allocator.h" +#include "rcutils/types.h" -#include "rmw/topic_endpoint_info_array.h" +#include "rmw/rmw.h" +#include "rmw/get_topic_endpoint_info.h" #include "rmw/names_and_types.h" @@ -169,8 +172,7 @@ class GraphCache final rmw_topic_endpoint_info_array_t * endpoints_info) const; rmw_ret_t service_server_is_available( - const char * service_name, - const char * service_type, + const liveliness::TopicInfo & client_topic_info, bool * is_available) const; /// Set a qos event callback for an entity from the current session. diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 45918d92..52c8ec7f 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -14,8 +14,6 @@ #include "liveliness_utils.hpp" -#include - #include #include #include @@ -23,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -31,6 +30,8 @@ #include "qos.hpp" #include "simplified_xxhash3.hpp" +#include "rcpputils/scope_exit.hpp" + #include "rmw/error_handling.h" namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index e0c7cf9c..895ae604 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include "rmw/types.h" diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp new file mode 100644 index 00000000..ba73bdf3 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -0,0 +1,571 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_client_data.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "attachment_helpers.hpp" +#include "cdr.hpp" +#include "liveliness_utils.hpp" +#include "logging_macros.hpp" +#include "message_type_support.hpp" +#include "qos.hpp" +#include "rmw_context_impl_s.hpp" + +#include "rcpputils/scope_exit.hpp" + +#include "rmw/error_handling.h" +#include "rmw/get_topic_endpoint_info.h" +#include "rmw/impl/cpp/macros.hpp" + +namespace +{ + +///============================================================================= +void client_data_handler(z_loaned_reply_t * reply, void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t from data in client_data_handler." + ); + return; + } + + if (client_data->is_shutdown()) { + return; + } + + if (!z_reply_is_ok(reply)) { + const z_loaned_reply_err_t * err = z_reply_err(reply); + const z_loaned_bytes_t * err_payload = z_reply_err_payload(err); + + z_owned_string_t err_str; + z_bytes_to_string(err_payload, &err_str); + + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", + client_data->topic_info().topic_keyexpr_.c_str(), + static_cast(z_string_len(z_loan(err_str))), + z_string_data(z_loan(err_str))); + z_drop(z_move(err_str)); + + return; + } + + std::chrono::nanoseconds::rep received_timestamp = + std::chrono::system_clock::now().time_since_epoch().count(); + + z_owned_reply_t owned_reply; + z_reply_clone(&owned_reply, reply); + client_data->add_new_reply( + std::make_unique(reply, received_timestamp)); +} + +///============================================================================= +void client_data_drop(void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t from data in client_data_drop." + ); + return; + } + + client_data->decrement_in_flight_and_conditionally_remove(); +} + +} // namespace + +namespace rmw_zenoh_cpp +{ +///============================================================================= +std::shared_ptr ClientData::make( + const z_loaned_session_t * session, + const rmw_node_t * const node, + const rmw_client_t * client, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t service_id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile) +{ + // Adapt any 'best available' QoS options + rmw_qos_profile_t adapted_qos_profile = *qos_profile; + rmw_ret_t ret = QoS::get().best_available_qos( + nullptr, nullptr, &adapted_qos_profile, nullptr); + if (RMW_RET_OK != ret) { + RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + return nullptr; + } + + rcutils_allocator_t * allocator = &node->context->options.allocator; + + const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support); + auto service_members = static_cast(type_support->data); + auto request_members = static_cast( + service_members->request_members_->data); + auto response_members = static_cast( + service_members->response_members_->data); + auto request_type_support = std::make_shared(service_members); + auto response_type_support = std::make_shared(service_members); + + // Note: Service request/response types will contain a suffix Request_ or Response_. + // We remove the suffix when appending the type to the liveliness tokens for + // better reusability within GraphCache. + std::string service_type = request_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Request_"); + if (std::string::npos != suffix_substring_position) { + service_type = service_type.substr(0, suffix_substring_position); + } else { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unexpected type %s for client %s. Report this bug", + service_type.c_str(), service_name.c_str()); + return nullptr; + } + + // Convert the type hash to a string so that it can be included in the keyexpr. + char * type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( + type_hash, + *allocator, + &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { + RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); + return nullptr; + } + auto free_type_hash_c_str = rcpputils::make_scope_exit( + [&allocator, &type_hash_c_str]() { + allocator->deallocate(type_hash_c_str, allocator->state); + }); + + std::size_t domain_id = node_info.domain_id_; + auto entity = liveliness::Entity::make( + z_info_zid(session), + std::to_string(node_id), + std::to_string(service_id), + liveliness::EntityType::Client, + std::move(node_info), + liveliness::TopicInfo{ + std::move(domain_id), + service_name, + std::move(service_type), + type_hash_c_str, + std::move(adapted_qos_profile)} + ); + if (entity == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the client %s.", + service_name.c_str()); + return nullptr; + } + + std::shared_ptr client_data = std::shared_ptr( + new ClientData{ + node, + client, + entity, + request_members, + response_members, + request_type_support, + response_type_support + }); + + if (!client_data->init(session)) { + // init() already set the error. + return nullptr; + } + + return client_data; +} + +///============================================================================= +ClientData::ClientData( + const rmw_node_t * rmw_node, + const rmw_client_t * rmw_client, + std::shared_ptr entity, + const void * request_type_support_impl, + const void * response_type_support_impl, + std::shared_ptr request_type_support, + std::shared_ptr response_type_support) +: rmw_node_(rmw_node), + rmw_client_(rmw_client), + entity_(std::move(entity)), + request_type_support_impl_(request_type_support_impl), + response_type_support_impl_(response_type_support_impl), + request_type_support_(request_type_support), + response_type_support_(response_type_support), + wait_set_data_(nullptr), + sequence_number_(1), + is_shutdown_(false), + num_in_flight_(0) +{ + // Do nothing. +} + +///============================================================================= +bool ClientData::init(const z_loaned_session_t * session) +{ + std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_; + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [this]() { + z_drop(z_move(this->keyexpr_)); + }); + if (z_keyexpr_from_str(&this->keyexpr_, topic_keyexpr.c_str()) != Z_OK) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return false; + } + + std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr(); + z_view_keyexpr_t liveliness_ke; + z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); + auto free_token = rcpputils::make_scope_exit( + [this]() { + z_drop(z_move(this->token_)); + }); + if (zc_liveliness_declare_token( + session, + &this->token_, + z_loan(liveliness_ke), + NULL + ) != Z_OK) + { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the client."); + return false; + } + + free_ros_keyexpr.cancel(); + free_token.cancel(); + + return true; +} + +///============================================================================= +liveliness::TopicInfo ClientData::topic_info() const +{ + std::lock_guard lock(mutex_); + return entity_->topic_info().value(); +} + +///============================================================================= +void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const +{ + std::lock_guard lock(mutex_); + entity_->copy_gid(out_gid); +} + +///============================================================================= +void ClientData::add_new_reply(std::unique_ptr reply) +{ + std::lock_guard lock(mutex_); + const rmw_qos_profile_t adapted_qos_profile = + entity_->topic_info().value().qos_; + if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && + reply_queue_.size() >= adapted_qos_profile.depth) + { + // Log warning if message is discarded due to hitting the queue depth + z_view_string_t keystr; + z_keyexpr_as_view_string(z_loan(keyexpr_), &keystr); + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Query queue depth of %ld reached, discarding oldest Query " + "for client for %.*s", + adapted_qos_profile.depth, + static_cast(z_string_len(z_loan(keystr))), + z_string_data(z_loan(keystr))); + reply_queue_.pop_front(); + } + reply_queue_.emplace_back(std::move(reply)); + + // Since we added new data, trigger user callback and guard condition if they are available + data_callback_mgr_.trigger_callback(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); + } +} + +///============================================================================= +rmw_ret_t ClientData::take_response( + rmw_service_info_t * request_header, + void * ros_response, + bool * taken) +{ + std::lock_guard lock(mutex_); + *taken = false; + + if (is_shutdown_ || reply_queue_.empty()) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; + } + std::unique_ptr latest_reply = std::move(reply_queue_.front()); + reply_queue_.pop_front(); + + if (!latest_reply->get_sample().has_value()) { + RMW_SET_ERROR_MSG("invalid reply sample"); + return RMW_RET_ERROR; + } + const z_loaned_sample_t * sample = latest_reply->get_sample().value(); + + // Object that manages the raw buffer + z_owned_slice_t payload; + z_bytes_to_slice(z_sample_payload(sample), &payload); + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(z_slice_data(z_loan(payload)))), + z_slice_len(z_loan(payload))); + + // Object that serializes the data + rmw_zenoh_cpp::Cdr deser(fastbuffer); + if (!response_type_support_->deserialize_ros_message( + deser.get_cdr(), + ros_response, + response_type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS response"); + return RMW_RET_ERROR; + } + + // Fill in the request_header + rmw_zenoh_cpp::attachment_data_t attachment(z_sample_attachment(sample)); + request_header->request_id.sequence_number = attachment.sequence_number; + if (request_header->request_id.sequence_number < 0) { + RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); + return RMW_RET_ERROR; + } + request_header->source_timestamp = attachment.source_timestamp; + if (request_header->source_timestamp < 0) { + RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); + return RMW_RET_ERROR; + } + memcpy(request_header->request_id.writer_guid, attachment.source_gid, RMW_GID_STORAGE_SIZE); + request_header->received_timestamp = latest_reply->get_received_timestamp(); + + z_drop(z_move(payload)); + *taken = true; + + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t ClientData::send_request( + const void * ros_request, + int64_t * sequence_id) +{ + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return RMW_RET_OK; + } + + rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; + rmw_context_impl_s * context_impl = static_cast(rmw_node_->context->impl); + if (context_impl == nullptr) { + return RMW_RET_INVALID_ARGUMENT; + } + + size_t max_data_length = ( + request_type_support_->get_estimated_serialized_size( + ros_request, request_type_support_impl_)); + + // Init serialized message byte array + char * request_bytes = static_cast(allocator->allocate( + max_data_length, + allocator->state)); + if (!request_bytes) { + RMW_SET_ERROR_MSG("failed allocate request message bytes"); + return RMW_RET_ERROR; + } + auto always_free_request_bytes = rcpputils::make_scope_exit( + [request_bytes, allocator]() { + allocator->deallocate(request_bytes, allocator->state); + }); + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(request_bytes, max_data_length); + // Object that serializes the data + Cdr ser(fastbuffer); + if (!request_type_support_->serialize_ros_message( + ros_request, + ser.get_cdr(), + request_type_support_impl_)) + { + return RMW_RET_ERROR; + } + size_t data_length = ser.get_serialized_data_length(); + *sequence_id = sequence_number_++; + + // Send request + z_get_options_t opts; + z_get_options_default(&opts); + z_owned_bytes_t attachment; + uint8_t local_gid[RMW_GID_STORAGE_SIZE]; + entity_->copy_gid(local_gid); + rmw_zenoh_cpp::create_map_and_set_sequence_num( + &attachment, *sequence_id, + local_gid); + opts.attachment = z_move(attachment); + + opts.target = Z_QUERY_TARGET_ALL_COMPLETE; + // The default timeout for a z_get query is 10 seconds and if a response is not received within + // this window, the queryable will return an invalid reply. However, it is common for actions, + // which are implemented using services, to take an extended duration to complete. Hence, we set + // the timeout_ms to the largest supported value to account for most realistic scenarios. + opts.timeout_ms = std::numeric_limits::max(); + // Latest consolidation guarantees unicity of replies for the same key expression, + // which optimizes bandwidth. The default is "None", which imples replies may come in any order + // and any number. + opts.consolidation = z_query_consolidation_latest(); + + z_owned_bytes_t payload; + z_bytes_copy_from_buf( + &payload, reinterpret_cast(request_bytes), data_length); + opts.payload = z_move(payload); + + // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, + // capture shared_from_this() instead of this. + num_in_flight_++; + z_owned_closure_reply_t zn_closure_reply; + z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this); + z_get( + context_impl->session(), + z_loan(keyexpr_), "", + z_move(zn_closure_reply), + &opts); + + return RMW_RET_OK; +} + +///============================================================================= +ClientData::~ClientData() +{ + const rmw_ret_t ret = this->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Error destructing client /%s.", + entity_->topic_info().value().name_.c_str() + ); + } +} + +//============================================================================== +void ClientData::set_on_new_response_callback( + rmw_event_callback_t callback, + const void * user_data) +{ + std::lock_guard lock(mutex_); + data_callback_mgr_.set_callback(user_data, std::move(callback)); +} + +///============================================================================= +bool ClientData::queue_has_data_and_attach_condition_if_not( + rmw_wait_set_data_t * wait_set_data) +{ + std::lock_guard lock(mutex_); + if (!reply_queue_.empty()) { + return true; + } + wait_set_data_ = wait_set_data; + + return false; +} + +///============================================================================= +bool ClientData::detach_condition_and_queue_is_empty() +{ + std::lock_guard lock(mutex_); + wait_set_data_ = nullptr; + + return reply_queue_.empty(); +} + +///============================================================================= +void ClientData::_shutdown() +{ + if (is_shutdown_) { + return; + } + + // Unregister this node from the ROS graph. + zc_liveliness_undeclare_token(z_move(token_)); + + z_drop(z_move(keyexpr_)); + + is_shutdown_ = true; +} + +///============================================================================= +rmw_ret_t ClientData::shutdown() +{ + std::lock_guard lock(mutex_); + _shutdown(); + return RMW_RET_OK; +} + +///============================================================================= +bool ClientData::shutdown_and_query_in_flight() +{ + std::lock_guard lock(mutex_); + _shutdown(); + return num_in_flight_ > 0; +} + +///============================================================================= +void ClientData::decrement_in_flight_and_conditionally_remove() +{ + std::unique_lock lock(mutex_); + --num_in_flight_; + + if (is_shutdown_ && num_in_flight_ == 0) { + rmw_context_impl_s * context_impl = static_cast(rmw_node_->data); + if (context_impl == nullptr) { + return; + } + std::shared_ptr node_data = context_impl->get_node_data(rmw_node_); + if (node_data == nullptr) { + return; + } + // We have to unlock here since we are about to delete ourself, and thus the unlock would be UB. + lock.unlock(); + node_data->delete_client_data(rmw_client_); + } +} + +///============================================================================= +bool ClientData::is_shutdown() const +{ + std::lock_guard lock(mutex_); + return is_shutdown_; +} +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp new file mode 100644 index 00000000..b78c1744 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -0,0 +1,156 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_CLIENT_DATA_HPP_ +#define DETAIL__RMW_CLIENT_DATA_HPP_ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "event.hpp" +#include "liveliness_utils.hpp" +#include "message_type_support.hpp" +#include "service_type_support.hpp" +#include "type_support_common.hpp" +#include "zenoh_utils.hpp" + +#include "rcutils/allocator.h" + +#include "rmw/rmw.h" +#include "rmw/ret_types.h" + +namespace rmw_zenoh_cpp +{ + +///============================================================================= +class ClientData final : public std::enable_shared_from_this +{ +public: + // Make a shared_ptr of ClientData. + static std::shared_ptr make( + const z_loaned_session_t * session, + const rmw_node_t * const node, + const rmw_client_t * client, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t service_id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Get a copy of the TopicInfo of this ClientData. + liveliness::TopicInfo topic_info() const; + + // Copy the GID of this ClientData into an rmw_gid_t. + void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + + // Add a new ZenohReply to the queue. + void add_new_reply(std::unique_ptr reply); + + // Take a ROS service response. + rmw_ret_t take_response( + rmw_service_info_t * request_header, + void * ros_response, + bool * taken); + + // Send a service request. + rmw_ret_t send_request( + const void * ros_request, + int64_t * sequence_id); + + // Set a callback to be called when events happen. + void set_on_new_response_callback( + rmw_event_callback_t callback, + const void * user_data); + + // Check if there is data in the queue, and if not attach the wait set condition variable. + bool queue_has_data_and_attach_condition_if_not( + rmw_wait_set_data_t * wait_set_data); + + // Detach any attached wait set condition variable, and return whether there is data in the queue. + bool detach_condition_and_queue_is_empty(); + + // Shutdown this ClientData. + rmw_ret_t shutdown(); + + // Shutdown this ClientData, and return whether there are any requests currently in flight. + bool shutdown_and_query_in_flight(); + + // Decrement the in flight requests, and if that drops to 0 remove the client from the node. + void decrement_in_flight_and_conditionally_remove(); + + // Check if this ClientData is shutdown. + bool is_shutdown() const; + + // Destructor. + ~ClientData(); + +private: + // Constructor. + ClientData( + const rmw_node_t * rmw_node, + const rmw_client_t * client, + std::shared_ptr entity, + const void * request_type_support_impl, + const void * response_type_support_impl, + std::shared_ptr request_type_support, + std::shared_ptr response_type_support); + + // Initialize the Zenoh objects for this entity. + bool init(const z_loaned_session_t * session); + + // Shutdown this client (the mutex is expected to be held by the caller). + void _shutdown(); + + // Internal mutex. + mutable std::recursive_mutex mutex_; + // The parent node. + const rmw_node_t * rmw_node_; + const rmw_client_t * rmw_client_; + // The Entity generated for the service. + std::shared_ptr entity_; + // An owned keyexpression. + z_owned_keyexpr_t keyexpr_; + // Liveliness token for the service. + zc_owned_liveliness_token_t token_; + // Type support fields. + const void * request_type_support_impl_; + const void * response_type_support_impl_; + std::shared_ptr request_type_support_; + std::shared_ptr response_type_support_; + // Deque to store the replies in the order they arrive. + std::deque> reply_queue_; + // Wait set data. + rmw_wait_set_data_t * wait_set_data_; + // Data callback manager. + DataCallbackManager data_callback_mgr_; + // Sequence number for queries. + size_t sequence_number_; + // Shutdown flag. + bool is_shutdown_; + size_t num_in_flight_; +}; +using ClientDataPtr = std::shared_ptr; +using ClientDataConstPtr = std::shared_ptr; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__RMW_CLIENT_DATA_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 4b77e031..7915cd9e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -14,15 +14,23 @@ #include "rmw_context_impl_s.hpp" +#include +#include +#include +#include #include #include +#include #include #include +#include #include +#include "graph_cache.hpp" +#include "guard_condition.hpp" #include "identifier.hpp" -#include "liveliness_utils.hpp" #include "logging_macros.hpp" +#include "rmw_node_data.hpp" #include "zenoh_config.hpp" #include "zenoh_router_check.hpp" @@ -33,178 +41,410 @@ // TODO(clalancette): Make this configurable, or get it from the configuration #define SHM_BUFFER_SIZE_MB 10 -// The variable is used to identify whether the process is trying to exit or not. -// The atexit function we registered will set the flag and prevent us from closing -// Zenoh Session. Zenoh API can't be used in atexit function, because Tokio context -// is already destroyed. It will cause panic if we do so. -static bool is_exiting = false; -void update_is_exiting() -{ - is_exiting = true; -} +// This global mapping of raw Data pointers to Data shared pointers allows graph_sub_data_handler() +// to lookup the pointer, and gain a reference to a shared_ptr if it exists. +// This guarantees that the Data object will not be destroyed while we are using it. +static std::mutex data_to_data_shared_ptr_map_mutex; +static std::unordered_map> data_to_data_shared_ptr_map; -///============================================================================= -void rmw_context_impl_s::graph_sub_data_handler(z_loaned_sample_t * sample, void * data) +static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data); + +// Bundle all class members into a data struct which can be passed as a +// weak ptr to various threads for thread-safe access without capturing +// "this" ptr by reference. +class rmw_context_impl_s::Data final { - z_view_string_t keystr; - z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); +public: + // Constructor. + Data( + std::size_t domain_id, + const std::string & enclave) + : domain_id_(std::move(domain_id)), + enclave_(std::move(enclave)), + is_shutdown_(false), + next_entity_id_(0), + nodes_({}) + { + // Initialize the zenoh configuration. + z_owned_config_t config; + rmw_ret_t ret; + if ((ret = + rmw_zenoh_cpp::get_z_config( + rmw_zenoh_cpp::ConfigurableEntity::Session, + &config)) != RMW_RET_OK) + { + throw std::runtime_error("Error configuring Zenoh session."); + } - auto data_ptr = static_cast(data); - if (data_ptr == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Invalid data_ptr." - ); - return; + // Check if shm is enabled. + z_owned_string_t shm_enabled; + zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled); + auto always_free_shm_enabled = rcpputils::make_scope_exit( + [&shm_enabled]() { + z_drop(z_move(shm_enabled)); + }); + + // Initialize the zenoh session. + if (z_open(&session_, z_move(config), NULL) != Z_OK) { + RMW_SET_ERROR_MSG("Error setting up zenoh session."); + throw std::runtime_error("Error setting up zenoh session."); + } + auto close_session = rcpputils::make_scope_exit( + [this]() { + z_close(z_loan_mut(session_), NULL); + }); + + // Verify if the zenoh router is running if configured. + const std::optional configured_connection_attempts = + rmw_zenoh_cpp::zenoh_router_check_attempts(); + if (configured_connection_attempts.has_value()) { + uint64_t connection_attempts = 0; + constexpr std::chrono::milliseconds sleep_time(1000); + constexpr int64_t ticks_between_print(std::chrono::milliseconds(1000) / sleep_time); + while ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session_))) != RMW_RET_OK) { + if ((connection_attempts % ticks_between_print) == 0) { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Unable to connect to a Zenoh router. " + "Have you started a router with `ros2 run rmw_zenoh_cpp rmw_zenohd`?"); + } + if (++connection_attempts >= configured_connection_attempts.value()) { + break; + } + std::this_thread::sleep_for(sleep_time); + } + } + + // Initialize the graph cache. + const z_id_t zid = z_info_zid(z_loan(session_)); + graph_cache_ = std::make_shared(zid); + // Setup liveliness subscriptions for discovery. + std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id); + + // Query router/liveliness participants to get graph information before the session was started. + // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive + // replies for the zc_liveliness_get() call. This is necessary as if the `bound` + // is too low, the channel may starve the zenoh executor of its threads which + // would lead to deadlocks when trying to receive replies and block the + // execution here. + // The blocking channel will return when the sender end is closed which is + // the moment the query finishes. + // The non-blocking fifo exists only for the use case where we don't want to + // block the thread between responses (including the request termination response). + // In general, unless we want to cooperatively schedule other tasks on the same + // thread as reading the fifo, the blocking fifo will be more appropriate as + // the code will be simpler, and if we're just going to spin over the non-blocking + // reads until we obtain responses, we'll just be hogging CPU time by convincing + // the OS that we're doing actual work when it could instead park the thread. + z_owned_fifo_handler_reply_t handler; + z_owned_closure_reply_t closure; + z_fifo_channel_reply_new(&closure, &handler, SIZE_MAX - 1); + + z_view_keyexpr_t keyexpr; + z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str()); + zc_liveliness_get( + z_loan(session_), z_loan(keyexpr), + z_move(closure), NULL); + z_owned_reply_t reply; + while (z_recv(z_loan(handler), &reply) == Z_OK) { + if (z_reply_is_ok(z_loan(reply))) { + const z_loaned_sample_t * sample = z_reply_ok(z_loan(reply)); + z_view_string_t keystr; + z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); + std::string livelines_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); + // Ignore tokens from the same session to avoid race conditions from this + // query and the liveliness subscription. + graph_cache_->parse_put(std::move(livelines_str), true); + } else { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n"); + } + z_drop(z_move(reply)); + } + z_drop(z_move(handler)); + + // Initialize the shm manager if shared_memory is enabled in the config. + shm_provider_ = std::nullopt; + if (strncmp( + z_string_data(z_loan(shm_enabled)), + "true", + z_string_len(z_loan(shm_enabled))) == 0) + { + // TODO(yuyuan): determine the default alignment of SHM + z_alloc_alignment_t alignment = {5}; + z_owned_memory_layout_t layout; + z_memory_layout_new(&layout, SHM_BUFFER_SIZE_MB * 1024 * 1024, alignment); + + z_owned_shm_provider_t provider; + if (z_posix_shm_provider_new(&provider, z_loan(layout)) != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create an SHM provider."); + throw std::runtime_error("Unable to create an SHM provider."); + } + shm_provider_ = provider; + } + auto free_shm_provider = rcpputils::make_scope_exit( + [this]() { + if (shm_provider_.has_value()) { + z_drop(z_move(shm_provider_.value())); + } + }); + + graph_guard_condition_ = std::make_unique(); + graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; + graph_guard_condition_->data = &guard_condition_data_; + + // Setup the liveliness subscriber to receives updates from the ROS graph + // and update the graph cache. + zc_liveliness_subscriber_options_t sub_options; + zc_liveliness_subscriber_options_default(&sub_options); + z_owned_closure_sample_t callback; + z_closure(&callback, graph_sub_data_handler, nullptr, this); + z_view_keyexpr_t liveliness_ke; + z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str()); + auto undeclare_z_sub = rcpputils::make_scope_exit( + [this]() { + z_undeclare_subscriber(z_move(this->graph_subscriber_)); + }); + if (zc_liveliness_declare_subscriber( + z_loan(session_), + &graph_subscriber_, z_loan(liveliness_ke), + z_move(callback), &sub_options) != Z_OK) + { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + throw std::runtime_error("Unable to subscribe to ROS graph updates."); + } + + close_session.cancel(); + free_shm_provider.cancel(); + undeclare_z_sub.cancel(); } - // Update the graph cache. - std::lock_guard lock(data_ptr->mutex_); - if (data_ptr->is_shutdown_) { - return; + // Shutdown the Zenoh session. + rmw_ret_t shutdown() + { + { + std::lock_guard lock(mutex_); + rmw_ret_t ret = RMW_RET_OK; + if (is_shutdown_) { + return ret; + } + + z_undeclare_subscriber(z_move(graph_subscriber_)); + if (shm_provider_.has_value()) { + z_drop(z_move(shm_provider_.value())); + } + is_shutdown_ = true; + + // We specifically do *not* hold the mutex_ while tearing down the session; this allows us + // to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler(). + } + + // Close the zenoh session + if (z_close(z_loan_mut(session_), NULL) != Z_OK) { + RMW_SET_ERROR_MSG("Error while closing zenoh session"); + return RMW_RET_ERROR; + } + return RMW_RET_OK; } - std::string str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); - switch (z_sample_kind(sample)) { - case z_sample_kind_t::Z_SAMPLE_KIND_PUT: - data_ptr->graph_cache_->parse_put(str); - break; - case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: - data_ptr->graph_cache_->parse_del(str); - break; - default: - return; + + std::string enclave() const + { + std::lock_guard lock(mutex_); + return enclave_; } - // Trigger the ROS graph guard condition. - rmw_ret_t rmw_ret = rmw_trigger_guard_condition(data_ptr->graph_guard_condition_.get()); - if (RMW_RET_OK != rmw_ret) { - RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Unable to trigger graph guard condition." - ); + const z_loaned_session_t * session() const + { + std::lock_guard lock(mutex_); + return z_loan(session_); } -} -///============================================================================= -rmw_context_impl_s::Data::Data( - std::size_t domain_id, - const std::string & enclave, - z_owned_session_t session, - const std::string & liveliness_str, - std::shared_ptr graph_cache -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - , std::optional shm -#endif -) -: enclave_(std::move(enclave)), - domain_id_(std::move(domain_id)), - session_(std::move(session)), - liveliness_str_(std::move(liveliness_str)), - graph_cache_(std::move(graph_cache)), - is_shutdown_(false), - next_entity_id_(0), - is_initialized_(false), - nodes_({}) -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - , shm_(shm) -#endif -{ - graph_guard_condition_ = std::make_unique(); - graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - graph_guard_condition_->data = &guard_condition_data_; -} + std::optional & shm_provider() + { + std::lock_guard lock(mutex_); + return shm_provider_; + } -///============================================================================= -rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph() -{ - std::lock_guard lock(mutex_); - if (is_initialized_) { - return RMW_RET_OK; + rmw_guard_condition_t * graph_guard_condition() + { + std::lock_guard lock(mutex_); + return graph_guard_condition_.get(); } - // Setup the liveliness subscriber to receives updates from the ROS graph - // and update the graph cache. - // TODO(Yadunund): This closure is still not 100% thread safe as we are - // passing Data* as the type erased argument to z_closure. Thus during - // the execution of graph_sub_data_handler, the rawptr may be freed/reset - // by a different thread. When we switch to zenoh-cpp we can replace z_closure - // with a lambda that captures a weak_ptr by copy. The lambda and caputed - // weak_ptr will have the same lifetime as the subscriber. Then within - // graph_sub_data_handler, we would first lock to weak_ptr to check if the - // shared_ptr exits. If it does, then even if a different thread calls - // rmw_context_fini() to destroy rmw_context_impl_s, the locked - // shared_ptr would live on until the graph_sub_data_handler callback. - zc_liveliness_subscriber_options_t sub_options; - zc_liveliness_subscriber_options_default(&sub_options); - // Enable history option to retrieve the old graph info in case there is - // something updated after getting the graph info. - sub_options.history = true; - z_owned_closure_sample_t callback; - z_closure(&callback, graph_sub_data_handler, nullptr, this); - z_view_keyexpr_t keyexpr; - z_view_keyexpr_from_str(&keyexpr, liveliness_str_.c_str()); - auto undeclare_z_sub = rcpputils::make_scope_exit( - [this]() { - z_undeclare_subscriber(z_move(this->graph_subscriber_)); - }); - if (zc_liveliness_declare_subscriber( - z_loan(session_), - &graph_subscriber_, z_loan(keyexpr), - z_move(callback), &sub_options) != Z_OK) + + std::size_t get_next_entity_id() { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return RMW_RET_ERROR; + std::lock_guard lock(mutex_); + return next_entity_id_++; } - undeclare_z_sub.cancel(); - is_initialized_ = true; - return RMW_RET_OK; -} + bool is_shutdown() const + { + std::lock_guard lock(mutex_); + return is_shutdown_; + } -///============================================================================= -rmw_ret_t rmw_context_impl_s::Data::shutdown() -{ - std::lock_guard lock(mutex_); - rmw_ret_t ret = RMW_RET_OK; - if (is_shutdown_) { - return ret; + bool session_is_valid() const + { + std::lock_guard lock(mutex_); + return !z_session_is_closed(z_loan(session_)); + } + + std::shared_ptr graph_cache() + { + std::lock_guard lock(mutex_); + return graph_cache_; } - // Shutdown all the nodes in this context. - for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) { - ret = node_it->second->shutdown(); - if (ret != RMW_RET_OK) { + bool create_node_data( + const rmw_node_t * const node, + const std::string & ns, + const std::string & node_name) + { + std::lock_guard lock(mutex_); + if (nodes_.count(node) > 0) { + // Node already exists. + return false; + } + + // Check that the Zenoh session is still valid. + if (!session_is_valid()) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", - "Unable to shutdown node with id %zu. rmw_ret_t code: %zu.", - node_it->second->id(), - ret - ); + "Unable to create NodeData as Zenoh session is invalid."); + return false; } + + auto node_data = rmw_zenoh_cpp::NodeData::make( + node, + this->get_next_entity_id(), + z_loan(session_), + domain_id_, + ns, + node_name, + enclave_); + if (node_data == nullptr) { + // Error already handled. + return false; + } + + auto node_insertion = nodes_.insert(std::make_pair(node, std::move(node_data))); + if (!node_insertion.second) { + return false; + } + + return true; } - z_undeclare_subscriber(z_move(graph_subscriber_)); + std::shared_ptr get_node_data(const rmw_node_t * const node) + { + std::lock_guard lock(mutex_); + auto node_it = nodes_.find(node); + if (node_it == nodes_.end()) { + return nullptr; + } + return node_it->second; + } - // Don't touch Zenoh Session if the ROS process is exiting, - // it will cause panic. - if (!is_exiting) { - z_close(z_loan_mut(session_), NULL); + void delete_node_data(const rmw_node_t * const node) + { + std::lock_guard lock(mutex_); + nodes_.erase(node); } -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - // drop SHM subsystem if used - shm_ = std::nullopt; -#endif + void update_graph_cache(z_sample_kind_t sample_kind, const std::string & keystr) + { + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return; + } + switch (sample_kind) { + case z_sample_kind_t::Z_SAMPLE_KIND_PUT: + graph_cache_->parse_put(keystr); + break; + case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: + graph_cache_->parse_del(keystr); + break; + default: + return; + } - is_shutdown_ = true; - return RMW_RET_OK; -} + // Trigger the ROS graph guard condition. + rmw_ret_t rmw_ret = rmw_trigger_guard_condition(graph_guard_condition_.get()); + if (RMW_RET_OK != rmw_ret) { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Unable to trigger graph guard condition." + ); + } + } + + // Destructor. + ~Data() + { + auto ret = this->shutdown(); + nodes_.clear(); + static_cast(ret); + } + +private: + // Mutex to lock when accessing members. + mutable std::recursive_mutex mutex_; + // The ROS domain id of this context. + std::size_t domain_id_; + // Enclave, name used to find security artifacts in a sros2 keystore. + std::string enclave_; + // An owned session. + z_owned_session_t session_; + // An optional SHM manager that is initialized of SHM is enabled in the + // zenoh session config. + std::optional shm_provider_; + // Graph cache. + std::shared_ptr graph_cache_; + // ROS graph liveliness subscriber. + z_owned_subscriber_t graph_subscriber_; + // Equivalent to rmw_dds_common::Context's guard condition. + // Guard condition that should be triggered when the graph changes. + std::unique_ptr graph_guard_condition_; + // The GuardCondition data structure. + rmw_zenoh_cpp::GuardCondition guard_condition_data_; + // Shutdown flag. + bool is_shutdown_; + // A counter to assign a local id for every entity created in this session. + std::size_t next_entity_id_; + // Nodes created from this context. + std::unordered_map> nodes_; +}; ///============================================================================= -rmw_context_impl_s::Data::~Data() +static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data) { - auto ret = this->shutdown(); - nodes_.clear(); - static_cast(ret); + z_view_string_t keystr; + z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); + + auto data_ptr = static_cast(data); + if (data_ptr == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Invalid data_ptr." + ); + return; + } + + // Look up the data shared_ptr in the global map. If it is in there, use it. + // If not, it is being shutdown so we can just ignore this update. + std::shared_ptr data_shared_ptr{nullptr}; + { + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + if (data_to_data_shared_ptr_map.count(data_ptr) == 0) { + return; + } + data_shared_ptr = data_to_data_shared_ptr_map[data_ptr]; + } + + // Update the graph cache. + std::string livelines_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); + data_shared_ptr->update_graph_cache(z_sample_kind(sample), std::move(livelines_str)); } ///============================================================================= @@ -212,229 +452,75 @@ rmw_context_impl_s::rmw_context_impl_s( const std::size_t domain_id, const std::string & enclave) { - // Initialize the zenoh configuration. - z_owned_config_t config; - rmw_ret_t ret; - if ((ret = - rmw_zenoh_cpp::get_z_config( - rmw_zenoh_cpp::ConfigurableEntity::Session, - &config)) != RMW_RET_OK) - { - throw std::runtime_error("Error configuring Zenoh session."); - } - - // Initialize the zenoh session. - z_owned_session_t session; - if (z_open(&session, z_move(config), NULL) != Z_OK) { - RMW_SET_ERROR_MSG("Error setting up zenoh session"); - throw std::runtime_error("Error setting up zenoh session."); - } - // This atexit function is registered after ROS initialization, - // so it should be called before ROS finialization - // Check https://en.cppreference.com/w/cpp/utility/program/exit - atexit(update_is_exiting); - auto close_session = rcpputils::make_scope_exit( - [&session]() { - z_close(z_loan_mut(session), NULL); - }); - - // TODO(Yadunund) Move this check into a separate thread. - // Verify if the zenoh router is running if configured. - const std::optional configured_connection_attempts = - rmw_zenoh_cpp::zenoh_router_check_attempts(); - if (configured_connection_attempts.has_value()) { - ret = RMW_RET_ERROR; - uint64_t connection_attempts = 0; - // Retry until the connection is successful. - while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { - if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session))) != RMW_RET_OK) { - ++connection_attempts; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - if (ret != RMW_RET_OK) { - throw std::runtime_error( - "Unable to connect to a Zenoh router after " + - std::to_string(configured_connection_attempts.value()) + - " retries."); - } - } + data_ = std::make_shared(domain_id, std::move(enclave)); - // Initialize the graph cache. - const z_id_t zid = z_info_zid(z_loan(session)); - auto graph_cache = std::make_shared(zid); - // Setup liveliness subscriptions for discovery. - std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token( - domain_id); - - // Query router/liveliness participants to get graph information before this session was started. - // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive - // replies for the zc_liveliness_get() call. This is necessary as if the `bound` - // is too low, the channel may starve the zenoh executor of its threads which - // would lead to deadlocks when trying to receive replies and block the - // execution here. - // The blocking channel will return when the sender end is closed which is - // the moment the query finishes. - // The non-blocking fifo exists only for the use case where we don't want to - // block the thread between responses (including the request termination response). - // In general, unless we want to cooperatively schedule other tasks on the same - // thread as reading the fifo, the blocking fifo will be more appropriate as - // the code will be simpler, and if we're just going to spin over the non-blocking - // reads until we obtain responses, we'll just be hogging CPU time by convincing - // the OS that we're doing actual work when it could instead park the thread. - z_owned_fifo_handler_reply_t handler; - z_owned_closure_reply_t closure; - z_fifo_channel_reply_new(&closure, &handler, SIZE_MAX - 1); - - z_view_keyexpr_t keyexpr; - z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str()); - zc_liveliness_get( - z_loan(session), z_loan(keyexpr), - z_move(closure), NULL); - - z_owned_reply_t reply; - while (z_recv(z_loan(handler), &reply) == Z_OK) { - if (z_reply_is_ok(z_loan(reply))) { - const z_loaned_sample_t * sample = z_reply_ok(z_loan(reply)); - z_view_string_t key_str; - z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_str); - std::string str(z_string_data(z_loan(key_str)), z_string_len(z_loan(key_str))); - // Ignore tokens from the same session to avoid race conditions from this - // query and the liveliness subscription. - graph_cache->parse_put(str, true); - } else { - RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n"); - } - z_drop(z_move(reply)); - } - z_drop(z_move(handler)); - -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - // Initialize the shm subsystem if shared_memory is enabled in the config - std::optional shm; - if (rmw_zenoh_cpp::zenoh_shm_enabled()) { - RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled"); - - rmw_zenoh_cpp::ShmContext shm_context; - - // Read msg size treshold from config - shm_context.msgsize_threshold = rmw_zenoh_cpp::zenoh_shm_message_size_threshold(); - - // Create Layout for provider's memory - // Provider's alignment will be 1 byte as we are going to make only 1-byte aligned allocations - // TODO(yellowhatter): use zenoh_shm_message_size_threshold as base for alignment - z_alloc_alignment_t alignment = {0}; - z_owned_memory_layout_t layout; - if (z_memory_layout_new(&layout, rmw_zenoh_cpp::zenoh_shm_alloc_size(), alignment) != Z_OK) { - throw std::runtime_error("Unable to create a Layout for SHM provider."); - } - // Create SHM provider - const auto provider_creation_result = - z_posix_shm_provider_new(&shm_context.shm_provider, z_loan(layout)); - z_drop(z_move(layout)); - if (provider_creation_result != Z_OK) { - throw std::runtime_error("Unable to create an SHM provider."); - } - // Upon successful provider creation, store it in the context - shm = std::make_optional(std::move(shm_context)); - } else { - RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is disabled"); - } -#endif - - close_session.cancel(); - - data_ = std::make_shared( - domain_id, - std::move(enclave), - std::move(session), - std::move(liveliness_str), - std::move(graph_cache) -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - , std::move(shm) -#endif - ); - - ret = data_->subscribe_to_ros_graph(); - if (ret != RMW_RET_OK) { - throw std::runtime_error("Unable to subscribe to ROS Graph updates."); - } + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + data_to_data_shared_ptr_map.emplace(data_.get(), data_); } ///============================================================================= rmw_context_impl_s::~rmw_context_impl_s() { - // Don't touch Zenoh Session if the ROS process is exiting, - // it will cause panic. - if (!is_exiting) { - std::lock_guard lock(data_->mutex_); - z_drop(z_move(data_->session_)); - } + this->shutdown(); } ///============================================================================= std::string rmw_context_impl_s::enclave() const { - std::lock_guard lock(data_->mutex_); - return data_->enclave_; + return data_->enclave(); } ///============================================================================= const z_loaned_session_t * rmw_context_impl_s::session() const { - std::lock_guard lock(data_->mutex_); - return z_loan(data_->session_); + return data_->session(); } ///============================================================================= -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY -std::optional & rmw_context_impl_s::shm() +std::optional & rmw_context_impl_s::shm_provider() { - std::lock_guard lock(data_->mutex_); - return data_->shm_; + return data_->shm_provider(); } -#endif ///============================================================================= rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition() { - std::lock_guard lock(data_->mutex_); - return data_->graph_guard_condition_.get(); + return data_->graph_guard_condition(); } ///============================================================================= std::size_t rmw_context_impl_s::get_next_entity_id() { - std::lock_guard lock(data_->mutex_); - return data_->next_entity_id_++; + return data_->get_next_entity_id(); } ///============================================================================= rmw_ret_t rmw_context_impl_s::shutdown() { + { + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + data_to_data_shared_ptr_map.erase(data_.get()); + } + return data_->shutdown(); } ///============================================================================= bool rmw_context_impl_s::is_shutdown() const { - std::lock_guard lock(data_->mutex_); - return data_->is_shutdown_; + return data_->is_shutdown(); } ///============================================================================= bool rmw_context_impl_s::session_is_valid() const { - std::lock_guard lock(data_->mutex_); - return z_session_is_closed(z_loan(data_->session_)); + return data_->session_is_valid(); } ///============================================================================= std::shared_ptr rmw_context_impl_s::graph_cache() { - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_; + return data_->graph_cache(); } ///============================================================================= @@ -443,48 +529,18 @@ bool rmw_context_impl_s::create_node_data( const std::string & ns, const std::string & node_name) { - std::lock_guard lock(data_->mutex_); - if (data_->nodes_.count(node) > 0) { - // Node already exists. - return false; - } - - auto node_data = rmw_zenoh_cpp::NodeData::make( - node, - this->get_next_entity_id(), - z_loan(data_->session_), - data_->domain_id_, - ns, - node_name, - data_->enclave_); - if (node_data == nullptr) { - // Error already handled. - return false; - } - - auto node_insertion = data_->nodes_.insert(std::make_pair(node, std::move(node_data))); - if (!node_insertion.second) { - return false; - } - - return true; + return data_->create_node_data(node, ns, node_name); } ///============================================================================= std::shared_ptr rmw_context_impl_s::get_node_data( const rmw_node_t * const node) { - std::lock_guard lock(data_->mutex_); - auto node_it = data_->nodes_.find(node); - if (node_it == data_->nodes_.end()) { - return nullptr; - } - return node_it->second; + return data_->get_node_data(node); } ///============================================================================= void rmw_context_impl_s::delete_node_data(const rmw_node_t * const node) { - std::lock_guard lock(data_->mutex_); - data_->nodes_.erase(node); + data_->delete_node_data(node); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index c99f5018..1e46d6af 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -17,17 +17,17 @@ #include -# include +#include #include -#include #include #include -#include #include "graph_cache.hpp" -#include "guard_condition.hpp" #include "rmw_node_data.hpp" +#include "rmw/ret_types.h" +#include "rmw/types.h" + ///============================================================================= class rmw_context_impl_s final { @@ -42,21 +42,19 @@ class rmw_context_impl_s final const std::size_t domain_id, const std::string & enclave); + ~rmw_context_impl_s(); + // Get a copy of the enclave. std::string enclave() const; // Loan the Zenoh session. - // TODO(Yadunund): Remove this API once rmw_context_impl_s is updated to - // create other Zenoh objects. const z_loaned_session_t * session() const; -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - // Get a reference to the shm subsystem. + // Get a reference to the shm_provider. // Note: This is not thread-safe. // TODO(Yadunund): Remove this API and instead include a publish() API // that handles the shm_provider once the context manages publishers. - std::optional & shm(); -#endif + std::optional & shm_provider(); // Get the graph guard condition. rmw_guard_condition_t * graph_guard_condition(); @@ -91,75 +89,11 @@ class rmw_context_impl_s final /// Delete the NodeData for a given rmw_node_t if present. void delete_node_data(const rmw_node_t * const node); - // Destructor. - ~rmw_context_impl_s(); + // Forward declaration + class Data; private: - // Bundle all class members into a data struct which can be passed as a - // weak ptr to various threads for thread-safe access without capturing - // "this" ptr by reference. - struct Data : public std::enable_shared_from_this - { - // Constructor. - Data( - std::size_t domain_id, - const std::string & enclave, - z_owned_session_t session, - const std::string & liveliness_str, - std::shared_ptr graph_cache -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - , std::optional shm -#endif - ); - - // Subscribe to the ROS graph. - rmw_ret_t subscribe_to_ros_graph(); - - // Shutdown the Zenoh session. - rmw_ret_t shutdown(); - - // Destructor. - ~Data(); - - // Mutex to lock when accessing members. - mutable std::recursive_mutex mutex_; - // RMW allocator. - const rcutils_allocator_t * allocator_; - // Enclave, name used to find security artifacts in a sros2 keystore. - std::string enclave_; - // The ROS domain id of this context. - std::size_t domain_id_; - // An owned session. - z_owned_session_t session_; - // Liveliness keyexpr string to subscribe to for ROS graph changes. - std::string liveliness_str_; - // Graph cache. - std::shared_ptr graph_cache_; - // ROS graph liveliness subscriber. - z_owned_subscriber_t graph_subscriber_; - // Equivalent to rmw_dds_common::Context's guard condition. - // Guard condition that should be triggered when the graph changes. - std::unique_ptr graph_guard_condition_; - // The GuardCondition data structure. - rmw_zenoh_cpp::GuardCondition guard_condition_data_; - // Shutdown flag. - bool is_shutdown_; - // A counter to assign a local id for every entity created in this session. - std::size_t next_entity_id_; - // True once graph subscriber is initialized. - bool is_initialized_; - // Nodes created from this context. - std::unordered_map> nodes_; -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - // An optional SHM context that is initialized if SHM is enabled in the - // zenoh session config. - std::optional shm_; -#endif - }; - std::shared_ptr data_{nullptr}; - - static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data); }; diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp deleted file mode 100644 index 206c84b8..00000000 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ /dev/null @@ -1,214 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include - -#include "logging_macros.hpp" -#include "rmw_data_types.hpp" - -#include "rmw/impl/cpp/macros.hpp" - - -///============================================================================= -namespace rmw_zenoh_cpp -{ -///============================================================================= -void rmw_client_data_t::notify() -{ - std::lock_guard lock(condition_mutex_); - if (wait_set_data_ != nullptr) { - std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); - wait_set_data_->triggered = true; - wait_set_data_->condition_variable.notify_one(); - } -} - -///============================================================================= -void rmw_client_data_t::add_new_reply(std::unique_ptr reply) -{ - std::lock_guard lock(reply_queue_mutex_); - if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && - reply_queue_.size() >= adapted_qos_profile.depth) - { - // Log warning if message is discarded due to hitting the queue depth - z_view_string_t keystr; - z_keyexpr_as_view_string(z_loan(this->keyexpr), &keystr); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Reply queue depth of %ld reached, discarding oldest reply " - "for client for %s", - adapted_qos_profile.depth, - z_string_data(z_loan(keystr))); - reply_queue_.pop_front(); - } - reply_queue_.emplace_back(std::move(reply)); - - // Since we added new data, trigger user callback and guard condition if they - // are available - data_callback_mgr.trigger_callback(); - notify(); -} - -///============================================================================= -bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not( - rmw_wait_set_data_t * wait_set_data) -{ - std::lock_guard lock(condition_mutex_); - if (!reply_queue_.empty()) { - return true; - } - wait_set_data_ = wait_set_data; - - return false; -} - -///============================================================================= -bool rmw_client_data_t::detach_condition_and_queue_is_empty() -{ - std::lock_guard lock(condition_mutex_); - wait_set_data_ = nullptr; - - return reply_queue_.empty(); -} - -///============================================================================= -std::unique_ptr rmw_client_data_t::pop_next_reply() -{ - std::lock_guard lock(reply_queue_mutex_); - - if (reply_queue_.empty()) { - return nullptr; - } - - std::unique_ptr latest_reply = std::move(reply_queue_.front()); - reply_queue_.pop_front(); - - return latest_reply; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class -// for the use of this method. -void rmw_client_data_t::increment_in_flight_callbacks() -{ - std::lock_guard lock(in_flight_mutex_); - num_in_flight_++; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class -// for the use of this method. -bool rmw_client_data_t::shutdown_and_query_in_flight() -{ - std::lock_guard lock(in_flight_mutex_); - is_shutdown_ = true; - - return num_in_flight_ > 0; -} - -//============================================================================== -// See the comment about the "num_in_flight" class variable in the -// rmw_client_data_t structure for the use of this method. -bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight) -{ - std::lock_guard lock(in_flight_mutex_); - queries_in_flight = --num_in_flight_ > 0; - return is_shutdown_; -} - -bool rmw_client_data_t::is_shutdown() const -{ - std::lock_guard lock(in_flight_mutex_); - return is_shutdown_; -} - - -///============================================================================= -size_t rmw_client_data_t::get_next_sequence_number() -{ - std::lock_guard lock(sequence_number_mutex_); - return sequence_number_++; -} - -//============================================================================== -void client_data_handler(z_loaned_reply_t * reply, void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t " - ); - return; - } - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - if (client_data->is_shutdown()) { - return; - } - - if (z_reply_is_ok(reply)) { - z_owned_reply_t owned_reply; - z_reply_clone(&owned_reply, reply); - std::chrono::nanoseconds::rep received_timestamp = - std::chrono::system_clock::now().time_since_epoch().count(); - client_data->add_new_reply(std::make_unique(owned_reply, received_timestamp)); - } else { - z_view_string_t keystr; - z_keyexpr_as_view_string(z_loan(client_data->keyexpr), &keystr); - const z_loaned_reply_err_t * err = z_reply_err(reply); - const z_loaned_bytes_t * err_payload = z_reply_err_payload(err); - - z_owned_string_t err_str; - z_bytes_to_string(err_payload, &err_str); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", - z_string_data(z_loan(keystr)), static_cast(z_string_len(z_loan(err_str))), - z_string_data(z_loan(err_str))); - z_drop(z_move(err_str)); - return; - } -} - -void client_data_drop(void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t " - ); - return; - } - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - bool queries_in_flight = false; - bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); - - if (is_shutdown) { - if (!queries_in_flight) { - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - client_data->context->options.allocator.deallocate( - client_data, client_data->context->options.allocator.state); - } - } -} - -} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp deleted file mode 100644 index 796fd51e..00000000 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef DETAIL__RMW_DATA_TYPES_HPP_ -#define DETAIL__RMW_DATA_TYPES_HPP_ - -#include - -#include -#include -#include - -#include "rosidl_runtime_c/type_hash.h" - -#include "event.hpp" -#include "liveliness_utils.hpp" -#include "rmw_wait_set_data.hpp" -#include "service_type_support.hpp" -#include "zenoh_utils.hpp" - -/// Structs for various type erased data fields. -namespace rmw_zenoh_cpp -{ -///============================================================================= -void client_data_handler(z_loaned_reply_t * reply, void * client_data); -void client_data_drop(void * data); - -///============================================================================= -class rmw_client_data_t final -{ -public: - // The Entity generated for the client. - std::shared_ptr entity; - - z_owned_keyexpr_t keyexpr; - - // Store the actual QoS profile used to configure this client. - // The QoS is reused for sending requests and getting responses. - rmw_qos_profile_t adapted_qos_profile; - - // Liveliness token for the client. - zc_owned_liveliness_token_t token; - - const void * request_type_support_impl; - const void * response_type_support_impl; - const char * typesupport_identifier; - const rosidl_type_hash_t * type_hash; - RequestTypeSupport * request_type_support; - ResponseTypeSupport * response_type_support; - - rmw_context_t * context; - - size_t get_next_sequence_number(); - - void add_new_reply(std::unique_ptr reply); - - bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); - - bool detach_condition_and_queue_is_empty(); - - std::unique_ptr pop_next_reply(); - - DataCallbackManager data_callback_mgr; - - // See the comment for "num_in_flight" below on the use of this method. - void increment_in_flight_callbacks(); - - // See the comment for "num_in_flight" below on the use of this method. - bool shutdown_and_query_in_flight(); - - // See the comment for "num_in_flight" below on the use of this method. - bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); - - bool is_shutdown() const; - -private: - void notify(); - - size_t sequence_number_{1}; - std::mutex sequence_number_mutex_; - - rmw_wait_set_data_t * wait_set_data_{nullptr}; - std::mutex condition_mutex_; - - // TODO(yuyuan): replace with zenoh-c ring buffer handler - std::deque> reply_queue_; - mutable std::mutex reply_queue_mutex_; - - // rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no - // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an - // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the - // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. - // - // The next 3 variables are used to avoid that situation. Any time a query is initiated via - // rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the - // query reply, num_in_flight_ is decremented. When rmw_destroy_client() is called, is_shutdown_ - // is set to true. If num_in_flight_ is 0, the data associated with this structure is freed. - // If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. - // In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query - // callback, the query callback will free up the structure. - // - // There is one case which is not handled by this, which has to do with timeouts. The query - // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never - // returns, the memory in this structure will never be freed. There isn't much we can do about - // that at this time, but we may want to consider changing the timeout so that the memory can - // eventually be freed up. - mutable std::mutex in_flight_mutex_; - bool is_shutdown_{false}; - size_t num_in_flight_{0}; -}; -} // namespace rmw_zenoh_cpp - -#endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index f4eb0518..96d95191 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -114,7 +114,7 @@ NodeData::~NodeData() ///============================================================================= std::size_t NodeData::id() const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); return id_; } @@ -127,7 +127,7 @@ bool NodeData::create_pub_data( const rosidl_message_type_support_t * type_support, const rmw_qos_profile_t * qos_profile) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); if (is_shutdown_) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -168,7 +168,7 @@ bool NodeData::create_pub_data( ///============================================================================= PublisherDataPtr NodeData::get_pub_data(const rmw_publisher_t * const publisher) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); auto it = pubs_.find(publisher); if (it == pubs_.end()) { return nullptr; @@ -180,7 +180,7 @@ PublisherDataPtr NodeData::get_pub_data(const rmw_publisher_t * const publisher) ///============================================================================= void NodeData::delete_pub_data(const rmw_publisher_t * const publisher) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); pubs_.erase(publisher); } @@ -194,7 +194,7 @@ bool NodeData::create_sub_data( const rosidl_message_type_support_t * type_support, const rmw_qos_profile_t * qos_profile) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); if (is_shutdown_) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -236,7 +236,7 @@ bool NodeData::create_sub_data( ///============================================================================= SubscriptionDataPtr NodeData::get_sub_data(const rmw_subscription_t * const subscription) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); auto it = subs_.find(subscription); if (it == subs_.end()) { return nullptr; @@ -248,7 +248,7 @@ SubscriptionDataPtr NodeData::get_sub_data(const rmw_subscription_t * const subs ///============================================================================= void NodeData::delete_sub_data(const rmw_subscription_t * const subscription) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); subs_.erase(subscription); } @@ -261,7 +261,7 @@ bool NodeData::create_service_data( const rosidl_service_type_support_t * type_supports, const rmw_qos_profile_t * qos_profile) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); if (is_shutdown_) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -302,7 +302,7 @@ bool NodeData::create_service_data( ///============================================================================= ServiceDataPtr NodeData::get_service_data(const rmw_service_t * const service) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); auto it = services_.find(service); if (it == services_.end()) { return nullptr; @@ -314,55 +314,91 @@ ServiceDataPtr NodeData::get_service_data(const rmw_service_t * const service) ///============================================================================= void NodeData::delete_service_data(const rmw_service_t * const service) { - std::lock_guard lock_guard(mutex_); + std::lock_guard lock_guard(mutex_); services_.erase(service); } + ///============================================================================= -rmw_ret_t NodeData::shutdown() +bool NodeData::create_client_data( + const rmw_client_t * const client, + const z_loaned_session_t * session, + std::size_t id, + const std::string & service_name, + const rosidl_service_type_support_t * type_supports, + const rmw_qos_profile_t * qos_profile) { - std::lock_guard lock(mutex_); - rmw_ret_t ret = RMW_RET_OK; + std::lock_guard lock_guard(mutex_); if (is_shutdown_) { - return ret; + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create ClientData as the NodeData has been shutdown."); + return false; } - // Shutdown all the entities within this node. - for (auto pub_it = pubs_.begin(); pub_it != pubs_.end(); ++pub_it) { - ret = pub_it->second->shutdown(); - if (ret != RMW_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to shutdown publisher %s within id %zu. rmw_ret_t code: %zu.", - pub_it->second->topic_info().name_.c_str(), - id_, - ret - ); - } + if (clients_.count(client) > 0) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "ClientData already exists."); + return false; } - for (auto sub_it = subs_.begin(); sub_it != subs_.end(); ++sub_it) { - ret = sub_it->second->shutdown(); - if (ret != RMW_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to shutdown subscription %s within id %zu. rmw_ret_t code: %zu.", - sub_it->second->topic_info().name_.c_str(), - id_, - ret - ); - } + + auto client_data = ClientData::make( + std::move(session), + node_, + client, + entity_->node_info(), + id_, + std::move(id), + std::move(service_name), + type_supports, + qos_profile); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to make ClientData."); + return false; } - for (auto srv_it = services_.begin(); srv_it != services_.end(); ++srv_it) { - ret = srv_it->second->shutdown(); - if (ret != RMW_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to shutdown service %s within id %zu. rmw_ret_t code: %zu.", - srv_it->second->topic_info().name_.c_str(), - id_, - ret - ); - } + + auto insertion = clients_.insert(std::make_pair(client, std::move(client_data))); + if (!insertion.second) { + return false; + } + return true; +} + +///============================================================================= +ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client) +{ + std::lock_guard lock_guard(mutex_); + auto it = clients_.find(client); + if (it == clients_.end()) { + return nullptr; + } + + return it->second; +} + +///============================================================================= +void NodeData::delete_client_data(const rmw_client_t * const client) +{ + std::lock_guard lock_guard(mutex_); + auto client_it = clients_.find(client); + if (client_it == clients_.end()) { + return; + } + if (!client_it->second->shutdown_and_query_in_flight()) { + clients_.erase(client); + } +} + +///============================================================================= +rmw_ret_t NodeData::shutdown() +{ + std::lock_guard lock(mutex_); + rmw_ret_t ret = RMW_RET_OK; + if (is_shutdown_) { + return ret; } // Unregister this node from the ROS graph. @@ -376,7 +412,7 @@ rmw_ret_t NodeData::shutdown() // Check if the Node is shutdown. bool NodeData::is_shutdown() const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); return is_shutdown_; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index f3da6497..e26dd166 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -25,10 +25,13 @@ #include "graph_cache.hpp" #include "liveliness_utils.hpp" +#include "rmw_client_data.hpp" #include "rmw_publisher_data.hpp" #include "rmw_subscription_data.hpp" #include "rmw_service_data.hpp" +#include "rmw/rmw.h" + namespace rmw_zenoh_cpp { ///============================================================================= @@ -95,6 +98,21 @@ class NodeData final // Delete the ServiceData for a given rmw_service_t if present. void delete_service_data(const rmw_service_t * const service); + // Create a new ClientData for a given rmw_client_t. + bool create_client_data( + const rmw_client_t * const client, + const z_loaned_session_t * session, + std::size_t id, + const std::string & service_name, + const rosidl_service_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Retrieve the ClientData for a given rmw_client_t if present. + ClientDataPtr get_client_data(const rmw_client_t * const client); + + // Delete the ClientData for a given rmw_client_t if present. + void delete_client_data(const rmw_client_t * const client); + // Shutdown this NodeData. rmw_ret_t shutdown(); @@ -112,7 +130,7 @@ class NodeData final std::shared_ptr entity, zc_owned_liveliness_token_t token); // Internal mutex. - mutable std::mutex mutex_; + mutable std::recursive_mutex mutex_; // The rmw_node_t associated with this NodeData. const rmw_node_t * node_; // The entity id of this node as generated by get_next_entity_id(). @@ -130,6 +148,8 @@ class NodeData final std::unordered_map subs_; // Map of services. std::unordered_map services_; + // Map of clients. + std::unordered_map clients_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index dfac42f4..db096144 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -16,12 +16,14 @@ #include +#include #include #include #include #include #include "cdr.hpp" +#include "rmw_context_impl_s.hpp" #include "message_type_support.hpp" #include "logging_macros.hpp" #include "qos.hpp" @@ -31,6 +33,7 @@ #include "rmw/error_handling.h" #include "rmw/get_topic_endpoint_info.h" +#include "rmw/impl/cpp/macros.hpp" namespace rmw_zenoh_cpp { @@ -223,11 +226,8 @@ PublisherData::PublisherData( ///============================================================================= rmw_ret_t PublisherData::publish( - const void * ros_message -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - , std::optional & shm -#endif -) + const void * ros_message, + std::optional & shm_provider) { std::lock_guard lock(mutex_); if (is_shutdown_) { @@ -236,14 +236,12 @@ rmw_ret_t PublisherData::publish( } // Serialize data. - const size_t max_data_length = type_support_->get_estimated_serialized_size( + size_t max_data_length = type_support_->get_estimated_serialized_size( ros_message, type_support_impl_); // To store serialized message byte array. char * msg_bytes = nullptr; - -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY std::optional shmbuf = std::nullopt; auto always_free_shmbuf = rcpputils::make_scope_exit( [&shmbuf]() { @@ -251,36 +249,24 @@ rmw_ret_t PublisherData::publish( z_drop(z_move(shmbuf.value())); } }); -#endif rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; auto always_free_msg_bytes = rcpputils::make_scope_exit( - [&msg_bytes, allocator -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - , &shmbuf -#endif - ]() { - if (msg_bytes -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - && !shmbuf.has_value() -#endif - ) - { + [&msg_bytes, allocator, &shmbuf]() { + if (msg_bytes && !shmbuf.has_value()) { allocator->deallocate(msg_bytes, allocator->state); } }); -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY // Get memory from SHM buffer if available. - if (shm.has_value() && max_data_length >= shm.value().msgsize_threshold) { + if (shm_provider.has_value()) { RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); - auto & provider = shm.value().shm_provider; - - // TODO(yellowhatter): SHM, use alignment based on msgsize_threshold - z_alloc_alignment_t alignment = {0}; + auto provider = shm_provider.value(); z_buf_layout_alloc_result_t alloc; + // TODO(yuyuan): SHM, configure this + z_alloc_alignment_t alignment = {5}; z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment); if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { @@ -292,14 +278,11 @@ rmw_ret_t PublisherData::publish( return RMW_RET_ERROR; } } else { -#endif - // Get memory from the allocator. - msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY -} -#endif + // Get memory from the allocator. + msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); + } // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); @@ -329,15 +312,11 @@ rmw_ret_t PublisherData::publish( options.attachment = z_move(attachment); z_owned_bytes_t payload; -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY if (shmbuf.has_value()) { z_bytes_from_shm_mut(&payload, z_move(shmbuf.value())); } else { -#endif - z_bytes_copy_from_buf(&payload, reinterpret_cast(msg_bytes), data_length); -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY -} -#endif + z_bytes_copy_from_buf(&payload, reinterpret_cast(msg_bytes), data_length); + } z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); if (res != Z_OK) { @@ -356,11 +335,8 @@ rmw_ret_t PublisherData::publish( ///============================================================================= rmw_ret_t PublisherData::publish_serialized_message( - const rmw_serialized_message_t * serialized_message -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - , std::optional & /*shm_provider*/ -#endif -) + const rmw_serialized_message_t * serialized_message, + std::optional & /*shm_provider*/) { eprosima::fastcdr::FastBuffer buffer( reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); @@ -420,10 +396,10 @@ liveliness::TopicInfo PublisherData::topic_info() const } ///============================================================================= -void PublisherData::copy_gid(rmw_gid_t * gid) const +void PublisherData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const { std::lock_guard lock(mutex_); - entity_->copy_gid(gid->data); + entity_->copy_gid(out_gid); } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index eca222b7..4186f434 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -27,8 +27,11 @@ #include "event.hpp" #include "liveliness_utils.hpp" #include "message_type_support.hpp" -#include "shm_context.hpp" +#include "type_support_common.hpp" +#include "rcutils/allocator.h" + +#include "rmw/rmw.h" #include "rmw/ret_types.h" namespace rmw_zenoh_cpp @@ -50,19 +53,13 @@ class PublisherData final // Publish a ROS message. rmw_ret_t publish( - const void * ros_message -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - , std::optional & shm -#endif - ); + const void * ros_message, + std::optional & shm_provider); // Publish a serialized ROS message. rmw_ret_t publish_serialized_message( - const rmw_serialized_message_t * serialized_message -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - , std::optional & shm -#endif - ); + const rmw_serialized_message_t * serialized_message, + std::optional & shm_provider); // Get a copy of the keyexpr_hash of this PublisherData's liveliness::Entity. std::size_t keyexpr_hash() const; @@ -71,7 +68,7 @@ class PublisherData final liveliness::TopicInfo topic_info() const; // Copy the GID of this PublisherData into an rmw_gid_t. - void copy_gid(rmw_gid_t * gid) const; + void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; // Get the events manager of this PublisherData. std::shared_ptr events_mgr() const; diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index 5cb53d58..012ffe4a 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -15,8 +15,8 @@ #include "rmw_service_data.hpp" #include -#include +#include #include #include #include @@ -25,12 +25,16 @@ #include "attachment_helpers.hpp" #include "cdr.hpp" +#include "rmw_context_impl_s.hpp" +#include "message_type_support.hpp" #include "logging_macros.hpp" #include "qos.hpp" #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" +#include "rmw/get_topic_endpoint_info.h" +#include "rmw/impl/cpp/macros.hpp" namespace rmw_zenoh_cpp { @@ -46,15 +50,17 @@ void service_data_handler(z_loaned_query_t * query, void * data) RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to obtain ServiceData from data for " - "service for %s", - z_loan(keystr) + "service for %.*s", + static_cast(z_string_len(z_loan(keystr))), + z_string_data(z_loan(keystr)) ); return; } - z_owned_query_t owned_query; - z_query_clone(&owned_query, query); - service_data->add_new_query(std::make_unique(owned_query)); + std::chrono::nanoseconds::rep received_timestamp = + std::chrono::system_clock::now().time_since_epoch().count(); + + service_data->add_new_query(std::make_unique(query, received_timestamp)); } ///============================================================================= @@ -307,7 +313,7 @@ rmw_ret_t ServiceData::take_request( // Fill in the request header. // Get the sequence_number out of the attachment - rmw_zenoh_cpp::attachement_data_t attachment(z_query_attachment(loaned_query)); + rmw_zenoh_cpp::attachment_data_t attachment(z_query_attachment(loaned_query)); request_header->request_id.sequence_number = attachment.sequence_number; if (request_header->request_id.sequence_number < 0) { @@ -322,10 +328,7 @@ rmw_ret_t ServiceData::take_request( RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); return RMW_RET_ERROR; } - - auto now = std::chrono::system_clock::now().time_since_epoch(); - auto now_ns = std::chrono::duration_cast(now); - request_header->received_timestamp = now_ns.count(); + request_header->received_timestamp = query->get_received_timestamp(); // Add this query to the map, so that rmw_send_response can quickly look it up later. const size_t hash = rmw_zenoh_cpp::hash_gid(request_header->request_id.writer_guid); diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp index cddaff88..f0676635 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp @@ -32,7 +32,6 @@ #include "service_type_support.hpp" #include "type_support_common.hpp" #include "zenoh_utils.hpp" -#include "rmw_data_types.hpp" #include "rcutils/allocator.h" diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 8d2c8805..b50debb7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -16,6 +16,7 @@ #include +#include #include #include #include @@ -26,16 +27,16 @@ #include "attachment_helpers.hpp" #include "cdr.hpp" #include "identifier.hpp" +#include "rmw_context_impl_s.hpp" #include "message_type_support.hpp" #include "logging_macros.hpp" #include "qos.hpp" -// Use the implemented rmw_context_impl_t -#include "rmw_context_impl_s.hpp" #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" #include "rmw/get_topic_endpoint_info.h" +#include "rmw/impl/cpp/macros.hpp" namespace rmw_zenoh_cpp { @@ -59,18 +60,20 @@ void sub_data_handler(z_loaned_sample_t * sample, void * data) return; } - attachement_data_t attachment(z_sample_attachment(sample)); + attachment_data_t attachment(z_sample_attachment(sample)); const z_loaned_bytes_t * payload = z_sample_payload(sample); z_owned_slice_t slice; z_bytes_to_slice(payload, &slice); + std::string topic_name(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); + sub_data->add_new_message( std::make_unique( slice, z_timestamp_ntp64_time(z_sample_timestamp(sample)), std::move(attachment)), - z_string_data(z_loan(keystr))); + topic_name); } } // namespace @@ -78,7 +81,7 @@ void sub_data_handler(z_loaned_sample_t * sample, void * data) SubscriptionData::Message::Message( z_owned_slice_t p, uint64_t recv_ts, - attachement_data_t && attachment_) + attachment_data_t && attachment_) : payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_)) { } @@ -195,12 +198,6 @@ SubscriptionData::SubscriptionData( // enable_shared_from_this, which is not available in constructors. bool SubscriptionData::init() { - if (entity_->topic_info()->qos_.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "`reliability` no longer supported on subscriber. Ignoring..."); - } - // TODO(Yadunund): Instead of passing a rawptr, rely on capturing weak_ptr // in the closure callback once we switch to zenoh-cpp. z_owned_closure_sample_t callback; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 4f864994..6d17fbf4 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -34,7 +34,6 @@ #include "message_type_support.hpp" #include "attachment_helpers.hpp" #include "type_support_common.hpp" -#include "shm_context.hpp" #include "rcutils/allocator.h" @@ -52,13 +51,13 @@ class SubscriptionData final : public std::enable_shared_from_this & shm -#endif - ); + const void * ros_message, + std::optional & shm_provider); // Get a copy of the keyexpr_hash of this SubscriptionData's liveliness::Entity. std::size_t keyexpr_hash() const; diff --git a/rmw_zenoh_cpp/src/detail/shm_context.cpp b/rmw_zenoh_cpp/src/detail/shm_context.cpp deleted file mode 100644 index 8dcf8566..00000000 --- a/rmw_zenoh_cpp/src/detail/shm_context.cpp +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "shm_context.hpp" - -namespace rmw_zenoh_cpp -{ -///============================================================================= -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY -ShmContext::~ShmContext() -{ - z_drop(z_move(shm_provider)); -} -#endif -} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/shm_context.hpp b/rmw_zenoh_cpp/src/detail/shm_context.hpp deleted file mode 100644 index 619afa3e..00000000 --- a/rmw_zenoh_cpp/src/detail/shm_context.hpp +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef DETAIL__SHM_CONTEXT_HPP_ -#define DETAIL__SHM_CONTEXT_HPP_ - - -namespace rmw_zenoh_cpp -{ -///============================================================================= -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY -struct ShmContext -{ - z_owned_shm_provider_t shm_provider; - size_t msgsize_threshold; - - ~ShmContext(); -}; -#endif -} // namespace rmw_zenoh_cpp - -#endif // DETAIL__SHM_CONTEXT_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/type_support.cpp b/rmw_zenoh_cpp/src/detail/type_support.cpp index b2cc2848..fff22474 100644 --- a/rmw_zenoh_cpp/src/detail/type_support.cpp +++ b/rmw_zenoh_cpp/src/detail/type_support.cpp @@ -17,6 +17,8 @@ // https://github.com/ros2/rmw_fastrtps/blob/469624e3d483290d6f88fe4b89ee5feaa7694e61/rmw_fastrtps_cpp/src/type_support_common.hpp #include +#include +#include #include "rmw/error_handling.h" diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp index 9429b145..2537466b 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp @@ -18,8 +18,6 @@ #include #include -#include -#include #include "logging_macros.hpp" @@ -42,14 +40,6 @@ static const std::unordered_map zenoh_router_check_attempts() { const char * envar_value; - // The default value is to check indefinitely. - uint64_t default_value = std::numeric_limits::max(); + // The default is to check only once. + uint64_t default_value = 1; if (NULL != rcutils_get_env(router_check_attempts_envar, &envar_value)) { // NULL is returned if everything is ok. @@ -125,96 +114,10 @@ std::optional zenoh_router_check_attempts() return std::nullopt; } // If the value is 0, check indefinitely. - return default_value; + return std::numeric_limits::max(); } - // If unset, check indefinitely. + // If unset, use the default. return default_value; } - -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY -///============================================================================= -bool zenoh_shm_enabled() -{ - const char * envar_value; - - if (NULL != rcutils_get_env(zenoh_shm_enabled_envar, &envar_value)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Envar %s cannot be read. Report this bug.", - zenoh_shm_enabled_envar); - return zenoh_shm_enabled_default; - } - - if (strncmp(envar_value, "false", strlen(envar_value)) == 0) { - return false; - } - - if (strncmp(envar_value, "true", strlen(envar_value)) == 0) { - return true; - } - - return zenoh_shm_enabled_default; -} -///============================================================================= -size_t zenoh_shm_alloc_size() -{ - const char * envar_value; - - if (NULL != rcutils_get_env(zenoh_shm_alloc_size_envar, &envar_value)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Envar %s cannot be read. Report this bug.", - zenoh_shm_alloc_size_envar); - return zenoh_shm_alloc_size_default; - } - - // If the environment variable contains a value, handle it accordingly. - if (envar_value[0] != '\0') { - const auto read_value = std::strtoull(envar_value, nullptr, 10); - if (read_value > 0) { - if (read_value > std::numeric_limits::max()) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Envar %s: value is too large!", - zenoh_shm_alloc_size_envar); - } else { - return read_value; - } - } - } - - return zenoh_shm_alloc_size_default; -} -///============================================================================= -size_t zenoh_shm_message_size_threshold() -{ - const char * envar_value; - - if (NULL != rcutils_get_env(zenoh_shm_message_size_threshold_envar, &envar_value)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Envar %s cannot be read. Report this bug.", - zenoh_shm_message_size_threshold_envar); - return zenoh_shm_message_size_threshold_default; - } - - // If the environment variable contains a value, handle it accordingly. - if (envar_value[0] != '\0') { - const auto read_value = std::strtoull(envar_value, nullptr, 10); - if (read_value > 0) { - if (read_value > std::numeric_limits::max()) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Envar %s: value is too large!", - zenoh_shm_message_size_threshold_envar); - } else if ((read_value & (read_value - 1)) != 0) { // power of 2 check - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Envar %s: value must be power of 2!", - zenoh_shm_message_size_threshold_envar); - - } else { - return read_value; - } - } - } - - return zenoh_shm_message_size_threshold_default; -} -#endif } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.hpp b/rmw_zenoh_cpp/src/detail/zenoh_config.hpp index 57aa657d..dceafe5d 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.hpp @@ -16,7 +16,10 @@ #define DETAIL__ZENOH_CONFIG_HPP_ #include + #include +#include +#include #include "rmw/ret_types.h" @@ -57,36 +60,6 @@ rmw_ret_t get_z_config(const ConfigurableEntity & entity, z_owned_config_t * con /// @return The number of times to try connecting to a zenoh router and /// std::nullopt if establishing a connection to a router is not required. std::optional zenoh_router_check_attempts(); - -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY -///============================================================================= -/// Get the enabled state of shared memory subsystem -/// based on the environment variable ZENOH_SHM_ENABLED. -/// @details The behavior is as follows: -/// - If not set or not "false", the default value of "true" is returned. -/// - Else "false" is returned. -/// @return The amount of shared memory to be pre-allocated for Zenoh SHM operation -bool zenoh_shm_enabled(); - -///============================================================================= -/// Get the amount of shared memory to be pre-allocated for Zenoh SHM operation -/// based on the environment variable ZENOH_SHM_ALLOC_SIZE. -/// @details The behavior is as follows: -/// - If not set or <= 0, the default value of 1MB is returned. -/// - Else value of environemnt variable is returned. -/// @return The amount of shared memory to be pre-allocated for Zenoh SHM operation -size_t zenoh_shm_alloc_size(); - -///============================================================================= -/// Message size threshold for implicit SHM optimization based on the environment -/// variable ZENOH_SHM_MESSAGE_SIZE_THRESHOLD. -/// Messages smaller than this threshold will not be forwarded through Zenoh SHM -/// @details The behavior is as follows: -/// - If not set or <= 0, the default value of 2KB is returned. -/// - Else value of environemnt variable is returned. -/// @return The amount of shared memory to be pre-allocated for Zenoh SHM operation -size_t zenoh_shm_message_size_threshold(); -#endif } // namespace rmw_zenoh_cpp #endif // DETAIL__ZENOH_CONFIG_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp index 150cf111..b6c5d21e 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp @@ -16,6 +16,8 @@ #include +#include +#include #include #include "logging_macros.hpp" @@ -40,24 +42,19 @@ rmw_ret_t zenoh_router_check(const z_loaned_session_t * session) (*(static_cast(ctx)))++; }; - rmw_ret_t ret = RMW_RET_OK; z_owned_closure_zid_t router_callback; z_closure(&router_callback, callback, NULL, &context); if (z_info_routers_zid(session, z_move(router_callback)) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Failed to evaluate if Zenoh routers are connected to the session."); - ret = RMW_RET_ERROR; + return RMW_RET_ERROR; } else { if (context == 0) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to connect to a Zenoh router. " - "Have you started a router with `ros2 run rmw_zenoh_cpp rmw_zenohd`?"); - ret = RMW_RET_ERROR; + return RMW_RET_ERROR; } } - return ret; + return RMW_RET_OK; } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index f8f61a2e..d1ab4651 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -15,9 +15,12 @@ #include "zenoh_utils.hpp" #include +#include #include "attachment_helpers.hpp" -#include "rmw/types.h" +#include "rcpputils/scope_exit.hpp" + +#include "rmw/error_handling.h" namespace rmw_zenoh_cpp { @@ -29,12 +32,24 @@ void create_map_and_set_sequence_num( auto now_ns = std::chrono::duration_cast(now); int64_t source_timestamp = now_ns.count(); - rmw_zenoh_cpp::attachement_data_t data(sequence_number, source_timestamp, gid); + rmw_zenoh_cpp::attachment_data_t data(sequence_number, source_timestamp, gid); data.serialize_to_zbytes(out_bytes); } ///============================================================================= -ZenohQuery::ZenohQuery(z_owned_query_t query) {query_ = query;} +ZenohQuery::ZenohQuery( + const z_loaned_query_t * query, + std::chrono::nanoseconds::rep received_timestamp) +{ + z_query_clone(&query_, query); + received_timestamp_ = received_timestamp; +} + +///============================================================================= +std::chrono::nanoseconds::rep ZenohQuery::get_received_timestamp() const +{ + return received_timestamp_; +} ///============================================================================= ZenohQuery::~ZenohQuery() {z_drop(z_move(query_));} @@ -44,10 +59,10 @@ const z_loaned_query_t * ZenohQuery::get_query() const {return z_loan(query_);} ///============================================================================= ZenohReply::ZenohReply( - z_owned_reply_t reply, + const z_loaned_reply_t * reply, std::chrono::nanoseconds::rep received_timestamp) { - reply_ = reply; + z_reply_clone(&reply_, reply); received_timestamp_ = received_timestamp; } diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index 97e1446f..5f569082 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -18,6 +18,7 @@ #include #include +#include #include #include "rmw/types.h" @@ -35,7 +36,7 @@ create_map_and_set_sequence_num( class ZenohReply final { public: - ZenohReply(z_owned_reply_t reply, std::chrono::nanoseconds::rep received_timestamp); + ZenohReply(const z_loaned_reply_t * reply, std::chrono::nanoseconds::rep received_timestamp); ~ZenohReply(); @@ -53,14 +54,17 @@ class ZenohReply final class ZenohQuery final { public: - ZenohQuery(z_owned_query_t query); + ZenohQuery(const z_loaned_query_t * query, std::chrono::nanoseconds::rep received_timestamp); ~ZenohQuery(); const z_loaned_query_t * get_query() const; + std::chrono::nanoseconds::rep get_received_timestamp() const; + private: z_owned_query_t query_; + std::chrono::nanoseconds::rep received_timestamp_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index c957dad0..68117032 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -14,13 +14,13 @@ #include "rmw/error_handling.h" #include "rmw/event.h" +#include "rmw/events_statuses/events_statuses.h" #include "rmw/types.h" #include "detail/event.hpp" #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/rmw_publisher_data.hpp" #include "detail/rmw_subscription_data.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index f1d89f61..b80e7579 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -14,12 +14,20 @@ #include +#include #include +#include +#include "detail/guard_condition.hpp" #include "detail/identifier.hpp" +#include "detail/liveliness_utils.hpp" #include "detail/rmw_context_impl_s.hpp" #include "detail/zenoh_config.hpp" +#include "rcutils/env.h" +#include "detail/logging_macros.hpp" +#include "rcutils/strdup.h" +#include "rcutils/types.h" #include "rmw/init.h" #include "rmw/impl/cpp/macros.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 7cf350c2..885baaf6 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include -#include #include #include @@ -21,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -28,19 +28,24 @@ #include "detail/attachment_helpers.hpp" #include "detail/cdr.hpp" #include "detail/guard_condition.hpp" +#include "detail/graph_cache.hpp" #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" #include "detail/logging_macros.hpp" #include "detail/message_type_support.hpp" #include "detail/qos.hpp" #include "detail/rmw_context_impl_s.hpp" -#include "detail/rmw_data_types.hpp" #include "detail/serialization_format.hpp" #include "detail/type_support_common.hpp" #include "detail/zenoh_utils.hpp" #include "rcpputils/scope_exit.hpp" + +#include "rcutils/env.h" #include "rcutils/strdup.h" +#include "rcutils/types.h" + +#include "rmw/allocators.h" #include "rmw/dynamic_message_type_support.h" #include "rmw/error_handling.h" #include "rmw/features.h" @@ -109,9 +114,6 @@ const rosidl_service_type_support_t * find_service_type_support( extern "C" { -// TODO(yuyuan): SHM, make this configurable -#define SHM_BUF_OK_SIZE 2621440 - //============================================================================== /// Get the name of the rmw implementation being used const char * @@ -451,8 +453,7 @@ rmw_create_publisher( } //============================================================================== -/// Finalize a given publisher handle, reclaim the resources, and deallocate the -/// publisher handle. +/// Finalize a given publisher handle, reclaim the resources, and deallocate the publisher handle. rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) { @@ -598,11 +599,8 @@ rmw_publish( } return pub_data->publish( - ros_message -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - , context_impl->shm() -#endif - ); + ros_message, + context_impl->shm_provider()); } //============================================================================== @@ -708,11 +706,8 @@ rmw_publish_serialized_message( RMW_CHECK_ARGUMENT_FOR_NULL(publisher_data, RMW_RET_INVALID_ARGUMENT); return publisher_data->publish_serialized_message( - serialized_message -#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY - , context_impl->shm() -#endif - ); + serialized_message, + context_impl->shm_provider()); } //============================================================================== @@ -1354,21 +1349,28 @@ rmw_create_client( const rmw_qos_profile_t * qos_profile) { RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return nullptr); - + RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); RMW_CHECK_ARGUMENT_FOR_NULL(service_name, nullptr); if (strlen(service_name) == 0) { RMW_SET_ERROR_MSG("service name is empty string"); return nullptr; } RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); - RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); - + // Validate service name + int validation_result; + if (rmw_validate_full_topic_name(service_name, &validation_result, nullptr) != RMW_RET_OK) { + RMW_SET_ERROR_MSG("rmw_validate_full_topic_name failed"); + return nullptr; + } + if (validation_result != RMW_TOPIC_VALID && !qos_profile->avoid_ros_namespace_conventions) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service name is malformed: %s", service_name); + return nullptr; + } RMW_CHECK_FOR_NULL_WITH_MSG( node->context, "expected initialized context", @@ -1384,23 +1386,22 @@ rmw_create_client( "unable to get rmw_context_impl_s", return nullptr); - rcutils_allocator_t * allocator = &node->context->options.allocator; - - // Validate service name - int validation_result; - - if (rmw_validate_full_topic_name(service_name, &validation_result, nullptr) != RMW_RET_OK) { - RMW_SET_ERROR_MSG("rmw_validate_full_topic_name failed"); + // Get the service type support. + const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); + if (type_support == nullptr) { + // error was already set by find_service_type_support return nullptr; } - if (validation_result != RMW_TOPIC_VALID && !qos_profile->avoid_ros_namespace_conventions) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service name is malformed: %s", service_name); + rcutils_allocator_t * allocator = &node->context->options.allocator; + if (!rcutils_allocator_is_valid(allocator)) { + RMW_SET_ERROR_MSG("allocator is invalid."); return nullptr; } - // client data - rmw_client_t * rmw_client = static_cast(allocator->zero_allocate( + // Create the rmw_client. + rmw_client_t * rmw_client = + static_cast(allocator->zero_allocate( 1, sizeof(rmw_client_t), allocator->state)); @@ -1408,228 +1409,46 @@ rmw_create_client( rmw_client, "failed to allocate memory for the client", return nullptr); - auto free_rmw_client = rcpputils::make_scope_exit( [rmw_client, allocator]() { allocator->deallocate(rmw_client, allocator->state); }); - auto client_data = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::rmw_client_data_t), allocator->state)); + auto node_data = context_impl->get_node_data(node); RMW_CHECK_FOR_NULL_WITH_MSG( - client_data, - "failed to allocate memory for client data", + node_data, + "NodeData not found.", return nullptr); - auto free_client_data = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - client_data, - client_data, - return nullptr, - rmw_zenoh_cpp::rmw_client_data_t, - ); - auto destruct_client_data = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->~rmw_client_data_t(), - rmw_zenoh_cpp::rmw_client_data_t); - }); - // Adapt any 'best available' QoS options - client_data->adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_zenoh_cpp::QoS::get().best_available_qos( - nullptr, nullptr, &client_data->adapted_qos_profile, nullptr); - if (RMW_RET_OK != ret) { - RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); - return nullptr; - } - - // Obtain the type support - const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); - if (type_support == nullptr) { - // error was already set by find_service_type_support + if (!node_data->create_client_data( + rmw_client, + context_impl->session(), + context_impl->get_next_entity_id(), + service_name, + type_support, + qos_profile)) + { + // Error already handled. return nullptr; } - auto service_members = static_cast(type_support->data); - auto request_members = static_cast( - service_members->request_members_->data); - auto response_members = static_cast( - service_members->response_members_->data); - - client_data->context = node->context; - client_data->typesupport_identifier = type_support->typesupport_identifier; - client_data->type_hash = type_support->get_type_hash_func(type_support); - client_data->request_type_support_impl = request_members; - client_data->response_type_support_impl = response_members; - - // Request type support - client_data->request_type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::RequestTypeSupport), allocator->state)); - - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data->request_type_support, - "Failed to allocate rmw_zenoh_cpp::RequestTypeSupport", - return nullptr); - auto free_request_type_support = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data->request_type_support, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - client_data->request_type_support, - client_data->request_type_support, - return nullptr, - rmw_zenoh_cpp::RequestTypeSupport, service_members); - auto destruct_request_type_support = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->request_type_support->~RequestTypeSupport(), - rmw_zenoh_cpp::RequestTypeSupport); - }); - - // Response type support - client_data->response_type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::ResponseTypeSupport), allocator->state)); - - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data->response_type_support, - "Failed to allocate rmw_zenoh_cpp::ResponseTypeSupport", - return nullptr); - auto free_response_type_support = rcpputils::make_scope_exit( - [client_data, allocator]() { - allocator->deallocate(client_data->response_type_support, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - client_data->response_type_support, - client_data->response_type_support, - return nullptr, - rmw_zenoh_cpp::ResponseTypeSupport, service_members); - auto destruct_response_type_support = rcpputils::make_scope_exit( - [client_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - client_data->response_type_support->~ResponseTypeSupport(), - rmw_zenoh_cpp::ResponseTypeSupport); - }); - - // Populate the rmw_client. + // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased + // Client handle will be returned in the rmw_clients_t in rmw_wait + // from which we cannot obtain ClientData. + rmw_client->data = static_cast(node_data->get_client_data(rmw_client).get()); rmw_client->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; rmw_client->service_name = rcutils_strdup(service_name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( rmw_client->service_name, - "failed to allocate client name", + "failed to allocate service client name", return nullptr); auto free_service_name = rcpputils::make_scope_exit( [rmw_client, allocator]() { allocator->deallocate(const_cast(rmw_client->service_name), allocator->state); }); - // Note: Service request/response types will contain a suffix Request_ or Response_. - // We remove the suffix when appending the type to the liveliness tokens for - // better reusability within GraphCache. - std::string service_type = client_data->request_type_support->get_name(); - size_t suffix_substring_position = service_type.find("Request_"); - if (std::string::npos != suffix_substring_position) { - service_type = service_type.substr(0, suffix_substring_position); - } else { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unexpected type %s for client %s. Report this bug", - service_type.c_str(), rmw_client->service_name); - return nullptr; - } - - // Convert the type hash to a string so that it can be included in - // the keyexpr. - char * type_hash_c_str = nullptr; - rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( - client_data->type_hash, - *allocator, - &type_hash_c_str); - if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { - RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); - return nullptr; - } - auto free_type_hash_c_str = rcpputils::make_scope_exit( - [&allocator, &type_hash_c_str]() { - allocator->deallocate(type_hash_c_str, allocator->state); - }); - - const z_loaned_session_t * session = context_impl->session(); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_FOR_NULL_WITH_MSG( - node_data, - "NodeData not found.", - return nullptr); - client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(session), - std::to_string(node_data->id()), - std::to_string( - context_impl->get_next_entity_id()), - rmw_zenoh_cpp::liveliness::EntityType::Client, - rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, - rmw_zenoh_cpp::liveliness::TopicInfo{ - node->context->actual_domain_id, - rmw_client->service_name, - std::move(service_type), - type_hash_c_str, - client_data->adapted_qos_profile} - ); - if (client_data->entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the client %s.", - rmw_client->service_name); - return nullptr; - } - - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [client_data]() { - z_keyexpr_drop(z_move(client_data->keyexpr)); - }); - if (z_keyexpr_from_str( - &client_data->keyexpr, client_data->entity->topic_info()->topic_keyexpr_.c_str()) != Z_OK) - { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - - std::string liveliness_keyexpr = client_data->entity->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - auto free_token = rcpputils::make_scope_exit( - [client_data]() { - if (client_data != nullptr) { - z_drop(z_move(client_data->token)); - } - }); - if (zc_liveliness_declare_token( - session, &client_data->token, z_loan(liveliness_ke), - NULL) != Z_OK) - { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the client."); - return nullptr; - } - - rmw_client->data = client_data; - - free_token.cancel(); free_rmw_client.cancel(); - free_client_data.cancel(); - destruct_request_type_support.cancel(); - free_request_type_support.cancel(); - destruct_response_type_support.cancel(); - free_response_type_support.cancel(); - destruct_client_data.cancel(); free_service_name.cancel(); - free_ros_keyexpr.cancel(); return rmw_client; } @@ -1639,10 +1458,12 @@ rmw_create_client( rmw_ret_t rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) { - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -1653,36 +1474,15 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) client->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - rcutils_allocator_t * allocator = &node->context->options.allocator; - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); - RMW_CHECK_FOR_NULL_WITH_MSG( - client_data, - "client implementation pointer is null.", - return RMW_RET_INVALID_ARGUMENT); - - // CLEANUP =================================================================== - z_drop(z_move(client_data->keyexpr)); - zc_liveliness_undeclare_token(z_move(client_data->token)); - - RMW_TRY_DESTRUCTOR( - client_data->request_type_support->~RequestTypeSupport(), rmw_zenoh_cpp::RequestTypeSupport, ); - allocator->deallocate(client_data->request_type_support, allocator->state); - - RMW_TRY_DESTRUCTOR( - client_data->response_type_support->~ResponseTypeSupport(), rmw_zenoh_cpp::ResponseTypeSupport, - ); - allocator->deallocate(client_data->response_type_support, allocator->state); - - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - if (!client_data->shutdown_and_query_in_flight()) { - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - allocator->deallocate(client->data, allocator->state); + auto node_data = context_impl->get_node_data(node); + if (node_data == nullptr) { + return RMW_RET_INVALID_ARGUMENT; } + // Remove the ClientData from NodeData. + node_data->delete_client_data(client); + + rcutils_allocator_t * allocator = &node->context->options.allocator; allocator->deallocate(const_cast(client->service_name), allocator->state); allocator->deallocate(client, allocator->state); @@ -1699,108 +1499,23 @@ rmw_send_request( { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_FOR_NULL_WITH_MSG( + client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( client_data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); - if (client_data->is_shutdown()) { - return RMW_RET_ERROR; - } - - rmw_context_impl_s * context_impl = static_cast( - client_data->context->impl); - - rcutils_allocator_t * allocator = &(client_data->context->options.allocator); - - size_t max_data_length = ( - client_data->request_type_support->get_estimated_serialized_size( - ros_request, client_data->request_type_support_impl)); - - // Init serialized message byte array - char * request_bytes = static_cast(allocator->allocate( - max_data_length, - allocator->state)); - if (!request_bytes) { - RMW_SET_ERROR_MSG("failed allocate request message bytes"); - return RMW_RET_ERROR; - } - auto free_request_bytes = rcpputils::make_scope_exit( - [request_bytes, allocator]() { - allocator->deallocate(request_bytes, allocator->state); - }); - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(request_bytes, max_data_length); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr ser(fastbuffer); - if (!client_data->request_type_support->serialize_ros_message( - ros_request, - ser.get_cdr(), - client_data->request_type_support_impl)) - { - return RMW_RET_ERROR; - } - - size_t data_length = ser.get_serialized_data_length(); - - *sequence_id = client_data->get_next_sequence_number(); - - // Send request - z_get_options_t opts; - z_get_options_default(&opts); - - z_owned_bytes_t attachment; - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - client_data->entity->copy_gid(local_gid); - rmw_zenoh_cpp::create_map_and_set_sequence_num( - &attachment, *sequence_id, - local_gid); - opts.attachment = z_move(attachment); - - // See the comment about the "num_in_flight" class variable in the - // rmw_client_data_t class for why we need to do this. - client_data->increment_in_flight_callbacks(); - - - opts.target = Z_QUERY_TARGET_ALL_COMPLETE; - // The default timeout for a z_get query is 10 seconds and if a response is not received within - // this window, the queryable will return an invalid reply. However, it is common for actions, - // which are implemented using services, to take an extended duration to complete. Hence, we set - // the timeout_ms to the largest supported value to account for most realistic scenarios. - opts.timeout_ms = std::numeric_limits::max(); - // Latest consolidation guarantees unicity of replies for the same key expression, - // which optimizes bandwidth. The default is "None", which imples replies may come in any order - // and any number. - opts.consolidation = z_query_consolidation_latest(); - - z_owned_bytes_t payload; - z_bytes_copy_from_buf( - &payload, reinterpret_cast(request_bytes), data_length); - opts.payload = z_move(payload); - - z_owned_closure_reply_t callback; - z_closure( - &callback, rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, - client_data); - z_get( - context_impl->session(), - z_loan(client_data->keyexpr), "", - z_move(callback), - &opts); - - return RMW_RET_OK; + return client_data->send_request(ros_request, sequence_id); } //============================================================================== @@ -1812,12 +1527,10 @@ rmw_take_response( void * ros_response, bool * taken) { + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); *taken = false; - RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, @@ -1825,67 +1538,13 @@ rmw_take_response( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_FOR_NULL_WITH_MSG( client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); - std::unique_ptr latest_reply = client_data->pop_next_reply(); - if (latest_reply == nullptr) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. - return RMW_RET_OK; - } - - const z_loaned_sample_t * sample = latest_reply->get_sample().value(); - if (sample == NULL) { - RMW_SET_ERROR_MSG("invalid reply sample"); - return RMW_RET_ERROR; - } - - z_owned_slice_t payload; - z_bytes_to_slice(z_sample_payload(sample), &payload); - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(z_slice_data(z_loan(payload)))), - z_slice_len(z_loan(payload))); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr deser(fastbuffer); - if (!client_data->response_type_support->deserialize_ros_message( - deser.get_cdr(), - ros_response, - client_data->response_type_support_impl)) - { - RMW_SET_ERROR_MSG("could not deserialize ROS response"); - return RMW_RET_ERROR; - } - - // Fill in the request_header - - rmw_zenoh_cpp::attachement_data_t attachment(z_sample_attachment(sample)); - - request_header->request_id.sequence_number = attachment.sequence_number; - if (request_header->request_id.sequence_number < 0) { - RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); - return RMW_RET_ERROR; - } - - memcpy(request_header->request_id.writer_guid, attachment.source_gid, RMW_GID_STORAGE_SIZE); - - request_header->source_timestamp = attachment.source_timestamp; - if (request_header->source_timestamp < 0) { - RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); - return RMW_RET_ERROR; - } - - request_header->received_timestamp = latest_reply->get_received_timestamp(); - - z_drop(z_move(payload)); - *taken = true; - - return RMW_RET_OK; + return client_data->take_response(request_header, ros_response, taken); } //============================================================================== @@ -1902,12 +1561,11 @@ rmw_client_request_publisher_get_actual_qos( rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); - *qos = client_data->adapted_qos_profile; + *qos = client_data->topic_info().qos_; return RMW_RET_OK; } @@ -1931,7 +1589,6 @@ rmw_create_service( const char * service_name, const rmw_qos_profile_t * qos_profile) { - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, @@ -1980,7 +1637,6 @@ rmw_create_service( return nullptr; } - // SERVICE DATA ============================================================== rcutils_allocator_t * allocator = &node->context->options.allocator; if (!rcutils_allocator_is_valid(allocator)) { RMW_SET_ERROR_MSG("allocator is invalid."); @@ -2021,7 +1677,7 @@ rmw_create_service( } // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased - // Service handle will be returned in the rmw_service_t in rmw_wait + // Service handle will be returned in the rmw_services_t in rmw_wait // from which we cannot obtain ServiceData. rmw_service->data = static_cast(node_data->get_service_data(rmw_service).get()); rmw_service->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; @@ -2412,8 +2068,8 @@ check_and_attach_condition( if (clients) { for (size_t i = 0; i < clients->client_count; ++i) { - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(clients->clients[i]); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(clients->clients[i]); if (client_data == nullptr) { continue; } @@ -2573,8 +2229,8 @@ rmw_wait( if (clients) { for (size_t i = 0; i < clients->client_count; ++i) { - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(clients->clients[i]); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(clients->clients[i]); if (client_data == nullptr) { continue; } @@ -2774,7 +2430,7 @@ rmw_get_gid_for_publisher(const rmw_publisher_t * publisher, rmw_gid_t * gid) RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - pub_data->copy_gid(gid); + pub_data->copy_gid(gid->data); return RMW_RET_OK; } @@ -2786,12 +2442,12 @@ rmw_get_gid_for_client(const rmw_client_t * client, rmw_gid_t * gid) { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(gid, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); + RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - client_data->entity->copy_gid(gid->data); + client_data->copy_gid(gid->data); return RMW_RET_OK; } @@ -2837,29 +2493,16 @@ rmw_service_server_is_available( RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(is_available, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); if (client_data == nullptr) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( "Unable to retreive client_data from client for service %s", client->service_name); return RMW_RET_INVALID_ARGUMENT; } - std::string service_type = client_data->request_type_support->get_name(); - size_t suffix_substring_position = service_type.find("Request_"); - if (std::string::npos != suffix_substring_position) { - service_type = service_type.substr(0, suffix_substring_position); - } else { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unexpected type %s for client %s. Report this bug", - service_type.c_str(), client->service_name); - return RMW_RET_INVALID_ARGUMENT; - } - return node->context->impl->graph_cache()->service_server_is_available( - client->service_name, service_type.c_str(), is_available); + client_data->topic_info(), is_available); } //============================================================================== @@ -2932,11 +2575,11 @@ rmw_client_set_on_new_response_callback( const void * user_data) { RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); - rmw_zenoh_cpp::rmw_client_data_t * client_data = - static_cast(client->data); + rmw_zenoh_cpp::ClientData * client_data = + static_cast(client->data); RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); - client_data->data_callback_mgr.set_callback( - user_data, std::move(callback)); + client_data->set_on_new_response_callback( + std::move(callback), user_data); return RMW_RET_OK; } } // extern "C" diff --git a/rmw_zenoh_cpp/src/zenohd/main.cpp b/rmw_zenoh_cpp/src/zenohd/main.cpp index c9034a2f..58a1b39f 100644 --- a/rmw_zenoh_cpp/src/zenohd/main.cpp +++ b/rmw_zenoh_cpp/src/zenohd/main.cpp @@ -66,7 +66,7 @@ int main(int argc, char ** argv) return 1; } - // Enable the zenoh built-in logger + // Enable the zenoh built-in logger. zc_try_init_log_from_env(); // Initialize the zenoh configuration for the router. diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_c_vendor/CMakeLists.txt index 54401dd4..086ea5db 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_c_vendor/CMakeLists.txt @@ -15,9 +15,7 @@ find_package(ament_cmake_vendor_package REQUIRED) # Note: We separate the two args needed for cargo with "$" and not ";" as the # latter is a list separater in cmake and hence the string will be split into two # when expanded. -set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=zenoh/transport_compression zenoh/transport_tcp zenoh/transport_tls") - -set(RMW_ZENOH_BUILD_WITH_SHARED_MEMORY ON CACHE BOOL "Compile Zenoh RMW with Shared Memory support") +set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memory zenoh/transport_compression zenoh/transport_tcp zenoh/transport_tls") # Set VCS_VERSION to include latest changes from zenoh-c to benefit from : # - https://github.com/eclipse-zenoh/zenoh-c/pull/340 (fix build issue) @@ -28,10 +26,9 @@ set(RMW_ZENOH_BUILD_WITH_SHARED_MEMORY ON CACHE BOOL "Compile Zenoh RMW with Sha # - https://github.com/eclipse-zenoh/zenoh-c/pull/620 (fix ze_querying_subscriber_get API to query newly discovered publishers) ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION a57fda736d2f8866c06509f34b5bc73e372c6aaf + VCS_VERSION 4b759d4e4d35a97d7b20b5c4003b8b764a10f679 CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" - "-DZENOHC_BUILD_WITH_SHARED_MEMORY=${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY}" "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE" "-DZENOHC_CUSTOM_TARGET=${ZENOHC_CUSTOM_TARGET}" )