diff --git a/rmw_cyclonedds_cpp/CMakeLists.txt b/rmw_cyclonedds_cpp/CMakeLists.txt index f61560c4..480f1ac9 100644 --- a/rmw_cyclonedds_cpp/CMakeLists.txt +++ b/rmw_cyclonedds_cpp/CMakeLists.txt @@ -28,6 +28,7 @@ endif() find_package(ament_cmake_ros REQUIRED) find_package(rcutils REQUIRED) +find_package(rcpputils REQUIRED) #find_package(cyclonedds_cmake_module REQUIRED) find_package(CycloneDDS QUIET CONFIG) @@ -38,13 +39,16 @@ if(NOT CycloneDDS_FOUND) endif() find_package(rmw REQUIRED) +find_package(rmw_dds_common REQUIRED) find_package(rosidl_runtime_c REQUIRED) find_package(rosidl_typesupport_introspection_c REQUIRED) find_package(rosidl_typesupport_introspection_cpp REQUIRED) ament_export_dependencies(rcutils) +ament_export_dependencies(rcpputils) ament_export_dependencies(rmw) ament_export_dependencies(rosidl_runtime_c) +ament_export_dependencies(rmw_dds_common) ament_export_dependencies(rosidl_typesupport_introspection_c) ament_export_dependencies(rosidl_typesupport_introspection_cpp) @@ -52,9 +56,9 @@ add_library(rmw_cyclonedds_cpp src/rmw_node.cpp src/serdata.cpp src/serdes.cpp - src/graphrhc.cpp src/u16string.cpp src/exception.cpp + src/demangle.cpp src/deserialization_exception.cpp src/Serialization.cpp src/TypeSupport2.cpp) @@ -64,13 +68,17 @@ target_include_directories(rmw_cyclonedds_cpp PUBLIC $ ) -target_link_libraries(rmw_cyclonedds_cpp CycloneDDS::ddsc) +target_link_libraries(rmw_cyclonedds_cpp + CycloneDDS::ddsc +) ament_target_dependencies(rmw_cyclonedds_cpp "rcutils" + "rcpputils" "rosidl_typesupport_introspection_c" "rosidl_typesupport_introspection_cpp" "rmw" + "rmw_dds_common" "rosidl_runtime_c" ) diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/graphrhc.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/graphrhc.hpp deleted file mode 100644 index a4541ac7..00000000 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/graphrhc.hpp +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2019 ADLINK Technology -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -#ifndef RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_ -#define RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_ - -#include "dds/dds.h" - -/* Introduction of custom RHC coincides with promoting the library instance & domains to entities, - and so with the introduction of DDS_CYCLONEDDS_HANDLE. */ -#ifdef DDS_CYCLONEDDS_HANDLE -#include "dds/ddsc/dds_rhc.h" - -struct dds_rhc * graphrhc_new(); -#endif // DDS_CYCLONEDDS_HANDLE - -#endif // RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_ diff --git a/rmw_cyclonedds_cpp/package.xml b/rmw_cyclonedds_cpp/package.xml index e3cdc7d5..e1f71549 100644 --- a/rmw_cyclonedds_cpp/package.xml +++ b/rmw_cyclonedds_cpp/package.xml @@ -11,7 +11,9 @@ cyclonedds rcutils + rcpputils rmw + rmw_dds_common rosidl_runtime_c rosidl_typesupport_introspection_c rosidl_typesupport_introspection_cpp diff --git a/rmw_cyclonedds_cpp/src/demangle.cpp b/rmw_cyclonedds_cpp/src/demangle.cpp new file mode 100644 index 00000000..6c87f8a8 --- /dev/null +++ b/rmw_cyclonedds_cpp/src/demangle.cpp @@ -0,0 +1,197 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "rcpputils/find_and_replace.hpp" +#include "rcutils/logging_macros.h" +#include "rcutils/types.h" +#include "namespace_prefix.hpp" +#include "demangle.hpp" + +extern "C" +{ +const char * const ros_topic_prefix = "rt"; +const char * const ros_service_requester_prefix = "rq"; +const char * const ros_service_response_prefix = "rr"; + +const std::vector _ros_prefixes = +{ROS_TOPIC_PREFIX, ROS_SERVICE_REQUESTER_PREFIX, ROS_SERVICE_RESPONSE_PREFIX}; +} // extern "C" + +/// Returns `name` stripped of `prefix`. +std::string +_resolve_prefix(const std::string & name, const std::string & prefix) +{ + if (name.rfind(prefix, 0) == 0 && name.at(prefix.length()) == '/') { + return name.substr(prefix.length()); + } + return ""; +} + +/// Strip the ROS specific prefix if it exists from the topic name. +std::string +_strip_ros_prefix_if_exists(const std::string & topic_name) +{ + for (const auto & prefix : _ros_prefixes) { + if (topic_name.rfind(prefix, 0) == 0 && topic_name.at(prefix.length()) == '/') { + return topic_name.substr(prefix.length()); + } + } + return topic_name; +} + +/// Return the demangle ROS topic or the original if not a ROS topic. +std::string +_demangle_if_ros_topic(const std::string & topic_name) +{ + return _strip_ros_prefix_if_exists(topic_name); +} + +/// Return the demangled ROS type or the original if not a ROS type. +std::string +_demangle_if_ros_type(const std::string & dds_type_string) +{ + if (dds_type_string[dds_type_string.size() - 1] != '_') { + // not a ROS type + return dds_type_string; + } + + std::string substring = "dds_::"; + size_t substring_position = dds_type_string.find(substring); + if (substring_position == std::string::npos) { + // not a ROS type + return dds_type_string; + } + + std::string type_namespace = dds_type_string.substr(0, substring_position); + type_namespace = rcpputils::find_and_replace(type_namespace, "::", "/"); + size_t start = substring_position + substring.size(); + std::string type_name = dds_type_string.substr(start, dds_type_string.length() - 1 - start); + return type_namespace + type_name; +} + +/// Return the topic name for a given topic if it is part of one, else "". +std::string +_demangle_ros_topic_from_topic(const std::string & topic_name) +{ + return _resolve_prefix(topic_name, ros_topic_prefix); +} + +/// Return the service name for a given topic if it is part of one, else "". +std::string +_demangle_service_from_topic( + const std::string & prefix, const std::string & topic_name, std::string suffix) +{ + std::string service_name = _resolve_prefix(topic_name, prefix); + if ("" == service_name) { + return ""; + } + + size_t suffix_position = service_name.rfind(suffix); + if (suffix_position != std::string::npos) { + if (service_name.length() - suffix_position - suffix.length() != 0) { + RCUTILS_LOG_WARN_NAMED( + "rmw_fastrtps_shared_cpp", + "service topic has service prefix and a suffix, but not at the end" + ", report this: '%s'", topic_name.c_str()); + return ""; + } + } else { + RCUTILS_LOG_WARN_NAMED( + "rmw_fastrtps_shared_cpp", + "service topic has prefix but no suffix" + ", report this: '%s'", topic_name.c_str()); + return ""; + } + return service_name.substr(0, suffix_position); +} + +std::string +_demangle_service_from_topic(const std::string & topic_name) +{ + const std::string demangled_topic = _demangle_service_reply_from_topic(topic_name); + if ("" != demangled_topic) { + return demangled_topic; + } + return _demangle_service_request_from_topic(topic_name); +} + + +std::string +_demangle_service_request_from_topic(const std::string & topic_name) +{ + return _demangle_service_from_topic(ros_service_requester_prefix, topic_name, "Request"); +} + +std::string +_demangle_service_reply_from_topic(const std::string & topic_name) +{ + return _demangle_service_from_topic(ros_service_response_prefix, topic_name, "Reply"); +} + +/// Return the demangled service type if it is a ROS srv type, else "". +std::string +_demangle_service_type_only(const std::string & dds_type_name) +{ + std::string ns_substring = "dds_::"; + size_t ns_substring_position = dds_type_name.find(ns_substring); + if (std::string::npos == ns_substring_position) { + // not a ROS service type + return ""; + } + auto suffixes = { + std::string("_Response_"), + std::string("_Request_"), + }; + std::string found_suffix = ""; + size_t suffix_position = 0; + for (auto suffix : suffixes) { + suffix_position = dds_type_name.rfind(suffix); + if (suffix_position != std::string::npos) { + if (dds_type_name.length() - suffix_position - suffix.length() != 0) { + RCUTILS_LOG_WARN_NAMED( + "rmw_fastrtps_shared_cpp", + "service type contains 'dds_::' and a suffix, but not at the end" + ", report this: '%s'", dds_type_name.c_str()); + continue; + } + found_suffix = suffix; + break; + } + } + if (std::string::npos == suffix_position) { + RCUTILS_LOG_WARN_NAMED( + "rmw_fastrtps_shared_cpp", + "service type contains 'dds_::' but does not have a suffix" + ", report this: '%s'", dds_type_name.c_str()); + return ""; + } + // everything checks out, reformat it from '[type_namespace::]dds_::' + // to '[type_namespace/]' + std::string type_namespace = dds_type_name.substr(0, ns_substring_position); + type_namespace = rcpputils::find_and_replace(type_namespace, "::", "/"); + size_t start = ns_substring_position + ns_substring.length(); + std::string type_name = dds_type_name.substr(start, suffix_position - start); + return type_namespace + type_name; +} + +std::string +_identity_demangle(const std::string & name) +{ + return name; +} diff --git a/rmw_cyclonedds_cpp/src/demangle.hpp b/rmw_cyclonedds_cpp/src/demangle.hpp new file mode 100644 index 00000000..02a3e005 --- /dev/null +++ b/rmw_cyclonedds_cpp/src/demangle.hpp @@ -0,0 +1,57 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DEMANGLE_HPP_ +#define DEMANGLE_HPP_ + +#include + +/// Return the demangle ROS topic or the original if not a ROS topic. +std::string +_demangle_if_ros_topic(const std::string & topic_name); + +/// Return the demangled ROS type or the original if not a ROS type. +std::string +_demangle_if_ros_type(const std::string & dds_type_string); + +/// Return the topic name for a given topic if it is part of one, else "". +std::string +_demangle_ros_topic_from_topic(const std::string & topic_name); + +/// Return the service name for a given topic if it is part of a service, else "". +std::string +_demangle_service_from_topic(const std::string & topic_name); + +/// Return the service name for a given topic if it is part of a service request, else "". +std::string +_demangle_service_request_from_topic(const std::string & topic_name); + +/// Return the service name for a given topic if it is part of a service reply, else "". +std::string +_demangle_service_reply_from_topic(const std::string & topic_name); + +/// Return the demangled service type if it is a ROS srv type, else "". +std::string +_demangle_service_type_only(const std::string & dds_type_name); + +/// Used when ros names are not mangled. +std::string +_identity_demangle(const std::string & name); + + +using DemangleFunction = std::string (*)(const std::string &); +using MangleFunction = DemangleFunction; + +#endif // DEMANGLE_HPP_ diff --git a/rmw_cyclonedds_cpp/src/graphrhc.cpp b/rmw_cyclonedds_cpp/src/graphrhc.cpp deleted file mode 100644 index 0a31039d..00000000 --- a/rmw_cyclonedds_cpp/src/graphrhc.cpp +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2019 ADLINK Technology -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -#ifndef RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_ -#define RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_ - -#include "dds/dds.h" - -/* Introduction of custom RHC coincides with promoting the library instance & domains to entities, - and so with the introduction of DDS_CYCLONEDDS_HANDLE. */ -#ifdef DDS_CYCLONEDDS_HANDLE - -#include "dds/ddsc/dds_rhc.h" - -struct graphrhc : dds_rhc -{ - struct dds_reader * reader; -}; - -static dds_return_t graphrhc_associate( - struct dds_rhc * rhc_cmn, struct dds_reader * reader, - const struct ddsi_sertopic * topic, - struct ddsi_tkmap * tkmap) -{ - // C++ doesn't grok the fake inheritance in C, so static_cast won't work - struct graphrhc * rhc = reinterpret_cast(rhc_cmn); - rhc->reader = reader; - static_cast(topic); - static_cast(tkmap); - return DDS_RETCODE_OK; -} - -static void graphrhc_free(struct ddsi_rhc * rhc_cmn) -{ - // C++ doesn't grok the fake inheritance in C, so static_cast won't work - struct graphrhc * rhc = reinterpret_cast(rhc_cmn); - delete rhc; -} - -static bool graphrhc_store( - struct ddsi_rhc * __restrict rhc_cmn, - const struct ddsi_writer_info * __restrict wrinfo, - struct ddsi_serdata * __restrict sample, - struct ddsi_tkmap_instance * __restrict tk) -{ - // C++ doesn't grok the fake inheritance in C, so static_cast won't work - struct graphrhc * rhc = reinterpret_cast(rhc_cmn); - dds_reader_data_available_cb(rhc->reader); - static_cast(wrinfo); - static_cast(sample); - static_cast(tk); - return true; -} - -static void graphrhc_unregister_wr( - struct ddsi_rhc * __restrict rhc_cmn, - const struct ddsi_writer_info * __restrict wrinfo) -{ - // C++ doesn't grok the fake inheritance in C, so static_cast won't work - struct graphrhc * rhc = reinterpret_cast(rhc_cmn); - dds_reader_data_available_cb(rhc->reader); - static_cast(wrinfo); -} - -static void graphrhc_relinquish_ownership( - struct ddsi_rhc * __restrict rhc_cmn, - const uint64_t wr_iid) -{ - static_cast(rhc_cmn); - static_cast(wr_iid); -} - -static void graphrhc_set_qos(struct ddsi_rhc * rhc_cmn, const struct dds_qos * qos) -{ - static_cast(rhc_cmn); - static_cast(qos); -} - -static int graphrhc_read( - struct dds_rhc * rhc_cmn, bool lock, void ** values, - dds_sample_info_t * info_seq, uint32_t max_samples, uint32_t mask, - dds_instance_handle_t handle, struct dds_readcond * cond) -{ - static_cast(rhc_cmn); - static_cast(lock); - static_cast(values); - static_cast(info_seq); - static_cast(max_samples); - static_cast(mask); - static_cast(handle); - static_cast(cond); - return 0; -} - -static int graphrhc_take( - struct dds_rhc * rhc_cmn, bool lock, void ** values, - dds_sample_info_t * info_seq, uint32_t max_samples, uint32_t mask, - dds_instance_handle_t handle, struct dds_readcond * cond) -{ - static_cast(rhc_cmn); - static_cast(lock); - static_cast(values); - static_cast(info_seq); - static_cast(max_samples); - static_cast(mask); - static_cast(handle); - static_cast(cond); - return 0; -} - -static int graphrhc_takecdr( - struct dds_rhc * rhc_cmn, bool lock, struct ddsi_serdata ** values, - dds_sample_info_t * info_seq, uint32_t max_samples, - uint32_t sample_states, uint32_t view_states, uint32_t instance_states, - dds_instance_handle_t handle) -{ - static_cast(rhc_cmn); - static_cast(lock); - static_cast(values); - static_cast(info_seq); - static_cast(max_samples); - static_cast(sample_states); - static_cast(view_states); - static_cast(instance_states); - static_cast(handle); - return 0; -} - -static bool graphrhc_add_readcondition(struct dds_rhc * rhc_cmn, struct dds_readcond * cond) -{ - static_cast(rhc_cmn); - static_cast(cond); - return true; -} - -static void graphrhc_remove_readcondition(struct dds_rhc * rhc_cmn, struct dds_readcond * cond) -{ - static_cast(rhc_cmn); - static_cast(cond); -} - -static uint32_t graphrhc_lock_samples(struct dds_rhc * rhc_cmn) -{ - static_cast(rhc_cmn); - return 0; -} - -static const struct dds_rhc_ops graphrhc_ops = { - { - graphrhc_store, - graphrhc_unregister_wr, - graphrhc_relinquish_ownership, - graphrhc_set_qos, - graphrhc_free - }, - graphrhc_read, - graphrhc_take, - graphrhc_takecdr, - graphrhc_add_readcondition, - graphrhc_remove_readcondition, - graphrhc_lock_samples, - graphrhc_associate -}; - -struct dds_rhc * graphrhc_new() -{ - auto rhc = new graphrhc; - rhc->common.ops = &graphrhc_ops; - return static_cast(rhc); -} - -#endif // DDS_CYCLONEDDS_HANDLE - -#endif // RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_ diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 141706b7..66d859e0 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -48,6 +48,7 @@ #include "fallthrough_macro.hpp" #include "Serialization.hpp" #include "rmw/impl/cpp/macros.hpp" +#include "rmw/impl/cpp/key_value.hpp" #include "TypeSupport2.hpp" @@ -55,11 +56,15 @@ #include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp" #include "rmw_cyclonedds_cpp/ServiceTypeSupport.hpp" -#if RMW_VERSION_GTE(0, 8, 2) #include "rmw/get_topic_endpoint_info.h" #include "rmw/incompatible_qos_events_statuses.h" #include "rmw/topic_endpoint_info_array.h" -#endif + +#include "rmw_dds_common/context.hpp" +#include "rmw_dds_common/graph_cache.hpp" +#include "rmw_dds_common/msg/participant_entities_info.hpp" + +#include "rosidl_typesupport_cpp/message_type_support.hpp" #include "namespace_prefix.hpp" @@ -67,22 +72,7 @@ #include "dds/ddsi/ddsi_sertopic.h" #include "rmw_cyclonedds_cpp/serdes.hpp" #include "serdata.hpp" - -/* Proper multi-domain support requires eliminating the "extra" participant, which in turn relies on - the promotion of the Cyclone DDS library instance and the daomsin to full-fledged entities. The - custom RHC was introduced at essentially the same time */ -#ifdef DDS_CYCLONEDDS_HANDLE -#define MULTIDOMAIN 1 -#include "rmw_cyclonedds_cpp/graphrhc.hpp" -#else -#define MULTIDOMAIN 0 -#endif - -#if RMW_VERSION_GTE(0, 8, 1) && MULTIDOMAIN -#define SUPPORT_LOCALHOST 1 -#else -#define SUPPORT_LOCALHOST 0 -#endif +#include "demangle.hpp" /* Security must be enabled when compiling and requires cyclone to support QOS property lists */ #if DDS_HAS_SECURITY && DDS_HAS_PROPERTY_LIST_QOS @@ -119,6 +109,8 @@ #define RET_WRONG_IMPLID(var) RET_WRONG_IMPLID_X(var, return RMW_RET_ERROR) #define RET_NULL_OR_EMPTYSTR(var) RET_NULL_OR_EMPTYSTR_X(var, return RMW_RET_ERROR) +using rmw_dds_common::msg::ParticipantEntitiesInfo; + const char * const eclipse_cyclonedds_identifier = "rmw_cyclonedds_cpp"; const char * const eclipse_cyclonedds_serialization_format = "cdr"; @@ -139,15 +131,58 @@ bool operator<(dds_builtintopic_guid_t const & a, dds_builtintopic_guid_t const return memcmp(&a, &b, sizeof(dds_builtintopic_guid_t)) < 0; } -static const dds_entity_t builtin_topics[] = { - DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, - DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, - DDS_BUILTIN_TOPIC_DCPSPUBLICATION +static rmw_ret_t discovery_thread_stop(rmw_dds_common::Context & context); +static bool dds_qos_to_rmw_qos(const dds_qos_t * dds_qos, rmw_qos_profile_t * qos_policies); + +static rmw_publisher_t * create_publisher( + dds_entity_t dds_ppant, dds_entity_t dds_pub, + const rosidl_message_type_support_t * type_supports, + const char * topic_name, const rmw_qos_profile_t * qos_policies, + const rmw_publisher_options_t * publisher_options +); +static rmw_ret_t destroy_publisher(rmw_publisher_t * publisher); + +static rmw_subscription_t * create_subscription( + dds_entity_t dds_ppant, dds_entity_t dds_pub, + const rosidl_message_type_support_t * type_supports, + const char * topic_name, const rmw_qos_profile_t * qos_policies, + const rmw_subscription_options_t * subscription_options +); +static rmw_ret_t destroy_subscription(rmw_subscription_t * subscription); + +static rmw_guard_condition_t * create_guard_condition(rmw_context_impl_t * impl); +static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * gc); + +struct CddsDomain; +struct CddsWaitset; + +struct Cdds +{ + std::mutex lock; + + /* Map of domain id to per-domain state, used by create/destroy node */ + std::mutex domains_lock; + std::map domains; + + /* special guard condition that gets attached to every waitset but that is never triggered: + this way, we can avoid Cyclone's behaviour of always returning immediately when no + entities are attached to a waitset */ + dds_entity_t gc_for_empty_waitset; + + /* set of waitsets protected by lock, used to invalidate all waitsets caches when an entity is + deleted */ + std::unordered_set waitsets; + + Cdds() + : gc_for_empty_waitset(0) + {} }; -struct builtin_readers +static Cdds gcdds; + +struct CddsEntity { - dds_entity_t rds[sizeof(builtin_topics) / sizeof(builtin_topics[0])]; + dds_entity_t enth; }; #if RMW_SUPPORT_SECURITY @@ -162,32 +197,114 @@ struct dds_security_files_t }; #endif -struct CddsEntity +struct CddsDomain { - dds_entity_t enth; + /* This RMW implementation currently implements localhost-only by explicitly creating + domains with a configuration that consists of: (1) a hard-coded selection of + "localhost" as the network interface address; (2) followed by the contents of the + CYCLONEDDS_URI environment variable: + + - the "localhost" hostname should resolve to 127.0.0.1 (or equivalent) for IPv4 and + to ::1 for IPv6, so we don't have to worry about which of IPv4 or IPv6 is used (as + would be the case with a numerical IP address), nor do we have to worry about the + name of the loopback interface; + + - if the machine's configuration doesn't properly resolve "localhost", you can still + override via $CYCLONEDDS_URI. + + The CddsDomain type is used to track which domains exist and how many nodes are in + it. Because the domain is instantiated with the first nodes created in that domain, + the other nodes must have the same localhost-only setting. (It bugs out if not.) + Everything resets automatically when the last node in the domain is deleted. + + (It might be better still to for Cyclone to provide "loopback" or something similar + as a generic alias for a loopback interface ...) + + There are a few issues with the current support for creating domains explicitly in + Cyclone, fixing those might relax alter or relax some of the above. */ + + bool localhost_only; + uint32_t refcount; + + /* handle of the domain entity */ + dds_entity_t domain_handle; + + /* Default constructor so operator[] can be safely be used to look one up */ + CddsDomain() + : localhost_only(false), refcount(0), domain_handle(0) + {} + + ~CddsDomain() + {} }; -struct CddsNode : CddsEntity +struct rmw_context_impl_t { - dds_entity_t pub; - dds_entity_t sub; - rmw_guard_condition_t * graph_guard_condition; -#if MULTIDOMAIN - builtin_readers brd; -#endif - /* domain id can be retrieved from the entities, but that requires assuming the call - succeeds, storing it here means we are sure to have it */ + rmw_dds_common::Context common; dds_domainid_t domain_id; + dds_entity_t ppant; + + /* handles for built-in topic readers */ + dds_entity_t rd_participant; + dds_entity_t rd_subscription; + dds_entity_t rd_publication; + + /* DDS publisher, subscriber used for ROS2 publishers and subscriptions */ + dds_entity_t dds_pub; + dds_entity_t dds_sub; + + rmw_context_impl_t() + : common(), domain_id(UINT32_MAX), ppant(0) + { + /* destructor relies on these being initialized properly */ + common.thread_is_running.store(false); + common.graph_guard_condition = nullptr; + common.pub = nullptr; + common.sub = nullptr; + } + ~rmw_context_impl_t() + { + discovery_thread_stop(common); + common.graph_cache.clear_on_change_callback(); + if (common.graph_guard_condition) { + destroy_guard_condition(common.graph_guard_condition); + } + if (common.pub) { + destroy_publisher(common.pub); + } + if (common.sub) { + destroy_subscription(common.sub); + } + if (ppant > 0 && dds_delete(ppant) < 0) { + RCUTILS_SAFE_FWRITE_TO_STDERR( + "Failed to destroy domain in destructor\n"); + } + if (domain_id != UINT32_MAX) { + std::lock_guard lock(gcdds.domains_lock); + CddsDomain & dom = gcdds.domains[domain_id]; + assert(dom.refcount > 0); + if (--dom.refcount == 0) { + static_cast(dds_delete(dom.domain_handle)); + gcdds.domains.erase(domain_id); + } + } + } +}; + +struct CddsNode +{ }; struct CddsPublisher : CddsEntity { dds_instance_handle_t pubiid; + rmw_gid_t gid; struct ddsi_sertopic * sertopic; }; struct CddsSubscription : CddsEntity { + rmw_gid_t gid; dds_entity_t rdcondh; }; @@ -239,93 +356,6 @@ struct CddsWaitset std::vector evs; }; -#if SUPPORT_LOCALHOST -struct CddsDomain -{ - /* This RMW implementation currently implements localhost-only by explicitly creating - domains with a configuration that consists of: (1) a hard-coded selection of - "localhost" as the network interface address; (2) followed by the contents of the - CYCLONEDDS_URI environment variable: - - - the "localhost" hostname should resolve to 127.0.0.1 (or equivalent) for IPv4 and - to ::1 for IPv6, so we don't have to worry about which of IPv4 or IPv6 is used (as - would be the case with a numerical IP address), nor do we have to worry about the - name of the loopback interface; - - - if the machine's configuration doesn't properly resolve "localhost", you can still - override via $CYCLONEDDS_URI. - - The CddsDomain type is used to track which domains exist and how many nodes are in - it. Because the domain is instantiated with the first nodes created in that domain, - the other nodes must have the same localhost-only setting. (It bugs out if not.) - Everything resets automatically when the last node in the domain is deleted. - - (It might be better still to for Cyclone to provide "loopback" or something similar - as a generic alias for a loopback interface ...) - - There are a few issues with the current support for creating domains explicitly in - Cyclone, fixing those might relax alter or relax some of the above. */ - - bool localhost_only; - uint32_t n_nodes; - - /* handle of the domain entity, for versions of Cyclone that have an updated version of - dds_create_domain that properly returns a handle; the original version has a value of - 0 in here (DDS_RETCODE_OK) which is never a valid handle */ - dds_entity_t domain_handle; - - /* Default constructor so operator[] can be safely be used to look one up */ - CddsDomain() - : localhost_only(false), n_nodes(0), domain_handle(0) - {} -}; -#endif - -struct Cdds -{ - std::mutex lock; - -#if SUPPORT_LOCALHOST - /* Map of domain id to per-domain state, used by create/destroy node */ - std::mutex domains_lock; - std::map domains; -#endif - -#if !MULTIDOMAIN - uint32_t refcount; - dds_entity_t ppant; - builtin_readers brd; -#endif - - /* special guard condition that gets attached to every waitset but that is never triggered: - this way, we can avoid Cyclone's behaviour of always returning immediately when no - entities are attached to a waitset */ - dds_entity_t gc_for_empty_waitset; - - /* set of waitsets protected by lock, used to invalidate all waitsets caches when an entity is - deleted */ - std::unordered_set waitsets; - -#if !MULTIDOMAIN - /* set of nodes is used to trigger graph guard conditions when something changes, but that - something also changes when creating the built-in readers when Cdds::lock is already held */ - std::mutex nodes_lock; - std::unordered_set nodes; -#endif - -#if MULTIDOMAIN - Cdds() - : gc_for_empty_waitset(0) - {} -#else - Cdds() - : refcount(0), ppant(0), gc_for_empty_waitset(0) - {} -#endif -}; - -static Cdds gcdds; - static void clean_waitset_caches(); #if REPORT_BLOCKED_REQUESTS static void check_for_blocked_requests(CddsClient & client); @@ -372,20 +402,6 @@ extern "C" rmw_ret_t rmw_set_log_severity(rmw_log_severity_t severity) return RMW_RET_OK; } -extern "C" rmw_ret_t rmw_context_fini(rmw_context_t * context) -{ - RCUTILS_CHECK_ARGUMENT_FOR_NULL(context, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( - context, - context->implementation_identifier, - eclipse_cyclonedds_identifier, - return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - // context impl is explicitly supposed to be nullptr for now, see rmw_init's code - // RCUTILS_CHECK_ARGUMENT_FOR_NULL(context->impl, RMW_RET_INVALID_ARGUMENT); - *context = rmw_get_zero_initialized_context(); - return RMW_RET_OK; -} - extern "C" rmw_ret_t rmw_init_options_init( rmw_init_options_t * init_options, rcutils_allocator_t allocator) @@ -400,12 +416,10 @@ extern "C" rmw_ret_t rmw_init_options_init( init_options->implementation_identifier = eclipse_cyclonedds_identifier; init_options->allocator = allocator; init_options->impl = nullptr; -#if RMW_VERSION_GTE(0, 8, 2) init_options->localhost_only = RMW_LOCALHOST_ONLY_DEFAULT; init_options->domain_id = RMW_DEFAULT_DOMAIN_ID; init_options->enclave = NULL; init_options->security_options = rmw_get_zero_initialized_security_options(); -#endif return RMW_RET_OK; } @@ -422,7 +436,7 @@ extern "C" rmw_ret_t rmw_init_options_copy(const rmw_init_options_t * src, rmw_i RMW_SET_ERROR_MSG("expected zero-initialized dst"); return RMW_RET_INVALID_ARGUMENT; } -#if RMW_VERSION_GTE(0, 8, 2) + const rcutils_allocator_t * allocator = &src->allocator; rmw_ret_t ret = RMW_RET_OK; @@ -440,10 +454,6 @@ extern "C" rmw_ret_t rmw_init_options_copy(const rmw_init_options_t * src, rmw_i fail: allocator->deallocate(dst->enclave, allocator->state); return ret; -#else - *dst = *src; - return RMW_RET_OK; -#endif } extern "C" rmw_ret_t rmw_init_options_fini(rmw_init_options_t * init_options) @@ -456,207 +466,234 @@ extern "C" rmw_ret_t rmw_init_options_fini(rmw_init_options_t * init_options) init_options->implementation_identifier, eclipse_cyclonedds_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); -#if RMW_VERSION_GTE(0, 8, 2) allocator.deallocate(init_options->enclave, allocator.state); rmw_security_options_fini(&init_options->security_options, &allocator); -#endif *init_options = rmw_get_zero_initialized_init_options(); return RMW_RET_OK; } -extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t * context) +static void convert_guid_to_gid(const dds_guid_t & guid, rmw_gid_t & gid) { - static_cast(options); - static_cast(context); - RCUTILS_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT); - RCUTILS_CHECK_ARGUMENT_FOR_NULL(context, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( - options, - options->implementation_identifier, - eclipse_cyclonedds_identifier, - return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - context->instance_id = options->instance_id; - context->implementation_identifier = eclipse_cyclonedds_identifier; - context->impl = nullptr; -#if RMW_VERSION_GTE(0, 8, 2) - return rmw_init_options_copy(options, &context->options); -#else - return RMW_RET_OK; -#endif + static_assert( + RMW_GID_STORAGE_SIZE >= sizeof(guid), + "rmw_gid_t type too small for a Cyclone DDS GUID"); + memset(&gid, 0, sizeof(gid)); + gid.implementation_identifier = eclipse_cyclonedds_identifier; + memcpy(gid.data, guid.v, sizeof(guid)); } -extern "C" rmw_ret_t rmw_node_assert_liveliness(const rmw_node_t * node) +static void get_entity_gid(dds_entity_t h, rmw_gid_t & gid) { - RET_WRONG_IMPLID(node); - return RMW_RET_OK; + dds_guid_t guid; + dds_get_guid(h, &guid); + convert_guid_to_gid(guid, gid); } -extern "C" rmw_ret_t rmw_shutdown(rmw_context_t * context) +static void handle_ParticipantEntitiesInfo(dds_entity_t reader, void * arg) { - RCUTILS_CHECK_ARGUMENT_FOR_NULL(context, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( - context, - context->implementation_identifier, - eclipse_cyclonedds_identifier, - return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - // context impl is explicitly supposed to be nullptr for now, see rmw_init's code - // RCUTILS_CHECK_ARGUMENT_FOR_NULL (context->impl, RMW_RET_INVALID_ARGUMENT); - *context = rmw_get_zero_initialized_context(); - return RMW_RET_OK; + static_cast(reader); + rmw_context_impl_t * impl = static_cast(arg); + ParticipantEntitiesInfo msg; + bool taken; + while (rmw_take(impl->common.sub, &msg, &taken, nullptr) == RMW_RET_OK && taken) { + // locally published data is filtered because of the subscription QoS + impl->common.graph_cache.update_participant_entities(msg); + } } -///////////////////////////////////////////////////////////////////////////////////////// -/////////// /////////// -/////////// GRAPH GUARD /////////// -/////////// /////////// -///////////////////////////////////////////////////////////////////////////////////////// - -static void ggcallback(dds_entity_t rd, void * varg) +static void handle_DCPSParticipant(dds_entity_t reader, void * arg) { -#if MULTIDOMAIN - auto gg = static_cast(varg); - void * msg = 0; - dds_sample_info_t info; - while (dds_take(rd, &msg, &info, 1, 1) > 0) { - dds_return_loan(rd, &msg, 1); - } - if (rmw_trigger_guard_condition(gg) != RMW_RET_OK) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to trigger graph guard condition"); - } -#else - static_cast(varg); - void * msg = 0; - dds_sample_info_t info; - while (dds_take(rd, &msg, &info, 1, 1) > 0) { - dds_return_loan(rd, &msg, 1); - } - - { - std::lock_guard lock(gcdds.nodes_lock); - for (auto && node_impl : gcdds.nodes) { - if (rmw_trigger_guard_condition(node_impl->graph_guard_condition) != RMW_RET_OK) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to trigger graph guard condition"); + rmw_context_impl_t * impl = static_cast(arg); + dds_sample_info_t si; + void * raw = NULL; + while (dds_take(reader, &raw, &si, 1, 1) == 1) { + auto s = static_cast(raw); + rmw_gid_t gid; + convert_guid_to_gid(s->key, gid); + if (memcmp(&gid, &impl->common.gid, sizeof(gid)) == 0) { + // ignore the local participant + } else if (si.instance_state != DDS_ALIVE_INSTANCE_STATE) { + impl->common.graph_cache.remove_participant(gid); + } else if (si.valid_data) { + void * ud; + size_t udsz; + if (dds_qget_userdata(s->qos, &ud, &udsz)) { + std::vector udvec(static_cast(ud), static_cast(ud) + udsz); + dds_free(ud); + auto map = rmw::impl::cpp::parse_key_value(udvec); + auto name_found = map.find("enclave"); + if (name_found != map.end()) { + auto enclave = + std::string(name_found->second.begin(), name_found->second.end()); + impl->common.graph_cache.add_participant(gid, enclave); + } } } + dds_return_loan(reader, &raw, 1); + } +} + +static void handle_builtintopic_endpoint( + dds_entity_t reader, rmw_context_impl_t * impl, + bool is_reader) +{ + dds_sample_info_t si; + void * raw = NULL; + while (dds_take(reader, &raw, &si, 1, 1) == 1) { + auto s = static_cast(raw); + rmw_gid_t gid; + convert_guid_to_gid(s->key, gid); + if (si.instance_state != DDS_ALIVE_INSTANCE_STATE) { + impl->common.graph_cache.remove_entity(gid, is_reader); + } else if (si.valid_data && strncmp(s->topic_name, "DCPS", 4) != 0) { + rmw_qos_profile_t qos_profile = rmw_qos_profile_unknown; + rmw_gid_t ppgid; + dds_qos_to_rmw_qos(s->qos, &qos_profile); + convert_guid_to_gid(s->participant_key, ppgid); + impl->common.graph_cache.add_entity( + gid, + std::string(s->topic_name), + std::string(s->type_name), + ppgid, + qos_profile, + is_reader); + } + dds_return_loan(reader, &raw, 1); } -#endif } -static void builtin_readers_fini(builtin_readers & brd) +static void handle_DCPSSubscription(dds_entity_t reader, void * arg) { - for (size_t i = 0; i < sizeof(builtin_topics) / sizeof(builtin_topics[0]); i++) { - if (brd.rds[i] > 0) { - dds_delete(brd.rds[i]); - } - } + rmw_context_impl_t * impl = static_cast(arg); + handle_builtintopic_endpoint(reader, impl, true); } -static bool builtin_readers_init(builtin_readers & brd, dds_entity_t pp, rmw_guard_condition_t * gg) +static void handle_DCPSPublication(dds_entity_t reader, void * arg) { - /* Built-in topics readers: have to be per-node or the graph guard condition support becomes a - real mess. */ -#if MULTIDOMAIN - assert(gg != nullptr); -#else - assert(gg == nullptr); -#endif - dds_listener_t * gglistener = dds_create_listener(static_cast(gg)); - dds_lset_data_available(gglistener, ggcallback); - for (size_t i = 0; i < sizeof(builtin_topics) / sizeof(builtin_topics[0]); i++) { - brd.rds[i] = 0; - } - for (size_t i = 0; i < sizeof(builtin_topics) / sizeof(builtin_topics[0]); i++) { -#if MULTIDOMAIN - struct dds_rhc * rhc = graphrhc_new(); - dds_entity_t rd = dds_create_reader_rhc(pp, builtin_topics[i], nullptr, gglistener, rhc); -#else - dds_entity_t rd = dds_create_reader(pp, builtin_topics[i], nullptr, gglistener); -#endif - if (rd < 0) { -#if MULTIDOMAIN - dds_rhc_free(rhc); -#endif - RCUTILS_LOG_ERROR_NAMED( - "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS built-in reader"); - goto fail; - } - brd.rds[i] = rd; - } - dds_delete_listener(gglistener); - return true; - -fail: - builtin_readers_fini(brd); - dds_delete_listener(gglistener); - return false; + rmw_context_impl_t * impl = static_cast(arg); + handle_builtintopic_endpoint(reader, impl, false); } -#if !MULTIDOMAIN -static dds_entity_t ref_ppant() +static void discovery_thread(rmw_context_impl_t * impl) { - std::lock_guard lock(gcdds.lock); - if (gcdds.refcount == 0) { - if ((gcdds.ppant = dds_create_participant(DDS_DOMAIN_DEFAULT, nullptr, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create participant"); - return gcdds.ppant; - } - - if ((gcdds.gc_for_empty_waitset = dds_create_guardcondition(gcdds.ppant)) < 0) { - RMW_SET_ERROR_MSG("failed to create guardcondition for handling empty waitsets"); - dds_delete(gcdds.ppant); - gcdds.ppant = 0; - return DDS_RETCODE_ERROR; + auto sub = static_cast(impl->common.sub->data); + auto gc = static_cast(impl->common.listener_thread_gc->data); + dds_entity_t ws; + /* deleting ppant will delete waitset as well, so there is no real need to delete + the waitset here on error, but it is more hygienic */ + if ((ws = dds_create_waitset(impl->ppant)) < 0) { + RCUTILS_SAFE_FWRITE_TO_STDERR( + "ros discovery info listener thread: failed to create waitset, will shutdown ...\n"); + return; + } + /* I suppose I could attach lambda functions one way or another, which would + definitely be more elegant, but this avoids having to deal with the C++ + freakishness that is involved and works, too. */ + std::vector>> entries = { + {gc->gcondh, nullptr}, + {sub->enth, handle_ParticipantEntitiesInfo}, + {impl->rd_participant, handle_DCPSParticipant}, + {impl->rd_subscription, handle_DCPSSubscription}, + {impl->rd_publication, handle_DCPSPublication}, + }; + for (size_t i = 0; i < entries.size(); i++) { + if (entries[i].second != nullptr && + dds_set_status_mask(entries[i].first, DDS_DATA_AVAILABLE_STATUS) < 0) + { + RCUTILS_SAFE_FWRITE_TO_STDERR( + "ros discovery info listener thread: failed to set reader status masks, " + "will shutdown ...\n"); + return; } - - if (!builtin_readers_init(gcdds.brd, gcdds.ppant, nullptr)) { - dds_delete(gcdds.ppant); - gcdds.ppant = 0; - return DDS_RETCODE_ERROR; + if (dds_waitset_attach(ws, entries[i].first, static_cast(i)) < 0) { + RCUTILS_SAFE_FWRITE_TO_STDERR( + "ros discovery info listener thread: failed to attach entities to waitset, " + "will shutdown ...\n"); + dds_delete(ws); + return; } } - gcdds.refcount++; - return gcdds.ppant; -} + std::vector xs(5); + while (impl->common.thread_is_running.load()) { + dds_return_t n; + if ((n = dds_waitset_wait(ws, xs.data(), xs.size(), DDS_INFINITY)) < 0) { + RCUTILS_SAFE_FWRITE_TO_STDERR( + "ros discovery info listener thread: wait failed, will shutdown ...\n"); + return; + } + for (int32_t i = 0; i < n; i++) { + if (entries[xs[i]].second) { + entries[xs[i]].second(entries[xs[i]].first, impl); + } + } + } + dds_delete(ws); +} -static void unref_ppant() +static rmw_ret_t discovery_thread_start(rmw_context_impl_t * impl) { - std::lock_guard lock(gcdds.lock); - if (--gcdds.refcount == 0) { - builtin_readers_fini(gcdds.brd); - dds_delete(gcdds.ppant); - gcdds.ppant = 0; + auto common_context = &impl->common; + common_context->thread_is_running.store(true); + common_context->listener_thread_gc = create_guard_condition(impl); + if (common_context->listener_thread_gc) { + try { + common_context->listener_thread = std::thread(discovery_thread, impl); + return RMW_RET_OK; + } catch (const std::exception & exc) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Failed to create std::thread: %s", exc.what()); + } catch (...) { + RMW_SET_ERROR_MSG("Failed to create std::thread"); + } + } else { + RMW_SET_ERROR_MSG("Failed to create guard condition"); + } + common_context->thread_is_running.store(false); + if (common_context->listener_thread_gc) { + if (RMW_RET_OK != destroy_guard_condition(common_context->listener_thread_gc)) { + RCUTILS_SAFE_FWRITE_TO_STDERR( + RCUTILS_STRINGIFY(__FILE__) ":" RCUTILS_STRINGIFY(__function__) ":" + RCUTILS_STRINGIFY(__LINE__) ": Failed to destroy guard condition"); + } } + return RMW_RET_ERROR; } -#endif - -///////////////////////////////////////////////////////////////////////////////////////// -/////////// /////////// -/////////// NODES /////////// -/////////// /////////// -///////////////////////////////////////////////////////////////////////////////////////// -#if SUPPORT_LOCALHOST -static void node_gone_from_domain_locked(dds_domainid_t did) +static rmw_ret_t discovery_thread_stop(rmw_dds_common::Context & common_context) { - CddsDomain & dom = gcdds.domains[did]; - assert(dom.n_nodes > 0); - if (--dom.n_nodes == 0) { - if (dom.domain_handle > 0) { - dds_delete(dom.domain_handle); + if (common_context.thread_is_running.exchange(false)) { + rmw_ret_t rmw_ret = rmw_trigger_guard_condition(common_context.listener_thread_gc); + if (RMW_RET_OK != rmw_ret) { + return rmw_ret; + } + try { + common_context.listener_thread.join(); + } catch (const std::exception & exc) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Failed to join std::thread: %s", exc.what()); + return RMW_RET_ERROR; + } catch (...) { + RMW_SET_ERROR_MSG("Failed to join std::thread"); + return RMW_RET_ERROR; + } + rmw_ret = destroy_guard_condition(common_context.listener_thread_gc); + if (RMW_RET_OK != rmw_ret) { + return rmw_ret; } - gcdds.domains.erase(did); } + return RMW_RET_OK; } -static bool check_create_domain_locked(dds_domainid_t did, bool localhost_only) +static bool check_create_domain(dds_domainid_t did, rmw_localhost_only_t localhost_only_option) { + const bool localhost_only = (localhost_only_option == RMW_LOCALHOST_ONLY_ENABLED); + std::lock_guard lock(gcdds.domains_lock); /* return true: n_nodes incremented, localhost_only set correctly, domain exists " false: n_nodes unchanged, domain left intact if it already existed */ CddsDomain & dom = gcdds.domains[did]; - if (dom.n_nodes != 0) { + if (dom.refcount != 0) { /* Localhost setting must match */ if (localhost_only == dom.localhost_only) { - dom.n_nodes++; + dom.refcount++; return true; } else { RCUTILS_LOG_ERROR_NAMED( @@ -666,7 +703,7 @@ static bool check_create_domain_locked(dds_domainid_t did, bool localhost_only) return false; } } else { - dom.n_nodes = 1; + dom.refcount = 1; dom.localhost_only = localhost_only; /* Localhost-only: set network interface address (shortened form of config would be @@ -690,7 +727,7 @@ static bool check_create_domain_locked(dds_domainid_t did, bool localhost_only) "rmw_cyclonedds_cpp", "rmw_create_node: failed to retrieve CYCLONEDDS_URI environment variable, error %s", get_env_error); - node_gone_from_domain_locked(did); + gcdds.domains.erase(did); return false; } @@ -698,39 +735,14 @@ static bool check_create_domain_locked(dds_domainid_t did, bool localhost_only) RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", "rmw_create_node: failed to create domain, error %s", dds_strretcode(dom.domain_handle)); - node_gone_from_domain_locked(did); + gcdds.domains.erase(did); return false; } else { return true; } } } -#else -/* Dummy implementation to keep the rest of the code simpler */ -static void node_gone_from_domain_locked(dds_domainid_t did) -{ - static_cast(did); -} -#endif - -static std::string get_node_user_data_name_ns(const char * node_name, const char * node_namespace) -{ - return std::string("name=") + std::string(node_name) + - std::string(";namespace=") + std::string(node_namespace) + - std::string(";"); -} -#if RMW_VERSION_GTE(0, 8, 2) -static std::string get_node_user_data( - const char * node_name, const char * node_namespace, const char * enclave) -{ - return get_node_user_data_name_ns(node_name, node_namespace) + - std::string("enclave=") + std::string(enclave) + - std::string(";"); -} -#else -#define get_node_user_data get_node_user_data_name_ns -#endif #if RMW_SUPPORT_SECURITY /* Returns the full URI of a security file properly formatted for DDS */ bool get_security_file_URI( @@ -754,11 +766,7 @@ bool get_security_file_URI( } bool get_security_file_URIs( -#if !RMW_VERSION_GTE(0, 8, 2) - const rmw_node_security_options_t * security_options, -#else const rmw_security_options_t * security_options, -#endif dds_security_files_t & dds_security_files, rcutils_allocator_t allocator) { bool ret = false; @@ -803,18 +811,12 @@ void finalize_security_file_URIs( allocator.deallocate(dds_security_files.permissions_p7s, allocator.state); dds_security_files.permissions_p7s = nullptr; } - #endif /* RMW_SUPPORT_SECURITY */ /* Attempt to set all the qos properties needed to enable DDS security */ rmw_ret_t configure_qos_for_security( dds_qos_t * qos, -#if !RMW_VERSION_GTE(0, 8, 2) - const rmw_node_security_options_t * security_options -#else - const rmw_security_options_t * security_options -#endif -) + const rmw_security_options_t * security_options) { #if RMW_SUPPORT_SECURITY rmw_ret_t ret = RMW_RET_UNSUPPORTED; @@ -856,148 +858,245 @@ rmw_ret_t configure_qos_for_security( #endif } - -extern "C" rmw_node_t * rmw_create_node( - rmw_context_t * context, const char * name, - const char * namespace_, size_t domain_id -#if !RMW_VERSION_GTE(0, 8, 2) - , const rmw_node_security_options_t * security_options -#endif -#if RMW_VERSION_GTE(0, 8, 1) - , bool localhost_only -#endif -) +extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t * context) { -#if !RMW_VERSION_GTE(0, 8, 2) + rmw_ret_t ret; + + static_cast(options); static_cast(context); -#endif - RET_NULL_X(name, return nullptr); - RET_NULL_X(namespace_, return nullptr); -#if MULTIDOMAIN + RCUTILS_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT); + RCUTILS_CHECK_ARGUMENT_FOR_NULL(context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + options, + options->implementation_identifier, + eclipse_cyclonedds_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + /* domain_id = UINT32_MAX = Cyclone DDS' "default domain id".*/ - if (domain_id >= UINT32_MAX) { + if (options->domain_id >= UINT32_MAX) { RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", "rmw_create_node: domain id out of range"); - return nullptr; + return RMW_RET_INVALID_ARGUMENT; } - const dds_domainid_t did = static_cast(domain_id); -#else - static_cast(domain_id); - const dds_domainid_t did = DDS_DOMAIN_DEFAULT; -#endif -#if !RMW_VERSION_GTE(0, 8, 2) - RCUTILS_CHECK_ARGUMENT_FOR_NULL(security_options, nullptr); -#else - rmw_security_options_t * security_options; - RCUTILS_CHECK_ARGUMENT_FOR_NULL(context, nullptr); - security_options = &context->options.security_options; -#endif - rmw_ret_t ret; - int dummy_validation_result; - size_t dummy_invalid_index; - if ((ret = - rmw_validate_node_name(name, &dummy_validation_result, &dummy_invalid_index)) != RMW_RET_OK) - { - return nullptr; + const dds_domainid_t domain_id = static_cast(options->domain_id); + + context->instance_id = options->instance_id; + context->implementation_identifier = eclipse_cyclonedds_identifier; + context->impl = nullptr; + + if ((ret = rmw_init_options_copy(options, &context->options)) != RMW_RET_OK) { + return ret; + } + + std::unique_ptr impl(new(std::nothrow) rmw_context_impl_t()); + if (impl == nullptr) { + return RMW_RET_BAD_ALLOC; } -#if SUPPORT_LOCALHOST /* Take domains_lock and hold it until after the participant creation succeeded or failed: otherwise there is a race with rmw_destroy_node deleting the last participant and tearing down the domain for versions of Cyclone that implement the original version of dds_create_domain that doesn't return a handle. */ - std::lock_guard lock(gcdds.domains_lock); - if (!check_create_domain_locked(did, localhost_only)) { - return nullptr; + if (!check_create_domain(domain_id, options->localhost_only)) { + return RMW_RET_ERROR; } -#endif - dds_qos_t * qos = dds_create_qos(); -#if RMW_VERSION_GTE(0, 8, 2) - std::string user_data = get_node_user_data(name, namespace_, context->options.enclave); -#else - std::string user_data = get_node_user_data(name, namespace_); -#endif - dds_qset_userdata(qos, user_data.c_str(), user_data.size()); + /* Once the domain id is set in impl, impl's destructor will take care of unref'ing + the domain */ + impl->domain_id = domain_id; - if (configure_qos_for_security(qos, security_options) != RMW_RET_OK) { - if (security_options->enforce_security == RMW_SECURITY_ENFORCEMENT_ENFORCE) { - dds_delete_qos(qos); - node_gone_from_domain_locked(did); - return nullptr; + std::unique_ptr> + ppant_qos(dds_create_qos(), &dds_delete_qos); + if (ppant_qos == nullptr) { + return RMW_RET_BAD_ALLOC; + } + std::string user_data = std::string("enclave=") + std::string( + context->options.enclave) + std::string(";"); + dds_qset_userdata(ppant_qos.get(), user_data.c_str(), user_data.size()); + if (configure_qos_for_security( + ppant_qos.get(), + &context->options.security_options) != RMW_RET_OK) + { + if (context->options.security_options.enforce_security == RMW_SECURITY_ENFORCEMENT_ENFORCE) { + return RMW_RET_ERROR; } } - - dds_entity_t pp = dds_create_participant(did, qos, nullptr); - dds_delete_qos(qos); - if (pp < 0) { - node_gone_from_domain_locked(did); + impl->ppant = dds_create_participant(domain_id, ppant_qos.get(), nullptr); + if (impl->ppant < 0) { RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS participant"); - return nullptr; + return RMW_RET_ERROR; } -#if SUPPORT_LOCALHOST - if (gcdds.domains[did].domain_handle == 0) { - /* Original implementation of dds_create_domain(): returned value is 0 (DDS_RETCODE_OK) - and a reference to the domain is leaked, necessitating an explicit call to - dds_delete(). For this, we must recover the handle from the participant. Yikes! - Pending fixes to Cyclone (https://github.com/eclipse-cyclonedds/cyclonedds/pull/304): - a handle is returned by dds_create_domain() and dds_delete() must be called (so - entirely in line with other entity types). In this case, we don't have to do this - (though there would be no harm in it either). - - Note that "domains_lock" is held all this time. */ - gcdds.domains[did].domain_handle = dds_get_parent(pp); + /* Create readers for DDS built-in topics for monitoring discovery */ + if ((impl->rd_participant = + dds_create_reader(impl->ppant, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, nullptr, nullptr)) < 0) + { + RCUTILS_LOG_ERROR_NAMED( + "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSParticipant reader"); + return RMW_RET_ERROR; + } + if ((impl->rd_subscription = + dds_create_reader(impl->ppant, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, nullptr, nullptr)) < 0) + { + RCUTILS_LOG_ERROR_NAMED( + "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSSubscription reader"); + return RMW_RET_ERROR; + } + if ((impl->rd_publication = + dds_create_reader(impl->ppant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, nullptr, nullptr)) < 0) + { + RCUTILS_LOG_ERROR_NAMED( + "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSPublication reader"); + return RMW_RET_ERROR; } -#endif - /* Since ROS2 doesn't require anything fancy from DDS Subscribers or Publishers, create a single - pair & reuse that */ - dds_entity_t pub, sub; - if ((pub = dds_create_publisher(pp, nullptr, nullptr)) < 0) { - node_gone_from_domain_locked(did); + /* Create DDS publisher/subscriber objects that will be used for all DDS writers/readers + to be created for RMW publishers/subscriptions. */ + if ((impl->dds_pub = dds_create_publisher(impl->ppant, nullptr, nullptr)) < 0) { RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS publisher"); - dds_delete(pp); - return nullptr; + return RMW_RET_ERROR; } - if ((sub = dds_create_subscriber(pp, nullptr, nullptr)) < 0) { - node_gone_from_domain_locked(did); + if ((impl->dds_sub = dds_create_subscriber(impl->ppant, nullptr, nullptr)) < 0) { RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS subscriber"); - dds_delete(pp); - return nullptr; + return RMW_RET_ERROR; } - auto * node_impl = new CddsNode(); - rmw_node_t * node_handle = nullptr; - RET_ALLOC_X(node_impl, goto fail_node_impl); - rmw_guard_condition_t * graph_guard_condition; - if (!(graph_guard_condition = rmw_create_guard_condition(context))) { - goto fail_ggc; + rmw_qos_profile_t pubsub_qos = rmw_qos_profile_default; + pubsub_qos.avoid_ros_namespace_conventions = true; + pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST; + pubsub_qos.depth = 1; + pubsub_qos.durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; + pubsub_qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + + /* Create RMW publisher/subscription/guard condition used by rmw_dds_common + discovery */ + rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options(); + impl->common.pub = create_publisher( + impl->ppant, impl->dds_pub, + rosidl_typesupport_cpp::get_message_type_support_handle(), + "ros_discovery_info", + &pubsub_qos, + &publisher_options); + if (impl->common.pub == nullptr) { + return RMW_RET_ERROR; } - node_impl->enth = pp; - node_impl->pub = pub; - node_impl->sub = sub; - node_impl->graph_guard_condition = graph_guard_condition; - node_impl->domain_id = did; -#if MULTIDOMAIN - if (!builtin_readers_init(node_impl->brd, pp, graph_guard_condition)) { - goto fail_builtin_reader; + rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options(); + subscription_options.ignore_local_publications = true; + // FIXME: keyed topics => KEEP_LAST and depth 1. + pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_ALL; + impl->common.sub = create_subscription( + impl->ppant, impl->dds_sub, + rosidl_typesupport_cpp::get_message_type_support_handle(), + "ros_discovery_info", + &pubsub_qos, + &subscription_options); + if (impl->common.sub == nullptr) { + return RMW_RET_ERROR; } -#else + + impl->common.graph_guard_condition = create_guard_condition(impl.get()); + if (impl->common.graph_guard_condition == nullptr) { + return RMW_RET_BAD_ALLOC; + } + + impl->common.graph_cache.set_on_change_callback( + [guard_condition = impl->common.graph_guard_condition]() { + rmw_ret_t ret = rmw_trigger_guard_condition(guard_condition); + if (ret != RMW_RET_OK) { + RMW_SET_ERROR_MSG("graph cache on_change_callback failed to trigger guard condition"); + } + }); + + get_entity_gid(impl->ppant, impl->common.gid); + impl->common.graph_cache.add_participant(impl->common.gid, context->options.enclave); + + // One could also use a set of listeners instead of a thread for maintaining the graph cache: + // - Locally published samples shouldn't make it to the reader, so there shouldn't be a deadlock + // caused by the graph cache's mutex already having been locked by (e.g.) rmw_create_node. + // - Whatever the graph cache implementation does, it shouldn't involve much more than local state + // updates and triggering a guard condition, and so that should be safe. + // however, the graph cache updates could be expensive, and so performing those operations on + // the thread receiving data from the network may not be wise. + if ((ret = discovery_thread_start(impl.get())) != RMW_RET_OK) { + return ret; + } + + context->impl = impl.release(); + return RMW_RET_OK; +} + +extern "C" rmw_ret_t rmw_shutdown(rmw_context_t * context) +{ + RCUTILS_CHECK_ARGUMENT_FOR_NULL(context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + context, + context->implementation_identifier, + eclipse_cyclonedds_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + // Nothing to do here for now. + // This is just the middleware's notification that shutdown was called. + return RMW_RET_OK; +} + +extern "C" rmw_ret_t rmw_context_fini(rmw_context_t * context) +{ + RCUTILS_CHECK_ARGUMENT_FOR_NULL(context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + context, + context->implementation_identifier, + eclipse_cyclonedds_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + delete context->impl; + *context = rmw_get_zero_initialized_context(); + return RMW_RET_OK; +} + +///////////////////////////////////////////////////////////////////////////////////////// +/////////// /////////// +/////////// NODES /////////// +/////////// /////////// +///////////////////////////////////////////////////////////////////////////////////////// + +extern "C" rmw_ret_t rmw_node_assert_liveliness(const rmw_node_t * node) +{ + RET_WRONG_IMPLID(node); + if (dds_assert_liveliness(node->context->impl->ppant) != DDS_RETCODE_OK) { + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + +extern "C" rmw_node_t * rmw_create_node( + rmw_context_t * context, const char * name, + const char * namespace_, size_t domain_id, + bool localhost_only) +{ + static_cast(domain_id); + static_cast(localhost_only); + RET_NULL_X(name, return nullptr); + RET_NULL_X(namespace_, return nullptr); + rmw_ret_t ret; + int dummy_validation_result; + size_t dummy_invalid_index; + if ((ret = + rmw_validate_node_name(name, &dummy_validation_result, &dummy_invalid_index)) != RMW_RET_OK) { - std::lock_guard lock(gcdds.nodes_lock); - gcdds.nodes.insert(node_impl); + return nullptr; } -#endif + + auto * node_impl = new CddsNode(); + rmw_node_t * node_handle = nullptr; + RET_ALLOC_X(node_impl, goto fail_node_impl); node_handle = rmw_node_allocate(); RET_ALLOC_X(node_handle, goto fail_node_handle); node_handle->implementation_identifier = eclipse_cyclonedds_identifier; node_handle->data = node_impl; + node_handle->context = context; node_handle->name = static_cast(rmw_allocate(sizeof(char) * strlen(name) + 1)); RET_ALLOC_X(node_handle->name, goto fail_node_handle_name); @@ -1007,31 +1106,39 @@ extern "C" rmw_node_t * rmw_create_node( static_cast(rmw_allocate(sizeof(char) * strlen(namespace_) + 1)); RET_ALLOC_X(node_handle->namespace_, goto fail_node_handle_namespace); memcpy(const_cast(node_handle->namespace_), namespace_, strlen(namespace_) + 1); + + { + // Though graph_cache methods are thread safe, both cache update and publishing have to also + // be atomic. + // If not, the following race condition is possible: + // node1-update-get-message / node2-update-get-message / node2-publish / node1-publish + // In that case, the last message published is not accurate. + auto common = &context->impl->common; + std::lock_guard guard(common->node_update_mutex); + rmw_dds_common::msg::ParticipantEntitiesInfo participant_msg = + common->graph_cache.add_node(common->gid, name, namespace_); + if (RMW_RET_OK != rmw_publish( + common->pub, + static_cast(&participant_msg), + nullptr)) + { + // If publishing the message failed, we don't have to publish an update + // after removing it from the graph cache */ + static_cast(common->graph_cache.remove_node(common->gid, name, namespace_)); + goto fail_pub_info; + } + } return node_handle; +fail_pub_info: + rmw_free(const_cast(node_handle->namespace_)); fail_node_handle_namespace: rmw_free(const_cast(node_handle->name)); fail_node_handle_name: rmw_node_free(node_handle); fail_node_handle: -#if !MULTIDOMAIN - { - std::lock_guard lock(gcdds.nodes_lock); - gcdds.nodes.erase(node_impl); - } -#else - builtin_readers_fini(node_impl->brd); -fail_builtin_reader: -#endif - if (RMW_RET_OK != rmw_destroy_guard_condition(graph_guard_condition)) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_cyclonedds_cpp", "failed to destroy guard condition during error handling"); - } -fail_ggc: delete node_impl; fail_node_impl: - dds_delete(pp); - node_gone_from_domain_locked(did); return nullptr; } @@ -1041,33 +1148,24 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node) RET_WRONG_IMPLID(node); auto node_impl = static_cast(node->data); RET_NULL(node_impl); - rmw_free(const_cast(node->name)); - rmw_free(const_cast(node->namespace_)); - rmw_node_free(node); -#if MULTIDOMAIN - /* Must prevent the built-in topic listener from triggering before deleting the graph guard - condition. Deleting them first is the easiest. */ - builtin_readers_fini(node_impl->brd); -#else { - std::lock_guard lock(gcdds.nodes_lock); - gcdds.nodes.erase(node_impl); - } -#endif - if (RMW_RET_OK != rmw_destroy_guard_condition(node_impl->graph_guard_condition)) { - RMW_SET_ERROR_MSG("failed to destroy graph guard condition"); - result_ret = RMW_RET_ERROR; + // Though graph_cache methods are thread safe, both cache update and publishing have to also + // be atomic. + // If not, the following race condition is possible: + // node1-update-get-message / node2-update-get-message / node2-publish / node1-publish + // In that case, the last message published is not accurate. + auto common = &node->context->impl->common; + std::lock_guard guard(common->node_update_mutex); + rmw_dds_common::msg::ParticipantEntitiesInfo participant_msg = + common->graph_cache.add_node(common->gid, node->name, node->namespace_); + result_ret = rmw_publish( + common->pub, static_cast(&participant_msg), nullptr); } -#if SUPPORT_LOCALHOST - /* prevent race with rmw_create_node (see there) */ - std::lock_guard lock(gcdds.domains_lock); -#endif - if (dds_delete(node_impl->enth) < 0) { - RMW_SET_ERROR_MSG("failed to destroy DDS participant"); - result_ret = RMW_RET_ERROR; - } - node_gone_from_domain_locked(node_impl->domain_id); + + rmw_free(const_cast(node->name)); + rmw_free(const_cast(node->namespace_)); + rmw_node_free(node); delete node_impl; return result_ret; } @@ -1077,7 +1175,7 @@ extern "C" const rmw_guard_condition_t * rmw_node_get_graph_guard_condition(cons RET_WRONG_IMPLID_X(node, return nullptr); auto node_impl = static_cast(node->data); RET_NULL_X(node_impl, return nullptr); - return node_impl->graph_guard_condition; + return node->context->impl->common.graph_guard_condition; } ///////////////////////////////////////////////////////////////////////////////////////// @@ -1197,11 +1295,7 @@ static dds_entity_t create_topic( struct ddsi_sertopic ** stact) { dds_entity_t tp; -#ifdef DDS_HAS_CREATE_TOPIC_GENERIC tp = dds_create_topic_generic(pp, &sertopic, nullptr, nullptr, nullptr); -#else - tp = dds_create_topic_arbitrary(pp, sertopic, nullptr, nullptr, nullptr); -#endif if (tp < 0) { ddsi_sertopic_unref(sertopic); } else { @@ -1215,11 +1309,6 @@ static dds_entity_t create_topic( static dds_entity_t create_topic(dds_entity_t pp, struct ddsi_sertopic * sertopic) { dds_entity_t tp = create_topic(pp, sertopic, nullptr); -#ifndef DDS_HAS_CREATE_TOPIC_GENERIC - if (tp > 0) { - ddsi_sertopic_unref(sertopic); - } -#endif return tp; } @@ -1409,7 +1498,6 @@ static dds_qos_t * create_readwrite_qos( return qos; } -#if RMW_VERSION_GTE(0, 8, 2) static rmw_qos_policy_kind_t dds_qos_policy_to_rmw_qos_policy(dds_qos_policy_id_t policy_id) { switch (policy_id) { @@ -1429,7 +1517,6 @@ static rmw_qos_policy_kind_t dds_qos_policy_to_rmw_qos_policy(dds_qos_policy_id_ return RMW_QOS_POLICY_INVALID; } } -#endif static bool dds_qos_to_rmw_qos(const dds_qos_t * dds_qos, rmw_qos_profile_t * qos_policies) { @@ -1557,16 +1644,13 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * rmw_qos_p } static CddsPublisher * create_cdds_publisher( - const rmw_node_t * node, + dds_entity_t dds_ppant, dds_entity_t dds_pub, const rosidl_message_type_support_t * type_supports, const char * topic_name, const rmw_qos_profile_t * qos_policies) { - RET_WRONG_IMPLID_X(node, return nullptr); RET_NULL_OR_EMPTYSTR_X(topic_name, return nullptr); RET_NULL_X(qos_policies, return nullptr); - auto node_impl = static_cast(node->data); - RET_NULL_X(node_impl, return nullptr); const rosidl_message_type_support_t * type_support = get_typesupport(type_supports); RET_NULL_X(type_support, return nullptr); CddsPublisher * pub = new CddsPublisher(); @@ -1580,7 +1664,7 @@ static CddsPublisher * create_cdds_publisher( create_message_type_support(type_support->data, type_support->typesupport_identifier), false, rmw_cyclonedds_cpp::make_message_value_type(type_supports)); struct ddsi_sertopic * stact; - topic = create_topic(node_impl->enth, sertopic, &stact); + topic = create_topic(dds_ppant, sertopic, &stact); if (topic < 0) { RMW_SET_ERROR_MSG("failed to create topic"); goto fail_topic; @@ -1588,7 +1672,7 @@ static CddsPublisher * create_cdds_publisher( if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) { goto fail_qos; } - if ((pub->enth = dds_create_writer(node_impl->pub, topic, qos, nullptr)) < 0) { + if ((pub->enth = dds_create_writer(dds_pub, topic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create writer"); goto fail_writer; } @@ -1596,6 +1680,7 @@ static CddsPublisher * create_cdds_publisher( RMW_SET_ERROR_MSG("failed to get instance handle for writer"); goto fail_instance_handle; } + get_entity_gid(pub->enth, pub->gid); pub->sertopic = stact; dds_delete_qos(qos); dds_delete(topic); @@ -1609,9 +1694,7 @@ static CddsPublisher * create_cdds_publisher( dds_delete_qos(qos); fail_qos: dds_delete(topic); -#ifndef DDS_HAS_CREATE_TOPIC_GENERIC ddsi_sertopic_unref(stact); -#endif fail_topic: delete pub; return nullptr; @@ -1635,20 +1718,20 @@ extern "C" rmw_ret_t rmw_fini_publisher_allocation(rmw_publisher_allocation_t * return RMW_RET_ERROR; } -extern "C" rmw_publisher_t * rmw_create_publisher( - const rmw_node_t * node, const rosidl_message_type_support_t * type_supports, - const char * topic_name, const rmw_qos_profile_t * qos_policies -#if RMW_VERSION_GTE(0, 8, 1) - , const rmw_publisher_options_t * publisher_options -#endif +static rmw_publisher_t * create_publisher( + dds_entity_t dds_ppant, dds_entity_t dds_pub, + const rosidl_message_type_support_t * type_supports, + const char * topic_name, const rmw_qos_profile_t * qos_policies, + const rmw_publisher_options_t * publisher_options ) { CddsPublisher * pub; rmw_publisher_t * rmw_publisher; -#if RMW_VERSION_GTE(0, 8, 1) - RET_NULL_X(publisher_options, return nullptr); -#endif - if ((pub = create_cdds_publisher(node, type_supports, topic_name, qos_policies)) == nullptr) { + if ((pub = + create_cdds_publisher( + dds_ppant, dds_pub, type_supports, topic_name, + qos_policies)) == nullptr) + { goto fail_common_init; } rmw_publisher = rmw_publisher_allocate(); @@ -1658,10 +1741,8 @@ extern "C" rmw_publisher_t * rmw_create_publisher( rmw_publisher->topic_name = reinterpret_cast(rmw_allocate(strlen(topic_name) + 1)); RET_ALLOC_X(rmw_publisher->topic_name, goto fail_topic_name); memcpy(const_cast(rmw_publisher->topic_name), topic_name, strlen(topic_name) + 1); -#if RMW_VERSION_GTE(0, 8, 1) rmw_publisher->options = *publisher_options; rmw_publisher->can_loan_messages = false; -#endif return rmw_publisher; fail_topic_name: rmw_publisher_free(rmw_publisher); @@ -1674,6 +1755,38 @@ extern "C" rmw_publisher_t * rmw_create_publisher( return nullptr; } +extern "C" rmw_publisher_t * rmw_create_publisher( + const rmw_node_t * node, const rosidl_message_type_support_t * type_supports, + const char * topic_name, const rmw_qos_profile_t * qos_policies, + const rmw_publisher_options_t * publisher_options +) +{ + RET_WRONG_IMPLID_X(node, return nullptr); + rmw_publisher_t * pub = create_publisher( + node->context->impl->ppant, node->context->impl->dds_pub, + type_supports, topic_name, qos_policies, + publisher_options); + if (pub != nullptr) { + // Update graph + auto common = &node->context->impl->common; + const auto cddspub = static_cast(pub->data); + std::lock_guard guard(common->node_update_mutex); + rmw_dds_common::msg::ParticipantEntitiesInfo msg = + common->graph_cache.associate_writer(cddspub->gid, common->gid, node->name, node->namespace_); + if (RMW_RET_OK != rmw_publish( + common->pub, + static_cast(&msg), + nullptr)) + { + static_cast(common->graph_cache.dissociate_writer( + cddspub->gid, common->gid, node->name, node->namespace_)); + static_cast(destroy_publisher(pub)); + return nullptr; + } + } + return pub; +} + extern "C" rmw_ret_t rmw_get_gid_for_publisher(const rmw_publisher_t * publisher, rmw_gid_t * gid) { RET_WRONG_IMPLID(publisher); @@ -1718,6 +1831,10 @@ extern "C" rmw_ret_t rmw_publisher_count_matched_subscriptions( rmw_ret_t rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher) { RET_WRONG_IMPLID(publisher); + auto pub = static_cast(publisher->data); + if (dds_assert_liveliness(pub->enth) < 0) { + return RMW_RET_ERROR; + } return RMW_RET_OK; } @@ -1741,7 +1858,6 @@ extern "C" rmw_ret_t rmw_borrow_loaned_message( (void) publisher; (void) type_support; (void) ros_message; - RMW_SET_ERROR_MSG("rmw_borrow_loaned_message not implemented for rmw_cyclonedds_cpp"); return RMW_RET_UNSUPPORTED; } @@ -1752,24 +1868,19 @@ extern "C" rmw_ret_t rmw_return_loaned_message_from_publisher( { (void) publisher; (void) loaned_message; - RMW_SET_ERROR_MSG( "rmw_return_loaned_message_from_publisher not implemented for rmw_cyclonedds_cpp"); return RMW_RET_UNSUPPORTED; } -extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) +static rmw_ret_t destroy_publisher(rmw_publisher_t * publisher) { - RET_WRONG_IMPLID(node); RET_WRONG_IMPLID(publisher); auto pub = static_cast(publisher->data); if (pub != nullptr) { if (dds_delete(pub->enth) < 0) { RMW_SET_ERROR_MSG("failed to delete writer"); } -#ifndef DDS_HAS_CREATE_TOPIC_GENERIC - ddsi_sertopic_unref(pub->sertopic); -#endif delete pub; } rmw_free(const_cast(publisher->topic_name)); @@ -1778,22 +1889,39 @@ extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * return RMW_RET_OK; } -///////////////////////////////////////////////////////////////////////////////////////// -/////////// /////////// -/////////// SUBSCRIPTIONS /////////// -/////////// /////////// -///////////////////////////////////////////////////////////////////////////////////////// - +extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) +{ + RET_WRONG_IMPLID(node); + { + auto common = &node->context->impl->common; + const auto cddspub = static_cast(publisher->data); + std::lock_guard guard(common->node_update_mutex); + rmw_dds_common::msg::ParticipantEntitiesInfo msg = + common->graph_cache.dissociate_writer( + cddspub->gid, common->gid, node->name, + node->namespace_); + if (RMW_RET_OK != rmw_publish(common->pub, static_cast(&msg), nullptr)) { + RMW_SET_ERROR_MSG( + "failed to publish ParticipantEntitiesInfo message after dissociating writer"); + } + } + return destroy_publisher(publisher); +} + + +///////////////////////////////////////////////////////////////////////////////////////// +/////////// /////////// +/////////// SUBSCRIPTIONS /////////// +/////////// /////////// +///////////////////////////////////////////////////////////////////////////////////////// + static CddsSubscription * create_cdds_subscription( - const rmw_node_t * node, + dds_entity_t dds_ppant, dds_entity_t dds_sub, const rosidl_message_type_support_t * type_supports, const char * topic_name, const rmw_qos_profile_t * qos_policies, bool ignore_local_publications) { - RET_WRONG_IMPLID_X(node, return nullptr); RET_NULL_OR_EMPTYSTR_X(topic_name, return nullptr); RET_NULL_X(qos_policies, return nullptr); - auto node_impl = static_cast(node->data); - RET_NULL_X(node_impl, return nullptr); const rosidl_message_type_support_t * type_support = get_typesupport(type_supports); RET_NULL_X(type_support, return nullptr); CddsSubscription * sub = new CddsSubscription(); @@ -1806,7 +1934,7 @@ static CddsSubscription * create_cdds_subscription( fqtopic_name.c_str(), type_support->typesupport_identifier, create_message_type_support(type_support->data, type_support->typesupport_identifier), false, rmw_cyclonedds_cpp::make_message_value_type(type_supports)); - topic = create_topic(node_impl->enth, sertopic); + topic = create_topic(dds_ppant, sertopic); if (topic < 0) { RMW_SET_ERROR_MSG("failed to create topic"); goto fail_topic; @@ -1814,10 +1942,11 @@ static CddsSubscription * create_cdds_subscription( if ((qos = create_readwrite_qos(qos_policies, ignore_local_publications)) == nullptr) { goto fail_qos; } - if ((sub->enth = dds_create_reader(node_impl->sub, topic, qos, nullptr)) < 0) { + if ((sub->enth = dds_create_reader(dds_sub, topic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create reader"); goto fail_reader; } + get_entity_gid(sub->enth, sub->gid); if ((sub->rdcondh = dds_create_readcondition(sub->enth, DDS_ANY_STATE)) < 0) { RMW_SET_ERROR_MSG("failed to create readcondition"); goto fail_readcond; @@ -1856,30 +1985,18 @@ extern "C" rmw_ret_t rmw_fini_subscription_allocation(rmw_subscription_allocatio return RMW_RET_ERROR; } -extern "C" rmw_subscription_t * rmw_create_subscription( - const rmw_node_t * node, const rosidl_message_type_support_t * type_supports, - const char * topic_name, const rmw_qos_profile_t * qos_policies -#if RMW_VERSION_GTE(0, 8, 1) - , const rmw_subscription_options_t * subscription_options -#else - , bool ignore_local_publications -#endif -) +static rmw_subscription_t * create_subscription( + dds_entity_t dds_ppant, dds_entity_t dds_sub, + const rosidl_message_type_support_t * type_supports, + const char * topic_name, const rmw_qos_profile_t * qos_policies, + const rmw_subscription_options_t * subscription_options) { CddsSubscription * sub; rmw_subscription_t * rmw_subscription; -#if RMW_VERSION_GTE(0, 8, 1) - RET_NULL_X(subscription_options, return nullptr); -#endif if ( (sub = create_cdds_subscription( - node, type_supports, topic_name, qos_policies, -#if RMW_VERSION_GTE(0, 8, 1) - subscription_options->ignore_local_publications -#else - ignore_local_publications -#endif - )) == nullptr) + dds_ppant, dds_sub, type_supports, topic_name, qos_policies, + subscription_options->ignore_local_publications)) == nullptr) { goto fail_common_init; } @@ -1891,10 +2008,8 @@ extern "C" rmw_subscription_t * rmw_create_subscription( reinterpret_cast(rmw_allocate(strlen(topic_name) + 1)); RET_ALLOC_X(rmw_subscription->topic_name, goto fail_topic_name); memcpy(const_cast(rmw_subscription->topic_name), topic_name, strlen(topic_name) + 1); -#if RMW_VERSION_GTE(0, 8, 1) rmw_subscription->options = *subscription_options; rmw_subscription->can_loan_messages = false; -#endif return rmw_subscription; fail_topic_name: rmw_subscription_free(rmw_subscription); @@ -1911,6 +2026,37 @@ extern "C" rmw_subscription_t * rmw_create_subscription( return nullptr; } +extern "C" rmw_subscription_t * rmw_create_subscription( + const rmw_node_t * node, const rosidl_message_type_support_t * type_supports, + const char * topic_name, const rmw_qos_profile_t * qos_policies, + const rmw_subscription_options_t * subscription_options) +{ + RET_WRONG_IMPLID_X(node, return nullptr); + rmw_subscription_t * sub = create_subscription( + node->context->impl->ppant, node->context->impl->dds_sub, + type_supports, topic_name, qos_policies, + subscription_options); + if (sub != nullptr) { + // Update graph + auto common = &node->context->impl->common; + const auto cddssub = static_cast(sub->data); + std::lock_guard guard(common->node_update_mutex); + rmw_dds_common::msg::ParticipantEntitiesInfo msg = + common->graph_cache.associate_reader(cddssub->gid, common->gid, node->name, node->namespace_); + if (RMW_RET_OK != rmw_publish( + common->pub, + static_cast(&msg), + nullptr)) + { + static_cast(common->graph_cache.dissociate_reader( + cddssub->gid, common->gid, node->name, node->namespace_)); + static_cast(destroy_subscription(sub)); + return nullptr; + } + } + return sub; +} + extern "C" rmw_ret_t rmw_subscription_count_matched_publishers( const rmw_subscription_t * subscription, size_t * publisher_count) { @@ -1939,9 +2085,8 @@ extern "C" rmw_ret_t rmw_subscription_get_actual_qos( } } -extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) +static rmw_ret_t destroy_subscription(rmw_subscription_t * subscription) { - RET_WRONG_IMPLID(node); RET_WRONG_IMPLID(subscription); auto sub = static_cast(subscription->data); if (sub != nullptr) { @@ -1960,6 +2105,25 @@ extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscriptio return RMW_RET_OK; } +extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) +{ + RET_WRONG_IMPLID(node); + { + auto common = &node->context->impl->common; + const auto cddssub = static_cast(subscription->data); + std::lock_guard guard(common->node_update_mutex); + rmw_dds_common::msg::ParticipantEntitiesInfo msg = + common->graph_cache.dissociate_writer( + cddssub->gid, common->gid, node->name, + node->namespace_); + if (RMW_RET_OK != rmw_publish(common->pub, static_cast(&msg), nullptr)) { + RMW_SET_ERROR_MSG( + "failed to publish ParticipantEntitiesInfo message after dissociating reader"); + } + } + return destroy_subscription(subscription); +} + static rmw_ret_t rmw_take_int( const rmw_subscription_t * subscription, void * ros_message, bool * taken, rmw_message_info_t * message_info) @@ -2082,7 +2246,6 @@ extern "C" rmw_ret_t rmw_take_loaned_message( (void) loaned_message; (void) taken; (void) allocation; - RMW_SET_ERROR_MSG("rmw_take_loaned_message not implemented for rmw_cyclonedds_cpp"); return RMW_RET_UNSUPPORTED; } @@ -2099,7 +2262,6 @@ extern "C" rmw_ret_t rmw_take_loaned_message_with_info( (void) taken; (void) message_info; (void) allocation; - RMW_SET_ERROR_MSG("rmw_take_loaned_message_with_info not implemented for rmw_cyclonedds_cpp"); return RMW_RET_UNSUPPORTED; } @@ -2110,7 +2272,6 @@ extern "C" rmw_ret_t rmw_return_loaned_message_from_subscription( { (void) subscription; (void) loaned_message; - RMW_SET_ERROR_MSG( "rmw_return_loaned_message_from_subscription not implemented for rmw_cyclonedds_cpp"); return RMW_RET_UNSUPPORTED; @@ -2128,10 +2289,8 @@ static const std::unordered_map mask_map{ {RMW_EVENT_REQUESTED_DEADLINE_MISSED, DDS_REQUESTED_DEADLINE_MISSED_STATUS}, {RMW_EVENT_LIVELINESS_LOST, DDS_LIVELINESS_LOST_STATUS}, {RMW_EVENT_OFFERED_DEADLINE_MISSED, DDS_OFFERED_DEADLINE_MISSED_STATUS}, -#if RMW_VERSION_GTE(0, 8, 2) {RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS}, {RMW_EVENT_OFFERED_QOS_INCOMPATIBLE, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS}, -#endif }; static bool is_event_supported(const rmw_event_type_t event_t) @@ -2155,11 +2314,9 @@ static rmw_ret_t init_rmw_event( RMW_SET_ERROR_MSG("provided event_type is not supported by rmw_cyclonedds_cpp"); return RMW_RET_UNSUPPORTED; } - rmw_event->implementation_identifier = topic_endpoint_impl_identifier; rmw_event->data = data; rmw_event->event_type = event_type; - return RMW_RET_OK; } @@ -2225,7 +2382,6 @@ extern "C" rmw_ret_t rmw_take_event( } } -#if RMW_VERSION_GTE(0, 8, 2) case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE: { auto ei = static_cast(event_info); auto sub = static_cast(event_handle->data); @@ -2242,7 +2398,6 @@ extern "C" rmw_ret_t rmw_take_event( return RMW_RET_OK; } } -#endif case RMW_EVENT_LIVELINESS_LOST: { auto ei = static_cast(event_info); @@ -2274,7 +2429,6 @@ extern "C" rmw_ret_t rmw_take_event( } } -#if RMW_VERSION_GTE(0, 8, 2) case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE: { auto ei = static_cast(event_info); auto pub = static_cast(event_handle->data); @@ -2291,7 +2445,6 @@ extern "C" rmw_ret_t rmw_take_event( return RMW_RET_OK; } } -#endif case RMW_EVENT_INVALID: { break; @@ -2310,20 +2463,11 @@ extern "C" rmw_ret_t rmw_take_event( /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -extern "C" rmw_guard_condition_t * rmw_create_guard_condition(rmw_context_t * context) +static rmw_guard_condition_t * create_guard_condition(rmw_context_impl_t * impl) { - static_cast(context); rmw_guard_condition_t * guard_condition_handle; auto * gcond_impl = new CddsGuardCondition(); -#if MULTIDOMAIN - const dds_entity_t owner = DDS_CYCLONEDDS_HANDLE; -#else - const dds_entity_t owner = ref_ppant(); - if (owner < 0) { - goto fail_ppant; - } -#endif - if ((gcond_impl->gcondh = dds_create_guardcondition(owner)) < 0) { + if ((gcond_impl->gcondh = dds_create_guardcondition(impl->ppant)) < 0) { RMW_SET_ERROR_MSG("failed to create guardcondition"); goto fail_guardcond; } @@ -2333,15 +2477,16 @@ extern "C" rmw_guard_condition_t * rmw_create_guard_condition(rmw_context_t * co return guard_condition_handle; fail_guardcond: -#if !MULTIDOMAIN - unref_ppant(); -fail_ppant: -#endif delete (gcond_impl); return nullptr; } -extern "C" rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t * guard_condition_handle) +extern "C" rmw_guard_condition_t * rmw_create_guard_condition(rmw_context_t * context) +{ + return create_guard_condition(context->impl); +} + +static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * guard_condition_handle) { RET_NULL(guard_condition_handle); auto * gcond_impl = static_cast(guard_condition_handle->data); @@ -2352,6 +2497,11 @@ extern "C" rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t * guard_c return RMW_RET_OK; } +extern "C" rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t * guard_condition_handle) +{ + return destroy_guard_condition(guard_condition_handle); +} + extern "C" rmw_ret_t rmw_trigger_guard_condition( const rmw_guard_condition_t * guard_condition_handle) { @@ -2363,11 +2513,9 @@ extern "C" rmw_ret_t rmw_trigger_guard_condition( extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t max_conditions) { - static_cast(context); (void) max_conditions; rmw_wait_set_t * wait_set = rmw_wait_set_allocate(); CddsWaitset * ws = nullptr; - dds_entity_t owner = 0; RET_ALLOC_X(wait_set, goto fail_alloc_wait_set); wait_set->implementation_identifier = eclipse_cyclonedds_identifier; wait_set->data = rmw_allocate(sizeof(CddsWaitset)); @@ -2382,31 +2530,21 @@ extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t } ws->inuse = false; ws->nelems = 0; -#if MULTIDOMAIN - owner = DDS_CYCLONEDDS_HANDLE; -#else - owner = ref_ppant(); - if (owner < 0) { - goto fail_ppant; - } -#endif - if ((ws->waitseth = dds_create_waitset(owner)) < 0) { + if ((ws->waitseth = dds_create_waitset(DDS_CYCLONEDDS_HANDLE)) < 0) { RMW_SET_ERROR_MSG("failed to create waitset"); goto fail_waitset; } { std::lock_guard lock(gcdds.lock); -#if MULTIDOMAIN // Lazily create dummy guard condition if (gcdds.waitsets.size() == 0) { - if ((gcdds.gc_for_empty_waitset = dds_create_guardcondition(owner)) < 0) { + if ((gcdds.gc_for_empty_waitset = dds_create_guardcondition(context->impl->ppant)) < 0) { RMW_SET_ERROR_MSG("failed to create guardcondition for handling empty waitsets"); goto fail_create_dummy; } } -#endif // Attach never-triggered guard condition. As it will never be triggered, it will never be // included in the result of dds_waitset_wait if (dds_waitset_attach(ws->waitseth, gcdds.gc_for_empty_waitset, INTPTR_MAX) < 0) { @@ -2419,15 +2557,9 @@ extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t return wait_set; fail_attach_dummy: -#if MULTIDOMAIN fail_create_dummy: -#endif dds_delete(ws->waitseth); fail_waitset: -#if !MULTIDOMAIN - unref_ppant(); -fail_ppant: -#endif fail_ws: RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(ws->~CddsWaitset(), ws); fail_placement_new: @@ -2448,12 +2580,10 @@ extern "C" rmw_ret_t rmw_destroy_wait_set(rmw_wait_set_t * wait_set) { std::lock_guard lock(gcdds.lock); gcdds.waitsets.erase(ws); -#if MULTIDOMAIN if (gcdds.waitsets.size() == 0) { dds_delete(gcdds.gc_for_empty_waitset); gcdds.gc_for_empty_waitset = 0; } -#endif } RMW_TRY_DESTRUCTOR(ws->~CddsWaitset(), ws, result = RMW_RET_ERROR); rmw_free(wait_set->data); @@ -2525,7 +2655,7 @@ static void clean_waitset_caches() used ... */ std::lock_guard lock(gcdds.lock); for (auto && ws : gcdds.waitsets) { - std::lock_guard lock(ws->lock); + std::lock_guard wslock(ws->lock); if (!ws->inuse) { waitset_detach(ws); } @@ -2890,8 +3020,6 @@ static rmw_ret_t rmw_init_cs( RET_WRONG_IMPLID(node); RET_NULL_OR_EMPTYSTR(service_name); RET_NULL(qos_policies); - auto node_impl = static_cast(node->data); - RET_NULL(node_impl); const rosidl_service_type_support_t * type_support = get_service_typesupport(type_supports); RET_NULL(type_support); @@ -2940,7 +3068,7 @@ static rmw_ret_t rmw_init_cs( pubtopic_name.c_str(), type_support->typesupport_identifier, pub_type_support, true, std::move(pub_msg_ts)); struct ddsi_sertopic * pub_stact; - pubtopic = create_topic(node_impl->enth, pub_st, &pub_stact); + pubtopic = create_topic(node->context->impl->ppant, pub_st, &pub_stact); if (pubtopic < 0) { RMW_SET_ERROR_MSG("failed to create topic"); goto fail_pubtopic; @@ -2949,7 +3077,7 @@ static rmw_ret_t rmw_init_cs( sub_st = create_sertopic( subtopic_name.c_str(), type_support->typesupport_identifier, sub_type_support, true, std::move(sub_msg_ts)); - subtopic = create_topic(node_impl->enth, sub_st); + subtopic = create_topic(node->context->impl->ppant, sub_st); if (subtopic < 0) { RMW_SET_ERROR_MSG("failed to create topic"); goto fail_subtopic; @@ -2960,15 +3088,17 @@ static rmw_ret_t rmw_init_cs( } dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1)); dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); - if ((pub->enth = dds_create_writer(node_impl->pub, pubtopic, qos, nullptr)) < 0) { + if ((pub->enth = dds_create_writer(node->context->impl->dds_pub, pubtopic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create writer"); goto fail_writer; } + get_entity_gid(pub->enth, pub->gid); pub->sertopic = pub_stact; - if ((sub->enth = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) { + if ((sub->enth = dds_create_reader(node->context->impl->dds_sub, subtopic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create reader"); goto fail_reader; } + get_entity_gid(sub->enth, sub->gid); if ((sub->rdcondh = dds_create_readcondition(sub->enth, DDS_ANY_STATE)) < 0) { RMW_SET_ERROR_MSG("failed to create readcondition"); goto fail_readcond; @@ -2997,23 +3127,50 @@ static rmw_ret_t rmw_init_cs( dds_delete(subtopic); fail_subtopic: dds_delete(pubtopic); -#ifndef DDS_HAS_CREATE_TOPIC_GENERIC - ddsi_sertopic_unref(pub_stact); -#endif fail_pubtopic: return RMW_RET_ERROR; } static void rmw_fini_cs(CddsCS * cs) { -#ifndef DDS_HAS_CREATE_TOPIC_GENERIC - ddsi_sertopic_unref(cs->pub->sertopic); -#endif dds_delete(cs->sub->rdcondh); dds_delete(cs->sub->enth); dds_delete(cs->pub->enth); } +static rmw_ret_t destroy_client(const rmw_node_t * node, rmw_client_t * client) +{ + RET_WRONG_IMPLID(node); + RET_WRONG_IMPLID(client); + auto info = static_cast(client->data); + clean_waitset_caches(); + + { + // Update graph + auto common = &node->context->impl->common; + std::lock_guard guard(common->node_update_mutex); + static_cast(common->graph_cache.dissociate_writer( + info->client.pub->gid, common->gid, + node->name, node->namespace_)); + rmw_dds_common::msg::ParticipantEntitiesInfo msg = + common->graph_cache.dissociate_reader( + info->client.sub->gid, common->gid, node->name, + node->namespace_); + if (RMW_RET_OK != rmw_publish( + common->pub, + static_cast(&msg), + nullptr)) + { + RMW_SET_ERROR_MSG("failed to publish ParticipantEntitiesInfo when destroying service"); + } + } + + rmw_fini_cs(&info->client); + rmw_free(const_cast(client->service_name)); + rmw_client_free(client); + return RMW_RET_OK; +} + extern "C" rmw_client_t * rmw_create_client( const rmw_node_t * node, const rosidl_service_type_support_t * type_supports, @@ -3038,6 +3195,28 @@ extern "C" rmw_client_t * rmw_create_client( rmw_client->service_name = reinterpret_cast(rmw_allocate(strlen(service_name) + 1)); RET_NULL_X(rmw_client->service_name, goto fail_service_name); memcpy(const_cast(rmw_client->service_name), service_name, strlen(service_name) + 1); + + { + // Update graph + auto common = &node->context->impl->common; + std::lock_guard guard(common->node_update_mutex); + static_cast(common->graph_cache.associate_writer( + info->client.pub->gid, common->gid, + node->name, node->namespace_)); + rmw_dds_common::msg::ParticipantEntitiesInfo msg = + common->graph_cache.associate_reader( + info->client.sub->gid, common->gid, node->name, + node->namespace_); + if (RMW_RET_OK != rmw_publish( + common->pub, + static_cast(&msg), + nullptr)) + { + static_cast(destroy_client(node, rmw_client)); + return nullptr; + } + } + return rmw_client; fail_service_name: rmw_client_free(rmw_client); @@ -3047,14 +3226,40 @@ extern "C" rmw_client_t * rmw_create_client( } extern "C" rmw_ret_t rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) +{ + return destroy_client(node, client); +} + +static rmw_ret_t destroy_service(const rmw_node_t * node, rmw_service_t * service) { RET_WRONG_IMPLID(node); - RET_WRONG_IMPLID(client); - auto info = static_cast(client->data); + RET_WRONG_IMPLID(service); + auto info = static_cast(service->data); clean_waitset_caches(); - rmw_fini_cs(&info->client); - rmw_free(const_cast(client->service_name)); - rmw_client_free(client); + + { + // Update graph + auto common = &node->context->impl->common; + std::lock_guard guard(common->node_update_mutex); + static_cast(common->graph_cache.dissociate_writer( + info->service.pub->gid, common->gid, + node->name, node->namespace_)); + rmw_dds_common::msg::ParticipantEntitiesInfo msg = + common->graph_cache.dissociate_reader( + info->service.sub->gid, common->gid, node->name, + node->namespace_); + if (RMW_RET_OK != rmw_publish( + common->pub, + static_cast(&msg), + nullptr)) + { + RMW_SET_ERROR_MSG("failed to publish ParticipantEntitiesInfo when destroying service"); + } + } + + rmw_fini_cs(&info->service); + rmw_free(const_cast(service->service_name)); + rmw_service_free(service); return RMW_RET_OK; } @@ -3080,6 +3285,28 @@ extern "C" rmw_service_t * rmw_create_service( reinterpret_cast(rmw_allocate(strlen(service_name) + 1)); RET_NULL_X(rmw_service->service_name, goto fail_service_name); memcpy(const_cast(rmw_service->service_name), service_name, strlen(service_name) + 1); + + { + // Update graph + auto common = &node->context->impl->common; + std::lock_guard guard(common->node_update_mutex); + static_cast(common->graph_cache.associate_writer( + info->service.pub->gid, common->gid, + node->name, node->namespace_)); + rmw_dds_common::msg::ParticipantEntitiesInfo msg = + common->graph_cache.associate_reader( + info->service.sub->gid, common->gid, node->name, + node->namespace_); + if (RMW_RET_OK != rmw_publish( + common->pub, + static_cast(&msg), + nullptr)) + { + static_cast(destroy_service(node, rmw_service)); + return nullptr; + } + } + return rmw_service; fail_service_name: rmw_service_free(rmw_service); @@ -3090,14 +3317,7 @@ extern "C" rmw_service_t * rmw_create_service( extern "C" rmw_ret_t rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) { - RET_WRONG_IMPLID(node); - RET_WRONG_IMPLID(service); - auto info = static_cast(service->data); - clean_waitset_caches(); - rmw_fini_cs(&info->service); - rmw_free(const_cast(service->service_name)); - rmw_service_free(service); - return RMW_RET_OK; + return destroy_service(node, service); } ///////////////////////////////////////////////////////////////////////////////////////// @@ -3106,725 +3326,103 @@ extern "C" rmw_ret_t rmw_destroy_service(rmw_node_t * node, rmw_service_t * serv /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -enum topic_kind -{ - DEFAULT, - REQUEST_RESPONSE -}; - -static bool demangle_topic_name( - const char * topic_name, - topic_kind topic_kind, - std::string & demangled_topic_name, - bool * is_request) -{ - std::string re; - bool is_def_kind = topic_kind == topic_kind::DEFAULT; - if (is_def_kind) { - re = "^" + std::string(ROS_TOPIC_PREFIX) + "(/.*)"; - } else { - re = "^(" + std::string(ROS_SERVICE_REQUESTER_PREFIX) + "|" + - std::string(ROS_SERVICE_RESPONSE_PREFIX) + ")(/.*)(Request|Reply)$"; - } - const auto re_tp = std::regex(re, std::regex::extended); - std::cmatch cm_tp; - if (std::regex_search(topic_name, cm_tp, re_tp)) { - demangled_topic_name = std::string(cm_tp[is_def_kind ? 1 : 2]); - if (is_request) { - assert(!is_def_kind); - *is_request = (std::string(cm_tp[3]) == "Request"); - } - return true; - } - return false; -} - -static bool demangle_topic_name( - const char * topic_name, - topic_kind topic_kind, - std::string & demangled_topic_name) -{ - return demangle_topic_name(topic_name, topic_kind, demangled_topic_name, NULL); -} - -static bool demangle_topic_type( - const char * type_name, - topic_kind topic_kind, - std::string & demangled_type_name) -{ - bool is_def_kind = topic_kind == topic_kind::DEFAULT; - const auto re_typ = std::regex( - "^(.*::)dds_::(.*)_" + - (is_def_kind ? std::string() : std::string("(Response|Request)_")) + - std::string("$"), - std::regex::extended); - std::cmatch cm_typ; - if (std::regex_search(type_name, cm_typ, re_typ)) { - demangled_type_name = - std::regex_replace(std::string(cm_typ[1]), std::regex("::"), "/") + - std::string(cm_typ[2]); - return true; - } - return false; -} - -static rmw_ret_t do_for_node( - const CddsNode * node_impl, - std::function oper) -{ - dds_entity_t rd; - if ( - (rd = dds_create_reader( - node_impl->enth, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) - { - RMW_SET_ERROR_MSG("rmw_get_node_names: failed to create reader"); - return RMW_RET_ERROR; - } - dds_sample_info_t info; - void * msg = NULL; - int32_t n; - bool cont = true; - while (cont && (n = dds_take(rd, &msg, &info, 1, 1)) == 1) { - if (info.valid_data && info.instance_state == DDS_IST_ALIVE) { - auto sample = static_cast(msg); - cont = oper(*sample); - } - dds_return_loan(rd, &msg, n); - } - dds_delete(rd); - if (n < 0) { - RMW_SET_ERROR_MSG("rmw_get_node_names: error reading participants"); - return RMW_RET_ERROR; - } - return RMW_RET_OK; -} - -static rmw_ret_t do_for_node_user_data( - const CddsNode * node_impl, - std::function oper) -{ - auto f = [oper](const dds_builtintopic_participant_t & sample) -> bool { - void * ud; - size_t udsz; - if (dds_qget_userdata(sample.qos, &ud, &udsz) && ud != nullptr) { - /* CycloneDDS guarantees a null-terminated user data so we pretend it's a - string */ - bool ret = oper(sample, static_cast(ud)); - dds_free(ud); - return ret; - } else { - /* If no user data present treat it as an empty string */ - return oper(sample, ""); - } - }; - return do_for_node(node_impl, f); -} - -extern "C" rmw_ret_t rmw_get_node_names_impl( +extern "C" rmw_ret_t rmw_get_node_names( const rmw_node_t * node, rcutils_string_array_t * node_names, - rcutils_string_array_t * node_namespaces, - rcutils_string_array_t * enclaves) + rcutils_string_array_t * node_namespaces) { RET_WRONG_IMPLID(node); - auto node_impl = static_cast(node->data); if (RMW_RET_OK != rmw_check_zero_rmw_string_array(node_names) || RMW_RET_OK != rmw_check_zero_rmw_string_array(node_namespaces)) { return RMW_RET_ERROR; } - std::regex re { - "^name=([^;]*);namespace=([^;]*);(enclave=([^;]*);)?", - std::regex_constants::extended - }; - std::vector> ns; - auto oper = - [&ns, re](const dds_builtintopic_participant_t & sample, const char * ud) -> bool { - std::cmatch cm; - static_cast(sample); - if (std::regex_search(ud, cm, re)) { - ns.push_back(std::make_tuple(std::string(cm[1]), std::string(cm[2]), std::string(cm[4]))); - } - return true; - }; - rmw_ret_t ret; - if ((ret = do_for_node_user_data(node_impl, oper)) != RMW_RET_OK) { - return ret; - } - + auto common_context = &node->context->impl->common; rcutils_allocator_t allocator = rcutils_get_default_allocator(); - if (rcutils_string_array_init(node_names, ns.size(), &allocator) != RCUTILS_RET_OK || - rcutils_string_array_init(node_namespaces, ns.size(), &allocator) != RCUTILS_RET_OK) - { - RMW_SET_ERROR_MSG(rcutils_get_error_string().str); - goto fail_alloc; - } - if (enclaves && - rcutils_string_array_init(enclaves, ns.size(), &allocator) != RCUTILS_RET_OK) - { - RMW_SET_ERROR_MSG(rcutils_get_error_string().str); - goto fail_alloc; - } - { - size_t i = 0; - for (auto & n : ns) { - node_names->data[i] = rcutils_strdup(std::get<0>(n).c_str(), allocator); - node_namespaces->data[i] = rcutils_strdup(std::get<1>(n).c_str(), allocator); - if (!node_names->data[i] || !node_namespaces->data[i]) { - RMW_SET_ERROR_MSG("rmw_get_node_names for name/namespace"); - goto fail_alloc; - } - if (enclaves) { - enclaves->data[i] = rcutils_strdup(std::get<2>(n).c_str(), allocator); - if (!enclaves->data[i]) { - RMW_SET_ERROR_MSG("rmw_get_node_names for enclave"); - goto fail_alloc; - } - } - i++; - } - } - return RMW_RET_OK; - -fail_alloc: - if (node_names) { - if (rcutils_string_array_fini(node_names) != RCUTILS_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_cyclonedds_cpp", - "failed to cleanup during error handling: %s", rcutils_get_error_string().str); - rcutils_reset_error(); - } - } - if (node_namespaces) { - if (rcutils_string_array_fini(node_namespaces) != RCUTILS_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_cyclonedds_cpp", - "failed to cleanup during error handling: %s", rcutils_get_error_string().str); - rcutils_reset_error(); - } - } - if (enclaves) { - if (rcutils_string_array_fini(enclaves) != RCUTILS_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_cyclonedds_cpp", - "failed to cleanup during error handling: %s", rcutils_get_error_string().str); - rcutils_reset_error(); - } - } - return RMW_RET_BAD_ALLOC; -} - -extern "C" rmw_ret_t rmw_get_node_names( - const rmw_node_t * node, - rcutils_string_array_t * node_names, - rcutils_string_array_t * node_namespaces) -{ - return rmw_get_node_names_impl( - node, + return common_context->graph_cache.get_node_names( node_names, node_namespaces, - nullptr); + nullptr, + &allocator); } -#if RMW_VERSION_GTE(0, 8, 2) extern "C" rmw_ret_t rmw_get_node_names_with_enclaves( const rmw_node_t * node, rcutils_string_array_t * node_names, rcutils_string_array_t * node_namespaces, rcutils_string_array_t * enclaves) { - if (RMW_RET_OK != rmw_check_zero_rmw_string_array(enclaves)) { - return RMW_RET_ERROR; - } - return rmw_get_node_names_impl( - node, - node_names, - node_namespaces, - enclaves); -} -#endif - -static rmw_ret_t rmw_collect_data_for_endpoint( - CddsNode * node_impl, - dds_entity_t builtin_topic, - std::function filter_and_map) -{ - assert( - builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION || - builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION); - dds_entity_t rd; - if ((rd = dds_create_reader(node_impl->enth, builtin_topic, NULL, NULL)) < 0) { - RMW_SET_ERROR_MSG("rmw_collect_data_for_endpoint failed to create reader"); - return RMW_RET_ERROR; - } - dds_sample_info_t info; - void * msg = NULL; - int32_t n; - while ((n = dds_take(rd, &msg, &info, 1, 1)) == 1) { - if (info.valid_data && info.instance_state == DDS_IST_ALIVE) { - auto sample = static_cast(msg); - filter_and_map(*sample); - } - dds_return_loan(rd, &msg, n); - } - dds_delete(rd); - if (n == 0) { - return RMW_RET_OK; - } else { - RMW_SET_ERROR_MSG("rmw_collect_data_for_endpoint dds_take failed"); - return RMW_RET_ERROR; - } -} - -#if RMW_VERSION_GTE(0, 8, 2) -struct endpoint_info_t -{ - std::string topic_type; - dds_builtintopic_guid_t endpoint_key; - dds_builtintopic_guid_t node_key; - dds_instance_handle_t node_insth; - rmw_qos_profile_t qos_profile; -}; - -typedef std::map> node_cache_t; - -static void get_node_name( - dds_entity_t ppant_rd, - const dds_builtintopic_guid_t & pp_guid, - const dds_instance_handle_t pp_insth, - std::string & node_name, - std::string & node_ns) -{ - static_cast(pp_guid); // only used in assert() - bool node_found = false; - const auto re_ud = std::regex("^name=([^;]*);namespace=([^;]*);", std::regex::extended); - size_t udsz; - dds_sample_info_t info; - void * msg = NULL; - int32_t n; - if ((n = dds_read_instance(ppant_rd, &msg, &info, 1, 1, pp_insth)) == 1) { - void * ud; - auto sample = static_cast(msg); - assert(!memcmp(&sample->key, &pp_guid, sizeof(dds_builtintopic_guid_t))); - if (dds_qget_userdata(sample->qos, &ud, &udsz) && ud != nullptr) { - std::cmatch cm; - if (std::regex_search(reinterpret_cast(ud), cm, re_ud)) { - node_found = true; - node_name = std::string(cm[1]); - node_ns = std::string(cm[2]); - } - } - dds_return_loan(ppant_rd, &msg, n); - } - if (!node_found) { - node_name = "_NODE_NAME_UNKNOWN_"; - node_ns = "_NODE_NAMESPACE_UNKNOWN_"; - } -} - -static rmw_ret_t -set_rmw_topic_endpoint_info( - node_cache_t & node_cache, - dds_entity_t ppant_rd, - rcutils_allocator_t * allocator, - const endpoint_info_t & epinfo, - bool is_publisher, - rmw_topic_endpoint_info_t & topic_endpoint_info) -{ - static_assert( - sizeof(dds_builtintopic_guid_t) <= RMW_GID_STORAGE_SIZE, - "RMW_GID_STORAGE_SIZE insufficient to store the rmw_cyclonedds_cpp GID implementation." - ); - rmw_ret_t ret; - std::string node_name; - std::string node_ns; - const auto it = node_cache.find(epinfo.node_key); - if (it != node_cache.end()) { - node_name = it->second.first; - node_ns = it->second.second; - } else { - get_node_name(ppant_rd, epinfo.node_key, epinfo.node_insth, node_name, node_ns); - node_cache[epinfo.node_key] = std::make_pair(node_name, node_ns); - } - if ((ret = rmw_topic_endpoint_info_set_node_name( - &topic_endpoint_info, - node_name.c_str(), - allocator)) != RMW_RET_OK) - { - return ret; - } - if ((ret = rmw_topic_endpoint_info_set_node_namespace( - &topic_endpoint_info, - node_ns.c_str(), - allocator)) != RMW_RET_OK) - { - return ret; - } - if ((ret = rmw_topic_endpoint_info_set_topic_type( - &topic_endpoint_info, - epinfo.topic_type.c_str(), - allocator)) != RMW_RET_OK) - { - return ret; - } - if ((ret = rmw_topic_endpoint_info_set_endpoint_type( - &topic_endpoint_info, - is_publisher ? RMW_ENDPOINT_PUBLISHER : RMW_ENDPOINT_SUBSCRIPTION)) != RMW_RET_OK) - { - return ret; - } - if ((ret = rmw_topic_endpoint_info_set_gid( - &topic_endpoint_info, - epinfo.endpoint_key.v, - sizeof(dds_builtintopic_guid_t))) != RMW_RET_OK) - { - return ret; - } - if ((ret = rmw_topic_endpoint_info_set_qos_profile( - &topic_endpoint_info, - &epinfo.qos_profile)) != RMW_RET_OK) - { - return ret; - } - return RMW_RET_OK; -} - -static void handle_topic_endpoint_info_array_fini( - rmw_topic_endpoint_info_array_t * topic_endpoint_info_array, - rcutils_allocator_t * allocator) -{ - std::string err_msg; - rmw_ret_t ret; - bool err_set = rmw_error_is_set(); - if (err_set) { - err_msg = rmw_get_error_string().str; - } - rmw_reset_error(); - if ((ret = - rmw_topic_endpoint_info_array_fini(topic_endpoint_info_array, allocator)) != RMW_RET_OK) - { - RCUTILS_LOG_ERROR_NAMED( - "rmw_cyclonedds_cpp", - "handle_topic_endpoint_info_array_fini failed: %s", - rmw_get_error_string().str); - rmw_reset_error(); - } - if (err_set) { - RMW_SET_ERROR_MSG(err_msg.c_str()); - } -} - -static rmw_ret_t -get_endpoint_info_by_topic( - const rmw_node_t * node, - rcutils_allocator_t * allocator, - const char * topic_name, - bool no_mangle, - bool is_publisher, - rmw_topic_endpoint_info_array_t * endpoint_info) -{ - auto node_impl = static_cast(node->data); - const std::string fqtopic_name = make_fqtopic(ROS_TOPIC_PREFIX, topic_name, "", no_mangle); - std::vector endpoint_info_vector; - rmw_ret_t ret; - - const auto filter_and_map = - [fqtopic_name, no_mangle, &endpoint_info_vector]( - const dds_builtintopic_endpoint_t & sample) -> void - { - endpoint_info_t info; - if (std::string(sample.topic_name) == fqtopic_name) { - info.endpoint_key = sample.key; - dds_qos_to_rmw_qos(sample.qos, &info.qos_profile); - if (no_mangle) { - info.topic_type = std::string(sample.type_name); - } else if (!demangle_topic_type(sample.type_name, topic_kind::DEFAULT, info.topic_type)) { - // skip a non-ROS2 endpoint - return; - } - info.node_key = sample.participant_key; - info.node_insth = sample.participant_instance_handle; - endpoint_info_vector.push_back(info); - } - }; - if ((ret = rmw_collect_data_for_endpoint( - node_impl, - is_publisher ? DDS_BUILTIN_TOPIC_DCPSPUBLICATION : DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, - filter_and_map)) != RMW_RET_OK) - { - return ret; - } - - size_t epi_count = endpoint_info_vector.size(); - if ((ret = rmw_topic_endpoint_info_array_init_with_size( - endpoint_info, - epi_count, - allocator)) != RMW_RET_OK) - { - rmw_error_string_t err_msg = rmw_get_error_string(); - rmw_reset_error(); - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "rmw_topic_endpoint_info_array_init_with_size failed to allocate memory: %s", - err_msg.str); - return ret; - } - - node_cache_t node_cache; - dds_entity_t ppant_rd; - if ( - (ppant_rd = dds_create_reader( - node_impl->enth, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) + RET_WRONG_IMPLID(node); + if (RMW_RET_OK != rmw_check_zero_rmw_string_array(node_names) || + RMW_RET_OK != rmw_check_zero_rmw_string_array(node_namespaces)) { - RMW_SET_ERROR_MSG("get_endpoint_info_by_topic: failed to create reader"); - handle_topic_endpoint_info_array_fini(endpoint_info, allocator); return RMW_RET_ERROR; } - size_t i = 0; - for (const auto & epi : endpoint_info_vector) { - if ((ret = set_rmw_topic_endpoint_info( - node_cache, - ppant_rd, - allocator, - epi, - is_publisher, - endpoint_info->info_array[i])) != RMW_RET_OK) - { - handle_topic_endpoint_info_array_fini(endpoint_info, allocator); - dds_delete(ppant_rd); - return ret; - } - i++; - } - assert(i == epi_count); - dds_delete(ppant_rd); - return RMW_RET_OK; -} -#endif // RMW_VERSION_GTE(0, 8, 2) - -static rmw_ret_t make_names_and_types( - rmw_names_and_types_t * tptyp, const std::map> & source, - rcutils_allocator_t * allocator) -{ - if (source.size() == 0) { - return RMW_RET_OK; - } - rmw_ret_t ret; - if ((ret = rmw_names_and_types_init(tptyp, source.size(), allocator)) != RMW_RET_OK) { - return ret; - } - size_t index = 0; - for (const auto & tp : source) { - if ((tptyp->names.data[index] = rcutils_strdup(tp.first.c_str(), *allocator)) == NULL) { - goto fail_mem; - } - if ( - rcutils_string_array_init( - &tptyp->types[index], tp.second.size(), allocator) != RCUTILS_RET_OK) - { - goto fail_mem; - } - size_t type_index = 0; - for (const auto & type : tp.second) { - if ((tptyp->types[index].data[type_index] = - rcutils_strdup(type.c_str(), *allocator)) == NULL) - { - goto fail_mem; - } - type_index++; - } - index++; - } - return RMW_RET_OK; - -fail_mem: - if (rmw_names_and_types_fini(tptyp) != RMW_RET_OK) { - RMW_SET_ERROR_MSG("make_names_and_types: rmw_names_and_types_fini failed"); - } - return RMW_RET_BAD_ALLOC; -} -static rmw_ret_t get_node_guids( - CddsNode * node_impl, - const char * node_name, const char * node_namespace, - std::set & guids) -{ - std::string needle = get_node_user_data_name_ns(node_name, node_namespace); - auto oper = - [&guids, needle](const dds_builtintopic_participant_t & sample, const char * ud) -> bool { - if (0u == std::string(ud).find(needle)) { - guids.insert(sample.key); - } - return true; /* do keep looking - what if there are many? */ - }; - rmw_ret_t ret = do_for_node_user_data(node_impl, oper); - if (ret != RMW_RET_OK) { - return ret; - } else if (guids.size() == 0) { - /* It appears the get_..._by_node operations are supposed to return - NODE_NAME_NON_EXISTENT (newly introduced in Eloquent) or ERROR - (on Dashing and earlier) if no such node exists */ -#ifdef RMW_RET_NODE_NAME_NON_EXISTENT - return RMW_RET_NODE_NAME_NON_EXISTENT; -#else - return RMW_RET_ERROR; -#endif - } else { - return RMW_RET_OK; - } + auto common_context = &node->context->impl->common; + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + return common_context->graph_cache.get_node_names( + node_names, + node_namespaces, + enclaves, + &allocator); } -static rmw_ret_t get_endpoint_names_and_types_by_node( +extern "C" rmw_ret_t rmw_get_topic_names_and_types( const rmw_node_t * node, rcutils_allocator_t * allocator, - const char * node_name, - const char * node_namespace, bool no_demangle, - rmw_names_and_types_t * tptyp, bool subs, - bool pubs) + bool no_demangle, rmw_names_and_types_t * tptyp) { RET_WRONG_IMPLID(node); - auto node_impl = static_cast(node->data); RET_NULL(allocator); rmw_ret_t ret = rmw_names_and_types_check_zero(tptyp); if (ret != RMW_RET_OK) { return ret; } - if (node_name) { - int dummy_validation_result; - size_t dummy_invalid_index; - if ((ret = - rmw_validate_node_name( - node_name, &dummy_validation_result, &dummy_invalid_index)) != RMW_RET_OK) - { - return ret; - } - } - std::set guids; - if (node_name != nullptr && - (ret = get_node_guids(node_impl, node_name, node_namespace, guids)) != RMW_RET_OK) - { - return ret; - } - std::map> tt; - const auto filter_and_map = - [&tt, guids, node_name, no_demangle]( - const dds_builtintopic_endpoint_t & sample) -> void - { - std::string demangled_topic_name, demangled_type_name; - if ( - (node_name == nullptr || guids.count(sample.participant_key) != 0) && - demangle_topic_name(sample.topic_name, topic_kind::DEFAULT, demangled_topic_name) && - demangle_topic_type(sample.type_name, topic_kind::DEFAULT, demangled_type_name)) - { - tt[demangled_topic_name].insert( - no_demangle ? std::string(sample.type_name) : demangled_type_name); - } - }; - if (subs && - (ret = - rmw_collect_data_for_endpoint( - node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, filter_and_map)) != RMW_RET_OK) - { - return ret; - } - if (pubs && - (ret = - rmw_collect_data_for_endpoint( - node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, filter_and_map)) != RMW_RET_OK) - { - return ret; - } - return make_names_and_types(tptyp, tt, allocator); -} - -static bool endpoint_is_from_service(bool is_request, const dds_builtintopic_guid_t & guid) -{ - /* Final byte of GUID is entity type, application writers use 0x02 (write with key) and 0x03 - (writer without key); readers use 0x04 and 0x07. See DDSI specification, table 9.1. - - A service has a reader for a request and a writer for the reply; a client has it the other - way round. Therefore, it is a service iff is_request is set and the GUID indicates it is a - writer. */ - if (is_request) { - return guid.v[15] == 0x04 || guid.v[15] == 0x07; - } else { - return guid.v[15] == 0x02 || guid.v[15] == 0x03; - } -} -static rmw_ret_t get_cs_names_and_types_by_node( - const rmw_node_t * node, - rcutils_allocator_t * allocator, - const char * node_name, const char * node_namespace, - rmw_names_and_types_t * sntyp, - bool looking_for_services) -{ - RET_WRONG_IMPLID(node); - auto node_impl = static_cast(node->data); - RET_NULL(allocator); - rmw_ret_t ret = rmw_names_and_types_check_zero(sntyp); - if (ret != RMW_RET_OK) { - return ret; - } - if (node_name) { - int dummy_validation_result; - size_t dummy_invalid_index; - if ((ret = - rmw_validate_node_name( - node_name, &dummy_validation_result, &dummy_invalid_index)) != RMW_RET_OK) - { - return ret; - } + DemangleFunction demangle_topic = _demangle_ros_topic_from_topic; + DemangleFunction demangle_type = _demangle_if_ros_type; + if (no_demangle) { + demangle_topic = _identity_demangle; + demangle_type = _identity_demangle; } - std::set guids; - if ( - node_name != nullptr && - (ret = get_node_guids(node_impl, node_name, node_namespace, guids)) != RMW_RET_OK) - { - return ret; - } - std::map> tt; - const auto filter_and_map = [&tt, guids, node_name, looking_for_services]( - const dds_builtintopic_endpoint_t & sample) -> void { - std::string topic_name, type_name; - bool is_request; - if ((node_name == nullptr || guids.count(sample.participant_key) != 0) && - demangle_topic_name( - sample.topic_name, topic_kind::REQUEST_RESPONSE, topic_name, &is_request) && - demangle_topic_type(sample.type_name, topic_kind::REQUEST_RESPONSE, type_name) && - looking_for_services == endpoint_is_from_service(is_request, sample.key)) - { - tt[topic_name].insert(type_name); - } - }; - if ((ret = - rmw_collect_data_for_endpoint( - node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, filter_and_map)) != RMW_RET_OK || - (ret = - rmw_collect_data_for_endpoint( - node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, filter_and_map)) != RMW_RET_OK) - { - return ret; - } - return make_names_and_types(sntyp, tt, allocator); + auto common_context = &node->context->impl->common; + return common_context->graph_cache.get_names_and_types( + demangle_topic, + demangle_type, + allocator, + tptyp); } -extern "C" rmw_ret_t rmw_get_topic_names_and_types( +extern "C" rmw_ret_t rmw_get_service_names_and_types( const rmw_node_t * node, rcutils_allocator_t * allocator, - bool no_demangle, rmw_names_and_types_t * tptyp) + rmw_names_and_types_t * sntyp) { - return get_endpoint_names_and_types_by_node( - node, allocator, nullptr, nullptr, no_demangle, tptyp, true, true); + auto common_context = &node->context->impl->common; + return common_context->graph_cache.get_names_and_types( + _demangle_service_from_topic, + _demangle_service_type_only, + allocator, + sntyp); } -extern "C" rmw_ret_t rmw_get_service_names_and_types( - const rmw_node_t * node, - rcutils_allocator_t * allocator, - rmw_names_and_types_t * sntyp) +static rmw_ret_t get_topic_name(dds_entity_t endpoint_handle, std::string & name) { - return get_cs_names_and_types_by_node(node, allocator, nullptr, nullptr, sntyp, true); + /* dds_get_name needs a bit of TLC ... */ + std::vector tmp(128); + do { + if (dds_get_name(dds_get_topic(endpoint_handle), tmp.data(), tmp.size()) < 0) { + return RMW_RET_ERROR; + } + auto end = std::find(tmp.begin(), tmp.end(), 0); + if (end != tmp.end()) { + name = std::string(tmp.begin(), end); + return RMW_RET_OK; + } + tmp.resize(2 * tmp.size()); + } while (true); } extern "C" rmw_ret_t rmw_service_server_is_available( @@ -3835,51 +3433,41 @@ extern "C" rmw_ret_t rmw_service_server_is_available( RET_WRONG_IMPLID(node); RET_WRONG_IMPLID(client); RET_NULL(is_available); + *is_available = false; + auto info = static_cast(client->data); + auto common_context = &node->context->impl->common; + + std::string sub_topic_name, pub_topic_name; + if (get_topic_name(info->client.pub->enth, pub_topic_name) < 0 || + get_topic_name(info->client.sub->enth, sub_topic_name) < 0) + { + RMW_SET_ERROR_MSG("rmw_service_server_is_available: failed to get topic names"); + return RMW_RET_ERROR; + } + + size_t number_of_request_subscribers = 0; + rmw_ret_t ret = + common_context->graph_cache.get_reader_count(pub_topic_name, &number_of_request_subscribers); + if (ret != RMW_RET_OK || 0 == number_of_request_subscribers) { + return ret; + } + size_t number_of_response_publishers = 0; + ret = + common_context->graph_cache.get_writer_count(sub_topic_name, &number_of_response_publishers); + if (ret != RMW_RET_OK || 0 == number_of_response_publishers) { + // error + return ret; + } dds_publication_matched_status_t ps; dds_subscription_matched_status_t cs; if (dds_get_publication_matched_status(info->client.pub->enth, &ps) < 0 || dds_get_subscription_matched_status(info->client.sub->enth, &cs) < 0) { RMW_SET_ERROR_MSG("rmw_service_server_is_available: get_..._matched_status failed"); - return RMW_RET_ERROR; - } - *is_available = ps.current_count > 0 && cs.current_count > 0; - return RMW_RET_OK; -} - -static rmw_ret_t rmw_count_pubs_or_subs( - const rmw_node_t * node, dds_entity_t builtin_topic, - const char * topic_name, size_t * count) -{ - assert( - builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION || - builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION); - RET_NULL(topic_name); - RET_NULL(count); - RET_WRONG_IMPLID(node); - auto node_impl = static_cast(node->data); - - std::string fqtopic_name = make_fqtopic(ROS_TOPIC_PREFIX, topic_name, "", false); - dds_entity_t rd; - if ((rd = dds_create_reader(node_impl->enth, builtin_topic, NULL, NULL)) < 0) { - RMW_SET_ERROR_MSG("rmw_count_pubs_or_subs failed to create reader"); - return RMW_RET_ERROR; } - dds_sample_info_t info; - void * msg = NULL; - int32_t n; - *count = 0; - while ((n = dds_take(rd, &msg, &info, 1, 1)) == 1) { - if (info.valid_data && info.instance_state == DDS_IST_ALIVE) { - auto sample = static_cast(msg); - if (fqtopic_name == std::string(sample->topic_name)) { - (*count)++; - } - } - dds_return_loan(rd, &msg, n); - } - dds_delete(rd); + // all conditions met, there is a service server available + *is_available = true; return RMW_RET_OK; } @@ -3887,14 +3475,94 @@ extern "C" rmw_ret_t rmw_count_publishers( const rmw_node_t * node, const char * topic_name, size_t * count) { - return rmw_count_pubs_or_subs(node, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, topic_name, count); + auto common_context = &node->context->impl->common; + const std::string mangled_topic_name = make_fqtopic(ROS_TOPIC_PREFIX, topic_name, "", false); + return common_context->graph_cache.get_writer_count(mangled_topic_name, count); } extern "C" rmw_ret_t rmw_count_subscribers( const rmw_node_t * node, const char * topic_name, size_t * count) { - return rmw_count_pubs_or_subs(node, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, topic_name, count); + auto common_context = &node->context->impl->common; + const std::string mangled_topic_name = make_fqtopic(ROS_TOPIC_PREFIX, topic_name, "", false); + return common_context->graph_cache.get_reader_count(mangled_topic_name, count); +} + +using GetNamesAndTypesByNodeFunction = rmw_ret_t (*)( + rmw_dds_common::Context *, + const std::string &, + const std::string &, + DemangleFunction, + DemangleFunction, + rcutils_allocator_t *, + rmw_names_and_types_t *); + +static rmw_ret_t get_topic_names_and_types_by_node( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + DemangleFunction demangle_topic, + DemangleFunction demangle_type, + bool no_demangle, + GetNamesAndTypesByNodeFunction get_names_and_types_by_node, + rmw_names_and_types_t * topic_names_and_types) +{ + RET_WRONG_IMPLID(node); + RET_NULL(allocator); + RET_NULL(node_name); + RET_NULL(node_namespace); + RET_NULL(topic_names_and_types); + auto common_context = &node->context->impl->common; + if (no_demangle) { + demangle_topic = _identity_demangle; + demangle_type = _identity_demangle; + } + return get_names_and_types_by_node( + common_context, + node_name, + node_namespace, + demangle_topic, + demangle_type, + allocator, + topic_names_and_types); +} + +static rmw_ret_t get_reader_names_and_types_by_node( + rmw_dds_common::Context * common_context, + const std::string & node_name, + const std::string & node_namespace, + DemangleFunction demangle_topic, + DemangleFunction demangle_type, + rcutils_allocator_t * allocator, + rmw_names_and_types_t * topic_names_and_types) +{ + return common_context->graph_cache.get_reader_names_and_types_by_node( + node_name, + node_namespace, + demangle_topic, + demangle_type, + allocator, + topic_names_and_types); +} + +static rmw_ret_t get_writer_names_and_types_by_node( + rmw_dds_common::Context * common_context, + const std::string & node_name, + const std::string & node_namespace, + DemangleFunction demangle_topic, + DemangleFunction demangle_type, + rcutils_allocator_t * allocator, + rmw_names_and_types_t * topic_names_and_types) +{ + return common_context->graph_cache.get_writer_names_and_types_by_node( + node_name, + node_namespace, + demangle_topic, + demangle_type, + allocator, + topic_names_and_types); } extern "C" rmw_ret_t rmw_get_subscriber_names_and_types_by_node( @@ -3905,8 +3573,10 @@ extern "C" rmw_ret_t rmw_get_subscriber_names_and_types_by_node( bool no_demangle, rmw_names_and_types_t * tptyp) { - return get_endpoint_names_and_types_by_node( - node, allocator, node_name, node_namespace, no_demangle, tptyp, true, false); + return get_topic_names_and_types_by_node( + node, allocator, node_name, node_namespace, + _demangle_ros_topic_from_topic, _demangle_if_ros_type, + no_demangle, get_reader_names_and_types_by_node, tptyp); } extern "C" rmw_ret_t rmw_get_publisher_names_and_types_by_node( @@ -3917,8 +3587,10 @@ extern "C" rmw_ret_t rmw_get_publisher_names_and_types_by_node( bool no_demangle, rmw_names_and_types_t * tptyp) { - return get_endpoint_names_and_types_by_node( - node, allocator, node_name, node_namespace, no_demangle, tptyp, false, true); + return get_topic_names_and_types_by_node( + node, allocator, node_name, node_namespace, + _demangle_ros_topic_from_topic, _demangle_if_ros_type, + no_demangle, get_writer_names_and_types_by_node, tptyp); } extern "C" rmw_ret_t rmw_get_service_names_and_types_by_node( @@ -3928,7 +3600,16 @@ extern "C" rmw_ret_t rmw_get_service_names_and_types_by_node( const char * node_namespace, rmw_names_and_types_t * sntyp) { - return get_cs_names_and_types_by_node(node, allocator, node_name, node_namespace, sntyp, true); + return get_topic_names_and_types_by_node( + node, + allocator, + node_name, + node_namespace, + _demangle_service_request_from_topic, + _demangle_service_type_only, + false, + get_reader_names_and_types_by_node, + sntyp); } extern "C" rmw_ret_t rmw_get_client_names_and_types_by_node( @@ -3938,12 +3619,18 @@ extern "C" rmw_ret_t rmw_get_client_names_and_types_by_node( const char * node_namespace, rmw_names_and_types_t * sntyp) { - return get_cs_names_and_types_by_node(node, allocator, node_name, node_namespace, sntyp, false); + return get_topic_names_and_types_by_node( + node, + allocator, + node_name, + node_namespace, + _demangle_service_reply_from_topic, + _demangle_service_type_only, + false, + get_reader_names_and_types_by_node, + sntyp); } - -#if RMW_VERSION_GTE(0, 8, 2) - extern "C" rmw_ret_t rmw_get_publishers_info_by_topic( const rmw_node_t * node, rcutils_allocator_t * allocator, @@ -3951,12 +3638,21 @@ extern "C" rmw_ret_t rmw_get_publishers_info_by_topic( bool no_mangle, rmw_topic_endpoint_info_array_t * publishers_info) { - return get_endpoint_info_by_topic( - node, + RET_WRONG_IMPLID(node); + RET_NULL(allocator); + RET_NULL(topic_name); + RET_NULL(publishers_info); + auto common_context = &node->context->impl->common; + std::string mangled_topic_name = topic_name; + DemangleFunction demangle_type = _identity_demangle; + if (!no_mangle) { + mangled_topic_name = make_fqtopic(ROS_TOPIC_PREFIX, topic_name, "", false); + demangle_type = _demangle_if_ros_type; + } + return common_context->graph_cache.get_writers_info_by_topic( + mangled_topic_name, + demangle_type, allocator, - topic_name, - no_mangle, - true, publishers_info); } @@ -3967,13 +3663,20 @@ extern "C" rmw_ret_t rmw_get_subscriptions_info_by_topic( bool no_mangle, rmw_topic_endpoint_info_array_t * subscriptions_info) { - return get_endpoint_info_by_topic( - node, + RET_WRONG_IMPLID(node); + RET_NULL(allocator); + RET_NULL(topic_name); + RET_NULL(subscriptions_info); + auto common_context = &node->context->impl->common; + std::string mangled_topic_name = topic_name; + DemangleFunction demangle_type = _identity_demangle; + if (!no_mangle) { + mangled_topic_name = make_fqtopic(ROS_TOPIC_PREFIX, topic_name, "", false); + demangle_type = _demangle_if_ros_type; + } + return common_context->graph_cache.get_readers_info_by_topic( + mangled_topic_name, + demangle_type, allocator, - topic_name, - no_mangle, - false, subscriptions_info); } - -#endif