From 6249f33be66de2ca6d64a79fe184b1de523a5dd7 Mon Sep 17 00:00:00 2001 From: Yadu Date: Sat, 2 Dec 2023 09:31:45 +0900 Subject: [PATCH] Refactor GraphCache to support multiple types for the same topic. (#70) * use std::find_if to find matching pubs Signed-off-by: Yadunund * Refactor to support multiple types Signed-off-by: Yadunund * Refactor liveliness token generation Signed-off-by: Yadunund * Map topic types to TopicData Signed-off-by: Yadunund * Store map of types in graph_topics_ Signed-off-by: Yadunund * fix bug in deleting and counting topics Signed-off-by: Yadunund * Cleanup liveliness_utils Signed-off-by: Yadunund * Style cleanups. Signed-off-by: Chris Lalancette * Cleanup some of the code in the liveliness. Also fix a couple of bugs where we were mixing up subscriptions and publications, leading to incorrect counts. Signed-off-by: Chris Lalancette * Remove Entity::node_info. Because it returned a reference, it is difficult to tell the lifetime of it. It would also be hard to add in locking in the future. Since we are reaching through it in most cases, just return some data directly. Signed-off-by: Chris Lalancette * Remove some uses of auto. Signed-off-by: Chris Lalancette * Return copies from Entity Signed-off-by: Yadunund * Address feedback Signed-off-by: Yadunund --------- Signed-off-by: Yadunund Signed-off-by: Yadunund Signed-off-by: Chris Lalancette Co-authored-by: Chris Lalancette --- rmw_zenoh_cpp/CMakeLists.txt | 1 + rmw_zenoh_cpp/src/detail/graph_cache.cpp | 824 ++++++++---------- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 105 +-- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 362 ++++++++ rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 133 +++ rmw_zenoh_cpp/src/rmw_init.cpp | 3 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 111 ++- 7 files changed, 974 insertions(+), 565 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/liveliness_utils.cpp create mode 100644 rmw_zenoh_cpp/src/detail/liveliness_utils.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index d3f93e72..24b70723 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -28,6 +28,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/identifier.cpp src/detail/graph_cache.cpp src/detail/guard_condition.cpp + src/detail/liveliness_utils.cpp src/detail/message_type_support.cpp src/detail/rmw_data_types.cpp src/detail/service_type_support.cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 025b0047..b47c14e5 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -32,491 +33,341 @@ #include "graph_cache.hpp" ///============================================================================= -std::string GenerateToken::liveliness(size_t domain_id) -{ - std::string token = "@ros2_lv/" + std::to_string(domain_id) + "/**"; - return token; -} +using Entity = liveliness::Entity; +using EntityType = liveliness::EntityType; ///============================================================================= -/** - * Generate a liveliness token for the particular entity. - * - * The liveliness tokens are in the form: - * - * @ros2_lv//// - * - * Where: - * - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS. - * - The type of entity. This can be one of "NN" for a node, "MP" for a publisher, "MS" for a subscription, "SS" for a service server, or "SC" for a service client. - * - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons. - * - The ROS node name for this entity. - */ -static std::string generate_base_token( - const std::string & entity, - size_t domain_id, - const std::string & namespace_, - const std::string & name) +TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) +: pub_count_(pub_count), + sub_count_(sub_count) { - std::stringstream token_ss; - token_ss << "@ros2_lv/" << domain_id << "/" << entity << namespace_; - // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". - // Hence we add an "_" to denote an empty namespace such that splitting the key - // will always result in 5 parts. - if (namespace_ == "/") { - token_ss << "_/"; - } else { - token_ss << "/"; - } - // Finally append node name. - token_ss << name; - return token_ss.str(); + // Do nothing. } ///============================================================================= -std::string GenerateToken::node( - size_t domain_id, - const std::string & namespace_, - const std::string & name) -{ - return generate_base_token("NN", domain_id, namespace_, name); -} +TopicData::TopicData( + liveliness::TopicInfo info, + TopicStats stats) +: info_(std::move(info)), + stats_(std::move(stats)) +{} ///============================================================================= -std::string GenerateToken::publisher( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos) +void GraphCache::parse_put(const std::string & keyexpr) { - std::string token = generate_base_token("MP", domain_id, node_namespace, node_name); - token += topic + "/" + type + "/" + qos; - return token; -} + std::optional valid_entity = liveliness::Entity::make(keyexpr); + if (!valid_entity.has_value()) { + // Error message has already been logged. + return; + } + const liveliness::Entity & entity = *valid_entity; + + // Helper lambda to append pub/subs to the GraphNode. + // We capture by reference to update graph_topics_ + auto add_topic_data = + [this](const Entity & entity, GraphNode & graph_node) -> void + { + if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { + return; + } -///============================================================================= -std::string GenerateToken::subscription( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos) -{ - std::string token = generate_base_token("MS", domain_id, node_namespace, node_name); - token += topic + "/" + type + "/" + qos; - return token; -} + if (!entity.topic_info().has_value()) { + // This should not happen as add_topic_data() is called after validating the existence + // of topic_info. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "add_topic_data() called without valid TopicInfo. Report this."); + return; + } -///============================================================================= -bool PublishToken::put( - z_owned_session_t * session, - const std::string & token) -{ - if (!z_session_check(session)) { - RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); - return false; - } + const liveliness::TopicInfo topic_info = entity.topic_info().value(); + GraphNode::TopicMap & topic_map = entity.type() == + EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; + const std::string entity_desc = entity.type() == + EntityType::Publisher ? "publisher" : "subscription"; + const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0; + const std::size_t sub_count = !pub_count; + TopicDataPtr graph_topic_data = std::make_shared( + topic_info, + TopicStats{pub_count, sub_count}); + + GraphNode::TopicDataMap topic_data_map = { + {graph_topic_data->info_.type_, graph_topic_data}}; + std::pair insertion = + topic_map.insert(std::make_pair(topic_info.name_, topic_data_map)); + if (!insertion.second) { + // A topic with the same name already exists in the node so we append the type. + std::pair type_insertion = + insertion.first->second.insert( + std::make_pair( + graph_topic_data->info_.type_, + graph_topic_data)); + if (!type_insertion.second) { + // We have another instance of a pub/sub over the same topic and type so we increment + // the counters. + TopicDataPtr & existing_graph_topic = type_insertion.first->second; + existing_graph_topic->stats_.pub_count_ += pub_count; + existing_graph_topic->stats_.sub_count_ += sub_count; + } + } - // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. - z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); - auto drop_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { - RCUTILS_SET_ERROR_MSG("invalid keyexpression generation for liveliness publication."); - return false; - } - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending PUT on %s", token.c_str()); - z_put_options_t options = z_put_options_default(); - options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); - if (z_put(z_loan(*session), z_keyexpr(token.c_str()), nullptr, 0, &options) < 0) { - RCUTILS_SET_ERROR_MSG("unable to publish liveliness for node creation"); - return false; - } + // Bookkeeping: Update graph_topics_ which keeps track of topics across all nodes in the graph + GraphNode::TopicMap::iterator cache_topic_it = graph_topics_.find(topic_info.name_); + if (cache_topic_it == graph_topics_.end()) { + // First time this topic name is added to the graph. + graph_topics_[topic_info.name_] = std::move(topic_data_map); + } else { + // If a TopicData entry for the same type exists in the topic map, update pub/sub counts + // or else create an new TopicData. + std::pair topic_data_insertion = + cache_topic_it->second.insert(std::make_pair(topic_info.type_, nullptr)); + if (topic_data_insertion.second) { + // A TopicData for the topic_type does not exist. + topic_data_insertion.first->second = std::make_shared( + topic_info, + TopicStats{pub_count, sub_count}); + } else { + // Update the existing counters. + topic_data_insertion.first->second->stats_.pub_count_ += pub_count; + topic_data_insertion.first->second->stats_.sub_count_ += sub_count; + } + } - return true; -} + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "Added %s on topic %s with type %s and qos %s to node /%s.", + entity_desc.c_str(), + topic_info.name_.c_str(), + topic_info.type_.c_str(), + topic_info.qos_.c_str(), + graph_node.name_.c_str()); + }; -///============================================================================= -bool PublishToken::del( - z_owned_session_t * session, - const std::string & token) -{ - if (!z_session_check(session)) { - RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); - return false; - } + // Helper lambda to convert an Entity into a GraphNode. + auto make_graph_node = + [&](const Entity & entity) -> std::shared_ptr + { + auto graph_node = std::make_shared(); + graph_node->ns_ = entity.node_namespace(); + graph_node->name_ = entity.node_name(); + graph_node->enclave_ = entity.node_enclave(); + + if (!entity.topic_info().has_value()) { + // Token was for a node. + return graph_node; + } + // Add pub/sub entries. + add_topic_data(entity, *graph_node); - // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. - z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); - auto drop_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { - RCUTILS_SET_ERROR_MSG("invalid key-expression generation for liveliness publication."); - return false; - } - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending DELETE on %s", token.c_str()); - const z_delete_options_t options = z_delete_options_default(); - if (z_delete(z_loan(*session), z_loan(keyexpr), &options) < 0) { - RCUTILS_SET_ERROR_MSG("failed to delete liveliness key"); - return false; - } + return graph_node; + }; - return true; -} + // Lock the graph mutex before accessing the graph. + std::lock_guard lock(graph_mutex_); -///============================================================================= -namespace -{ -std::vector split_keyexpr( - const std::string & keyexpr, - const char delim = '/') -{ - std::vector delim_idx = {}; - // Insert -1 for starting position to make the split easier when using substr. - delim_idx.push_back(-1); - std::size_t idx = 0; - for (auto it = keyexpr.begin(); it != keyexpr.end(); ++it) { - if (*it == delim) { - delim_idx.push_back(idx); - } - ++idx; - } - std::vector result = {}; - try { - for (std::size_t i = 1; i < delim_idx.size(); ++i) { - const auto & prev_idx = delim_idx.at(i - 1); - const auto & idx = delim_idx.at(i); - result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); - } - } catch (const std::exception & e) { - printf("%s\n", e.what()); - return {}; - } - // Finally add the last substr. - result.push_back(keyexpr.substr(delim_idx.back() + 1)); - return result; -} -///============================================================================= -// Convert a liveliness token into a -std::optional> _parse_token(const std::string & keyexpr) -{ - std::vector parts = split_keyexpr(keyexpr); - // At minimum, a token will contain 5 parts (@ros2_lv, domain_id, entity, namespace, node_name). - // Basic validation. - if (parts.size() < 5) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); - return std::nullopt; - } - for (const std::string & p : parts) { - if (p.empty()) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); - return std::nullopt; - } + // If the namespace did not exist, create it and add the node to the graph and return. + NamespaceMap::iterator ns_it = graph_.find(entity.node_namespace()); + if (ns_it == graph_.end()) { + NodeMap node_map = { + {entity.node_name(), make_graph_node(entity)}}; + graph_.emplace(std::make_pair(entity.node_namespace(), std::move(node_map))); + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.", + entity.node_name().c_str(), + entity.node_namespace().c_str()); + return; } - // Get the entity, ie NN, MP, MS, SS, SC. - std::string & entity = parts[2]; - - GraphNode node; - // Nodes with empty namespaces will contain a "_". - node.ns = parts[3] == "_" ? "/" : "/" + parts[3]; - node.name = std::move(parts[4]); + // Add the node to the namespace if it did not exist and return. + NodeMap::iterator node_it = ns_it->second.find(entity.node_name()); + if (node_it == ns_it->second.end()) { + ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity))); + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Added node /%s to an existing namespace %s in the graph.", + entity.node_name().c_str(), + entity.node_namespace().c_str()); + return; + } - if (entity != "NN") { - if (parts.size() < 8) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); - return std::nullopt; - } - GraphNode::TopicData data; - data.topic = "/" + std::move(parts[5]); - data.type = std::move(parts[6]); - data.qos = std::move(parts[7]); - - if (entity == "MP") { - node.pubs.push_back(std::move(data)); - } else if (entity == "MS") { - node.subs.push_back(std::move(data)); - } else if (entity == "SS") { - // TODO(yadunund): Service server - } else if (entity == "SC") { - // TODO(yadunund): Service client - } else { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Invalid entity [%s] in liveliness token", entity.c_str()); - return std::nullopt; - } + // Handles additions to an existing node in the graph. + if (entity.type() == EntityType::Node) { + // The NN entity is implicitly handled above where we add the node. + // If control reaches here, then we received a duplicate entry for the same node. + // This could happen when we get() all the liveliness tokens when the node spins up and + // receive a MP token before an NN one. + return; } - return std::make_pair(std::move(entity), std::move(node)); -} -} // namespace + if (!entity.topic_info().has_value()) { + // Likely an error with parsing the token. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info.", + keyexpr.c_str()); + return; + } -///============================================================================= -GraphCache::TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) -: pub_count_(pub_count), - sub_count_(sub_count) -{ - // Do nothing. + // Update the graph based on the entity. + add_topic_data(entity, *(node_it->second)); } ///============================================================================= -void GraphCache::parse_put(const std::string & keyexpr) +void GraphCache::parse_del(const std::string & keyexpr) { - auto valid_token = _parse_token(keyexpr); - if (!valid_token.has_value()) { + std::optional valid_entity = liveliness::Entity::make(keyexpr); + if (!valid_entity.has_value()) { // Error message has already been logged. return; } - const std::string & entity = valid_token->first; - auto node = std::make_shared(std::move(valid_token->second)); - std::lock_guard lock(graph_mutex_); + const liveliness::Entity & entity = *valid_entity; + + // Helper lambda to append pub/subs to the GraphNode. + // We capture by reference to update caches like graph_topics_ if update_cache is true. + auto remove_topic_data = + [&](const Entity & entity, GraphNode & graph_node, + bool update_cache = false) -> void + { + if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { + return; + } - if (entity == "NN") { - // Node - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // New namespace. - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(node->ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); - if (!insertion.second) { + if (!entity.topic_info().has_value()) { + // This should not happen as add_topic_data() is called after validating the existence + // of topic_info. RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Unable to add duplicate node /%s to the graph.", - node->name.c_str()); + "rmw_zenoh_cpp", + "remove_topic_data() called without valid TopicInfo. Report this."); + return; } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added node /%s to the graph.", - node->name.c_str()); - return; - } else if (entity == "MP") { - // Publisher - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // Potential edge case where a liveliness update for a node creation was missed. - // So we add the node here. - std::string ns = node->ns; - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); - if (!insertion.second && !node->pubs.empty()) { - // Node already exists so just append the publisher. - insertion.first->second->pubs.push_back(node->pubs[0]); - } else { + const liveliness::TopicInfo topic_info = entity.topic_info().value(); + GraphNode::TopicMap & topic_map = entity.type() == + EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; + const std::string entity_desc = entity.type() == + EntityType::Publisher ? "publisher" : "subscription"; + const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0; + const std::size_t sub_count = !pub_count; + + GraphNode::TopicMap::iterator topic_it = topic_map.find(topic_info.name_); + if (topic_it == topic_map.end()) { + // Pub/sub not found. return; } - } - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; - auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); - if (!insertion.second) { - // Such a topic already exists so we just increment its count. - ++insertion.first->second->pub_count_; - } else { - insertion.first->second = std::make_unique(1, 0); - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added publisher %s to node /%s in graph.", - node->pubs.at(0).topic.c_str(), - node->name.c_str()); - return; - } else if (entity == "MS") { - // Subscription - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // Potential edge case where a liveliness update for a node creation was missed. - // So we add the node here. - std::string ns = node->ns; - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); - if (!insertion.second && !node->subs.empty()) { - // Node already exists so just append the publisher. - insertion.first->second->subs.push_back(node->subs[0]); - } else { + + GraphNode::TopicDataMap & topic_data_map = topic_it->second; + // Search the unordered_set for the TopicData for this topic. + GraphNode::TopicDataMap::iterator topic_data_it = + topic_data_map.find(topic_info.type_); + if (topic_data_it == topic_data_map.end()) { + // Something is wrong. + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "TopicData not found for topic %s. Report this.", + topic_info.name_.c_str()); return; } - } - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; - auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); - if (!insertion.second) { - // Such a topic already exists so we just increment its count. - ++insertion.first->second->sub_count_; - } else { - insertion.first->second = std::make_unique(0, 1); - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added subscription %s to node /%s in graph.", - node->subs.at(0).topic.c_str(), - node->name.c_str()); - return; - } else if (entity == "SS") { - // TODO(yadunund): Service server - } else if (entity == "SC") { - // TODO(yadunud): Service Client - } else { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received liveliness token with invalid entity type."); + + // Decrement the relevant counters. If both counters are 0 remove from graph_node. + TopicDataPtr & existing_topic_data = topic_data_it->second; + existing_topic_data->stats_.pub_count_ -= pub_count; + existing_topic_data->stats_.sub_count_ -= sub_count; + if (existing_topic_data->stats_.pub_count_ == 0 && + existing_topic_data->stats_.sub_count_ == 0) + { + topic_data_map.erase(topic_data_it); + } + // If the topic does not have any TopicData entries, erase the topic from the map. + if (topic_data_map.empty()) { + topic_map.erase(topic_info.name_); + } + + // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. + if (update_cache) { + GraphNode::TopicMap::iterator cache_topic_it = + graph_topics_.find(topic_info.name_); + if (cache_topic_it == graph_topics_.end()) { + // This should not happen. + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.", + topic_info.name_.c_str()); + } else { + GraphNode::TopicDataMap::iterator cache_topic_data_it = + cache_topic_it->second.find(topic_info.type_); + if (cache_topic_data_it != cache_topic_it->second.end()) { + // Decrement the relevant counters. If both counters are 0 remove from cache. + cache_topic_data_it->second->stats_.pub_count_ -= pub_count; + cache_topic_data_it->second->stats_.sub_count_ -= sub_count; + if (cache_topic_data_it->second->stats_.pub_count_ == 0 && + cache_topic_data_it->second->stats_.sub_count_ == 0) + { + cache_topic_it->second.erase(cache_topic_data_it); + } + // If the topic does not have any TopicData entries, erase the topic from the map. + if (cache_topic_it->second.empty()) { + graph_topics_.erase(cache_topic_it); + } + } + } + } + + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "Removed %s on topic %s with type %s and qos %s to node /%s.", + entity_desc.c_str(), + topic_info.name_.c_str(), + topic_info.type_.c_str(), + topic_info.qos_.c_str(), + graph_node.name_.c_str()); + }; + + // Lock the graph mutex before accessing the graph. + std::lock_guard lock(graph_mutex_); + + // If namespace does not exist, ignore the request. + NamespaceMap::iterator ns_it = graph_.find(entity.node_namespace()); + if (ns_it == graph_.end()) { return; } -} -///============================================================================= -void GraphCache::parse_del(const std::string & keyexpr) -{ - auto valid_token = _parse_token(keyexpr); - if (!valid_token.has_value()) { - // Error message has already been logged. + // If the node does not exist, ignore the request. + NodeMap::iterator node_it = ns_it->second.find(entity.node_name()); + if (node_it == ns_it->second.end()) { return; } - const std::string & entity = valid_token->first; - auto node = std::make_shared(std::move(valid_token->second)); - std::lock_guard lock(graph_mutex_); - if (entity == "NN") { + + if (entity.type() == EntityType::Node) { // Node - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - ns_it->second.erase(node->name); + // The liveliness tokens to remove pub/subs should be received before the one to remove a node + // given the reliability QoS for liveliness subs. However, if we find any pubs/subs present in + // the node below, we should update the count in graph_topics_. + const GraphNodePtr graph_node = node_it->second; + if (!graph_node->pubs_.empty() || !graph_node->subs_.empty()) { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token to remove node /%s from the graph before all pub/subs for this " + "node have been removed. Report this issue.", + entity.node_name().c_str() + ); + // TODO(Yadunund): Iterate through the nodes pubs_ and subs_ and decrement topic count in + // graph_topics_. } + ns_it->second.erase(entity.node_name()); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Removed node /%s from the graph.", - node->name.c_str() + entity.node_name().c_str() ); - } else if (entity == "MP") { - // Publisher - if (node->pubs.empty()) { - // This should never happen but we make sure _parse_token() has no error. - return; - } - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - auto node_it = ns_it->second.find(node->name); - if (node_it != ns_it->second.end()) { - const auto found_node = node_it->second; - // Here we iterate throught the list of publishers and remove the one - // with matching name, type and qos. - // TODO(Yadunund): This can be more optimal than O(n) with some caching. - auto erase_it = found_node->pubs.begin(); - for (; erase_it != found_node->pubs.end(); ++erase_it) { - const auto & pub = *erase_it; - if (pub.topic == node->pubs.at(0).topic && - pub.type == node->pubs.at(0).type && - pub.qos == node->pubs.at(0).qos) - { - break; - } - } - if (erase_it != found_node->pubs.end()) { - found_node->pubs.erase(erase_it); - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; - auto topic_it = graph_topics_.find(topic_key); - if (topic_it != graph_topics_.end()) { - if (topic_it->second->pub_count_ == 1 && topic_it->second->sub_count_ == 0) { - // The last publisher was removed so we can delete this entry. - graph_topics_.erase(topic_key); - } else { - // Else we just decrement the count. - --topic_it->second->pub_count_; - } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Removed publisher %s from node /%s in the graph.", - node->pubs.at(0).topic.c_str(), - node->name.c_str() - ); - } - } - } - } else if (entity == "MS") { - // Subscription - if (node->subs.empty()) { - // This should never happen but we make sure _parse_token() has no error. - return; - } - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - auto node_it = ns_it->second.find(node->name); - if (node_it != ns_it->second.end()) { - const auto found_node = node_it->second; - // Here we iterate throught the list of subscriptions and remove the one - // with matching name, type and qos. - // TODO(Yadunund): This can be more optimal than O(n) with some caching. - auto erase_it = found_node->subs.begin(); - for (; erase_it != found_node->subs.end(); ++erase_it) { - const auto & sub = *erase_it; - if (sub.topic == node->subs.at(0).topic && - sub.type == node->subs.at(0).type && - sub.qos == node->subs.at(0).qos) - { - break; - } - } - if (erase_it != found_node->subs.end()) { - found_node->subs.erase(erase_it); - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; - auto topic_it = graph_topics_.find(topic_key); - if (topic_it != graph_topics_.end()) { - if (topic_it->second->sub_count_ == 1 && topic_it->second->pub_count_ == 0) { - // The last subscription was removed so we can delete this entry. - graph_topics_.erase(topic_key); - } else { - // Else we just decrement the count. - --topic_it->second->sub_count_; - } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Removed subscription %s from node /%s in the graph.", - node->subs.at(0).topic.c_str(), - node->name.c_str() - ); - } - } - } - } else if (entity == "SS") { - // TODO(yadunund): Service server - } else if (entity == "SC") { - // TODO(yadunund): Service client - } else { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received liveliness token with invalid entity type."); return; } + + if (!entity.topic_info().has_value()) { + // Likely an error with parsing the token. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData.", + keyexpr.c_str()); + return; + } + + // Update the graph based on the entity. + remove_topic_data(entity, *(node_it->second), true); } ///============================================================================= @@ -543,8 +394,8 @@ rmw_ret_t GraphCache::get_node_names( allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); size_t nodes_number = 0; - for (auto it = graph_.begin(); it != graph_.end(); ++it) { - nodes_number += it->second.size(); + for (const std::pair & it : graph_) { + nodes_number += it.second.size(); } rcutils_ret_t rcutils_ret = @@ -600,11 +451,13 @@ rmw_ret_t GraphCache::get_node_names( // Fill node names, namespaces and enclaves. std::size_t j = 0; - for (auto ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) { + for (NamespaceMap::const_iterator ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) { const std::string & ns = ns_it->first; - for (auto node_it = ns_it->second.begin(); node_it != ns_it->second.end(); ++node_it) { - const auto node = node_it->second; - node_names->data[j] = rcutils_strdup(node->name.c_str(), *allocator); + for (NodeMap::const_iterator node_it = ns_it->second.begin(); node_it != ns_it->second.end(); + ++node_it) + { + const GraphNodePtr node = node_it->second; + node_names->data[j] = rcutils_strdup(node->name_.c_str(), *allocator); if (!node_names->data[j]) { return RMW_RET_BAD_ALLOC; } @@ -615,7 +468,7 @@ rmw_ret_t GraphCache::get_node_names( } if (enclaves) { enclaves->data[j] = rcutils_strdup( - node->enclave.c_str(), *allocator); + node->enclave_.c_str(), *allocator); if (!enclaves->data[j]) { return RMW_RET_BAD_ALLOC; } @@ -664,7 +517,7 @@ _demangle_if_ros_type(const std::string & dds_type_string) rmw_ret_t GraphCache::get_topic_names_and_types( rcutils_allocator_t * allocator, bool no_demangle, - rmw_names_and_types_t * topic_names_and_types) + rmw_names_and_types_t * topic_names_and_types) const { static_cast(no_demangle); RCUTILS_CHECK_ALLOCATOR_WITH_MSG( @@ -686,43 +539,70 @@ rmw_ret_t GraphCache::get_topic_names_and_types( }); // Fill topic names and types. - std::size_t j = 0; - for (const auto & it : graph_topics_) { - // Split based on "?". - // TODO(Yadunund): Be more systematic about this. - // TODO(clalancette): Rather than doing the splitting here, should we store - // it in graph_topics_ already split? - std::vector parts = split_keyexpr(it.first, '?'); - if (parts.size() < 2) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Invalid topic_key %s", it.first.c_str()); - return RMW_RET_INVALID_ARGUMENT; - } - - topic_names_and_types->names.data[j] = rcutils_strdup(parts[0].c_str(), *allocator); - if (!topic_names_and_types->names.data[j]) { + std::size_t index = 0; + for (const std::pair & item : graph_topics_) { + topic_names_and_types->names.data[index] = rcutils_strdup(item.first.c_str(), *allocator); + if (!topic_names_and_types->names.data[index]) { return RMW_RET_BAD_ALLOC; } - - // TODO(clalancette): This won't work if there are multiple types on the same topic - rcutils_ret_t rcutils_ret = rcutils_string_array_init( - &topic_names_and_types->types[j], 1, allocator); - if (RCUTILS_RET_OK != rcutils_ret) { - RMW_SET_ERROR_MSG(rcutils_get_error_string().str); - return RMW_RET_BAD_ALLOC; + { + rcutils_ret_t rcutils_ret = rcutils_string_array_init( + &topic_names_and_types->types[index], + item.second.size(), + allocator); + if (RCUTILS_RET_OK != rcutils_ret) { + RMW_SET_ERROR_MSG(rcutils_get_error_string().str); + return RMW_RET_BAD_ALLOC; + } } - - topic_names_and_types->types[j].data[0] = rcutils_strdup( - _demangle_if_ros_type(parts[1]).c_str(), *allocator); - if (!topic_names_and_types->types[j].data[0]) { - return RMW_RET_BAD_ALLOC; + size_t type_index = 0; + for (const std::pair & type : item.second) { + char * type_name = rcutils_strdup(_demangle_if_ros_type(type.first).c_str(), *allocator); + if (!type_name) { + RMW_SET_ERROR_MSG("failed to allocate memory for type name"); + return RMW_RET_BAD_ALLOC; + } + topic_names_and_types->types[index].data[type_index] = type_name; + ++type_index; } - - ++j; + ++index; } cleanup_names_and_types.cancel(); return RMW_RET_OK; } + +///============================================================================= +rmw_ret_t GraphCache::count_publishers( + const char * topic_name, + size_t * count) const +{ + *count = 0; + + if (graph_topics_.count(topic_name) != 0) { + for (const std::pair & it : graph_topics_.at(topic_name)) { + // Iterate through all the types and increment count. + *count += it.second->stats_.pub_count_; + } + } + + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t GraphCache::count_subscriptions( + const char * topic_name, + size_t * count) const +{ + *count = 0; + + if (graph_topics_.count(topic_name) != 0) { + for (const std::pair & it : graph_topics_.at(topic_name)) { + // Iterate through all the types and increment count. + *count += it.second->stats_.sub_count_; + } + } + + return RMW_RET_OK; +} diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index f2de0018..de78dd96 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -15,15 +15,16 @@ #ifndef DETAIL__GRAPH_CACHE_HPP_ #define DETAIL__GRAPH_CACHE_HPP_ -#include - #include #include #include #include #include +#include #include +#include "liveliness_utils.hpp" + #include "rcutils/allocator.h" #include "rcutils/types.h" @@ -32,66 +33,42 @@ ///============================================================================= -class GenerateToken +struct TopicStats { -public: - static std::string liveliness(size_t domain_id); - - /// Returns a string with key-expression @ros2_lv/domain_id/N/namespace/name - static std::string node( - size_t domain_id, - const std::string & namespace_, - const std::string & name); - - static std::string publisher( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos); - - static std::string subscription( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos); + std::size_t pub_count_; + std::size_t sub_count_; + + // Constructor which initializes counters to 0. + TopicStats(std::size_t pub_count, std::size_t sub_count); }; ///============================================================================= -/// Helper utilities to put/delete tokens until liveliness is supported in the -/// zenoh-c bindings. -class PublishToken +struct TopicData { -public: - static bool put( - z_owned_session_t * session, - const std::string & token); + liveliness::TopicInfo info_; + TopicStats stats_; - static bool del( - z_owned_session_t * session, - const std::string & token); + TopicData( + liveliness::TopicInfo info, + TopicStats stats); }; +using TopicDataPtr = std::shared_ptr; ///============================================================================= // TODO(Yadunund): Expand to services and clients. struct GraphNode { - struct TopicData - { - std::string topic; - std::string type; - std::string qos; - }; - - std::string ns; - std::string name; + std::string ns_; + std::string name_; // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? - std::string enclave; - std::vector pubs; - std::vector subs; + std::string enclave_; + + // Map topic type to TopicData + using TopicDataMap = std::unordered_map; + // Map topic name to TopicDataMap + using TopicMap = std::unordered_map; + TopicMap pubs_ = {}; + TopicMap subs_ = {}; }; using GraphNodePtr = std::shared_ptr; @@ -113,7 +90,15 @@ class GraphCache final rmw_ret_t get_topic_names_and_types( rcutils_allocator_t * allocator, bool no_demangle, - rmw_names_and_types_t * topic_names_and_types); + rmw_names_and_types_t * topic_names_and_types) const; + + rmw_ret_t count_publishers( + const char * topic_name, + size_t * count) const; + + rmw_ret_t count_subscriptions( + const char * topic_name, + size_t * count) const; private: /* @@ -137,20 +122,14 @@ class GraphCache final namespace_2: node_n: */ + + using NodeMap = std::unordered_map; + using NamespaceMap = std::unordered_map; // Map namespace to a map of . - std::unordered_map> graph_ = {}; - - // Optimize topic lookups mapping "topic_name?topic_type" keys to their pub/sub counts. - struct TopicStats - { - std::size_t pub_count_; - std::size_t sub_count_; - - // Constructor which initialized counters to 0. - TopicStats(std::size_t pub_count, std::size_t sub_count); - }; - using TopicStatsPtr = std::unique_ptr; - std::unordered_map graph_topics_ = {}; + NamespaceMap graph_ = {}; + + // Optimize topic lookups across the graph. + GraphNode::TopicMap graph_topics_ = {}; mutable std::mutex graph_mutex_; }; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp new file mode 100644 index 00000000..204f515d --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -0,0 +1,362 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "liveliness_utils.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include "rcpputils/scope_exit.hpp" +#include "rcutils/logging_macros.h" + + +namespace liveliness +{ + +///============================================================================= +NodeInfo::NodeInfo( + std::size_t domain_id, + std::string ns, + std::string name, + std::string enclave) +: domain_id_(std::move(domain_id)), + ns_(std::move(ns)), + name_(std::move(name)), + enclave_(std::move(enclave)) +{ + // Do nothing. +} + +///============================================================================= +TopicInfo::TopicInfo( + std::string name, + std::string type, + std::string qos) +: name_(std::move(name)), + type_(std::move(type)), + qos_(std::move(qos)) +{ + // Do nothing. +} + +///============================================================================= +namespace +{ + +/// The admin space used to prefix the liveliness tokens. +static const char ADMIN_SPACE[] = "@ros2_lv"; +static const char NODE_STR[] = "NN"; +static const char PUB_STR[] = "MP"; +static const char SUB_STR[] = "MS"; +static const char SRV_STR[] = "SS"; +static const char CLI_STR[] = "SC"; + +static const std::unordered_map entity_to_str = { + {EntityType::Node, NODE_STR}, + {EntityType::Publisher, PUB_STR}, + {EntityType::Subscription, SUB_STR}, + {EntityType::Service, SRV_STR}, + {EntityType::Client, CLI_STR} +}; + +static const std::unordered_map str_to_entity = { + {NODE_STR, EntityType::Node}, + {PUB_STR, EntityType::Publisher}, + {SUB_STR, EntityType::Subscription}, + {SRV_STR, EntityType::Service}, + {CLI_STR, EntityType::Client} +}; + +} // namespace + +///============================================================================= +std::string subscription_token(size_t domain_id) +{ + std::string token = std::string(ADMIN_SPACE) + "/" + std::to_string(domain_id) + "/**"; + return token; +} + +///============================================================================= +Entity::Entity( + EntityType type, + NodeInfo node_info, + std::optional topic_info) +: type_(std::move(type)), + node_info_(std::move(node_info)), + topic_info_(std::move(topic_info)) +{ + /** + * Set the liveliness token for the particular entity. + * + * The liveliness token keyexprs are in the form: + * + * //// + * + * Where: + * - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS. + * - The type of entity. This can be one of "NN" for a node, "MP" for a publisher, "MS" for a subscription, "SS" for a service server, or "SC" for a service client. + * - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons. + * - The ROS node name for this entity. + * + * For entities with topic infomation, the liveliness token keyexpr have additional fields: + * + * /////// + */ + std::stringstream token_ss; + const std::string & ns = node_info_.ns_; + token_ss << ADMIN_SPACE << "/" << node_info_.domain_id_ << "/" << entity_to_str.at(type_) << ns; + // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". + // Hence we add an "_" to denote an empty namespace such that splitting the key + // will always result in 5 parts. + if (ns == "/") { + token_ss << "_/"; + } else { + token_ss << "/"; + } + // Finally append node name. + token_ss << node_info_.name_; + // If this entity has a topic info, append it to the token. + if (topic_info_.has_value()) { + const auto & topic_info = this->topic_info_.value(); + // Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/". + token_ss << topic_info.name_ + "/" + topic_info.type_ + "/" + topic_info.qos_; + } + + this->keyexpr_ = token_ss.str(); +} + +///============================================================================= +std::optional Entity::make( + EntityType type, + NodeInfo node_info, + std::optional topic_info) +{ + if (entity_to_str.find(type) == entity_to_str.end()) { + RCUTILS_SET_ERROR_MSG("Invalid entity type."); + return std::nullopt; + } + if (node_info.ns_.empty() || node_info.name_.empty()) { + RCUTILS_SET_ERROR_MSG("Invalid node_info for entity."); + return std::nullopt; + } + if (type != EntityType::Node && !topic_info.has_value()) { + RCUTILS_SET_ERROR_MSG("Invalid topic_info for entity."); + return std::nullopt; + } + + Entity entity{std::move(type), std::move(node_info), std::move(topic_info)}; + return entity; +} + +namespace +{ +///============================================================================= +std::vector split_keyexpr( + const std::string & keyexpr, + const char delim = '/') +{ + std::vector delim_idx = {}; + // Insert -1 for starting position to make the split easier when using substr. + delim_idx.push_back(-1); + std::size_t idx = 0; + for (std::string::const_iterator it = keyexpr.begin(); it != keyexpr.end(); ++it) { + if (*it == delim) { + delim_idx.push_back(idx); + } + ++idx; + } + std::vector result = {}; + try { + for (std::size_t i = 1; i < delim_idx.size(); ++i) { + const size_t prev_idx = delim_idx[i - 1]; + const size_t idx = delim_idx[i]; + result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); + } + } catch (const std::exception & e) { + printf("%s\n", e.what()); + return {}; + } + // Finally add the last substr. + result.push_back(keyexpr.substr(delim_idx.back() + 1)); + return result; +} + +} // namespace + +///============================================================================= +std::optional Entity::make(const std::string & keyexpr) +{ + std::vector parts = split_keyexpr(keyexpr); + // A token will contain at least 5 parts: + // (ADMIN_SPACE, domain_id, entity_str, namespace, node_name). + // Basic validation. + if (parts.size() < 5) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received invalid liveliness token"); + return std::nullopt; + } + for (const std::string & p : parts) { + if (p.empty()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received invalid liveliness token"); + return std::nullopt; + } + } + + if (parts[0] != ADMIN_SPACE) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token with invalid admin space."); + return std::nullopt; + } + + // Get the entity, ie NN, MP, MS, SS, SC. + std::string & entity_str = parts[2]; + std::unordered_map::const_iterator entity_it = + str_to_entity.find(entity_str); + if (entity_it == str_to_entity.end()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token with invalid entity."); + return std::nullopt; + } + + EntityType entity_type = entity_it->second; + std::size_t domain_id = std::stoul(parts[1]); + std::string ns = parts[3] == "_" ? "/" : "/" + std::move(parts[3]); + std::string node_name = std::move(parts[4]); + std::optional topic_info = std::nullopt; + + // Populate topic_info if we have a token for an entity other than a node. + if (entity_type != EntityType::Node) { + if (parts.size() < 8) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token for non-node entity without required parameters."); + return std::nullopt; + } + topic_info = TopicInfo{ + "/" + std::move(parts[5]), + std::move(parts[6]), + std::move(parts[7])}; + } + + return Entity{ + std::move(entity_type), + NodeInfo{std::move(domain_id), std::move(ns), std::move(node_name), ""}, + std::move(topic_info)}; +} + +///============================================================================= +EntityType Entity::type() const +{ + return this->type_; +} + +std::string Entity::node_namespace() const +{ + return this->node_info_.ns_; +} + +std::string Entity::node_name() const +{ + return this->node_info_.name_; +} + +std::string Entity::node_enclave() const +{ + return this->node_info_.enclave_; +} + +///============================================================================= +std::optional Entity::topic_info() const +{ + return this->topic_info_; +} + +///============================================================================= +std::string Entity::keyexpr() const +{ + return this->keyexpr_; +} + +///============================================================================= +bool PublishToken::put( + z_owned_session_t * session, + const std::string & token) +{ + if (!z_session_check(session)) { + RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); + return false; + } + + // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. + z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); + auto drop_keyexpr = rcpputils::make_scope_exit( + [&keyexpr]() { + z_drop(z_move(keyexpr)); + }); + if (!z_keyexpr_check(&keyexpr)) { + RCUTILS_SET_ERROR_MSG("invalid keyexpression generation for liveliness publication."); + return false; + } + RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending PUT on %s", token.c_str()); + z_put_options_t options = z_put_options_default(); + options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); + if (z_put(z_loan(*session), z_keyexpr(token.c_str()), nullptr, 0, &options) < 0) { + RCUTILS_SET_ERROR_MSG("unable to publish liveliness for node creation"); + return false; + } + + return true; +} + +///============================================================================= +bool PublishToken::del( + z_owned_session_t * session, + const std::string & token) +{ + if (!z_session_check(session)) { + RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); + return false; + } + + // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. + z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); + auto drop_keyexpr = rcpputils::make_scope_exit( + [&keyexpr]() { + z_drop(z_move(keyexpr)); + }); + if (!z_keyexpr_check(&keyexpr)) { + RCUTILS_SET_ERROR_MSG("invalid key-expression generation for liveliness publication."); + return false; + } + RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending DELETE on %s", token.c_str()); + const z_delete_options_t options = z_delete_options_default(); + if (z_delete(z_loan(*session), z_loan(keyexpr), &options) < 0) { + RCUTILS_SET_ERROR_MSG("failed to delete liveliness key"); + return false; + } + + return true; +} + +} // namespace liveliness diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp new file mode 100644 index 00000000..90f76e89 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -0,0 +1,133 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__LIVELINESS_UTILS_HPP_ +#define DETAIL__LIVELINESS_UTILS_HPP_ + +#include + +#include +#include +#include + + +namespace liveliness +{ + +///============================================================================= +struct NodeInfo +{ + std::size_t domain_id_; + std::string ns_; + std::string name_; + std::string enclave_; + + NodeInfo( + std::size_t domain_id, + std::string ns, + std::string name, + std::string enclave); +}; + +///============================================================================= +struct TopicInfo +{ + std::string name_; + std::string type_; + std::string qos_; + + TopicInfo( + std::string name, + std::string type, + std::string qos); +}; + +///============================================================================= +/// Retuns the keyexpr for liveliness subscription. +std::string subscription_token(size_t domain_id); + +///============================================================================= +enum class EntityType : uint8_t +{ + Invalid = 0, + Node, + Publisher, + Subscription, + Service, + Client +}; + +///============================================================================= +// An struct to bundle results of parsing a token. +// TODO(Yadunund): Consider using variadic templates to pass args instead of +// relying on optional fields. +class Entity +{ +public: + /// Make an Entity from datatypes. This will return nullopt if the required + /// fields are not present for the EntityType. + // TODO(Yadunund): Find a way to better bundle the type and the associated data. + static std::optional make( + EntityType type, + NodeInfo node_info, + std::optional topic_info = std::nullopt); + + /// Make an Entity from a liveliness keyexpr. + static std::optional make(const std::string & keyexpr); + + /// Get the entity type. + EntityType type() const; + + std::string node_namespace() const; + + std::string node_name() const; + + std::string node_enclave() const; + + /// Get the topic_info. + std::optional topic_info() const; + + /// Get the liveliness keyexpr for this entity. + std::string keyexpr() const; + +private: + Entity( + EntityType type, + NodeInfo node_info, + std::optional topic_info); + + EntityType type_; + NodeInfo node_info_; + std::optional topic_info_; + std::string keyexpr_; +}; + +///============================================================================= +/// Helper utilities to put/delete tokens until liveliness is supported in the +/// zenoh-c bindings. +class PublishToken +{ +public: + static bool put( + z_owned_session_t * session, + const std::string & token); + + static bool del( + z_owned_session_t * session, + const std::string & token); +}; + +} // namespace liveliness + +#endif // DETAIL__LIVELINESS_UTILS_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 51f22e3c..1e6da94c 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -18,6 +18,7 @@ #include "detail/guard_condition.hpp" #include "detail/identifier.hpp" +#include "detail/liveliness_utils.hpp" #include "detail/rmw_data_types.hpp" #include "detail/zenoh_config.hpp" #include "detail/zenoh_router_check.hpp" @@ -246,7 +247,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) }); // Setup liveliness subscriptions for discovery. - const std::string liveliness_str = GenerateToken::liveliness(context->actual_domain_id); + const std::string liveliness_str = liveliness::subscription_token(context->actual_domain_id); // Query router/liveliness participants to get graph information before this session was started. RCUTILS_LOG_WARN_NAMED( diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 4de21163..4fa0ebbb 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -25,6 +25,7 @@ #include "detail/guard_condition.hpp" #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" +#include "detail/liveliness_utils.hpp" #include "detail/message_type_support.hpp" #include "detail/rmw_data_types.hpp" #include "detail/serialization_format.hpp" @@ -187,16 +188,25 @@ rmw_create_node( // Publish to the graph that a new node is in town // const bool pub_result = PublishToken::put( // &node->context->impl->session, - // GenerateToken::node(context->actual_domain_id, namespace_, name) + // liveliness::GenerateToken::node(context->actual_domain_id, namespace_, name) // ); // if (!pub_result) { // return nullptr; // } // Initialize liveliness token for the node to advertise that a new node is in town. rmw_node_data_t * node_data = static_cast(node->data); + const auto liveliness_entity = liveliness::Entity::make( + liveliness::EntityType::Node, + liveliness::NodeInfo{context->actual_domain_id, namespace_, name, ""}); + if (!liveliness_entity.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the node."); + return nullptr; + } node_data->token = zc_liveliness_declare_token( z_loan(node->context->impl->session), - z_keyexpr(GenerateToken::node(context->actual_domain_id, namespace_, name).c_str()), + z_keyexpr(liveliness_entity->keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -241,7 +251,8 @@ rmw_destroy_node(rmw_node_t * node) // Publish to the graph that a node has ridden off into the sunset // const bool del_result = PublishToken::del( // &node->context->impl->session, - // GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) + // liveliness::GenerateToken::node(node->context->actual_domain_id, node->namespace_, + // node->name) // ); // if (!del_result) { // return RMW_RET_ERROR; @@ -555,7 +566,7 @@ rmw_create_publisher( // TODO(Yadunund): Publish liveliness for the new publisher. // const bool pub_result = PublishToken::put( // &node->context->impl->session, - // GenerateToken::publisher( + // liveliness::GenerateToken::publisher( // node->context->actual_domain_id, // node->namespace_, // node->name, @@ -566,16 +577,21 @@ rmw_create_publisher( // if (!pub_result) { // return nullptr; // } + const auto liveliness_entity = liveliness::Entity::make( + liveliness::EntityType::Publisher, + liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, + liveliness::TopicInfo{rmw_publisher->topic_name, + publisher_data->type_support->get_name(), "reliable"} + ); + if (!liveliness_entity.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the publisher."); + return nullptr; + } publisher_data->token = zc_liveliness_declare_token( z_loan(node->context->impl->session), - z_keyexpr( - GenerateToken::publisher( - node->context->actual_domain_id, - node->namespace_, - node->name, - rmw_publisher->topic_name, - publisher_data->type_support->get_name(), - "reliable").c_str()), + z_keyexpr(liveliness_entity->keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -632,7 +648,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) // Publish to the graph that a publisher has ridden off into the sunset // const bool del_result = PublishToken::del( // &node->context->impl->session, - // GenerateToken::publisher( + // liveliness::GenerateToken::publisher( // node->context->actual_domain_id, // node->namespace_, // node->name, @@ -1253,16 +1269,21 @@ rmw_create_subscription( }); // Publish to the graph that a new subscription is in town + const auto liveliness_entity = liveliness::Entity::make( + liveliness::EntityType::Subscription, + liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, + liveliness::TopicInfo{rmw_subscription->topic_name, + sub_data->type_support->get_name(), "reliable"} + ); + if (!liveliness_entity.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the subscription."); + return nullptr; + } sub_data->token = zc_liveliness_declare_token( z_loan(context_impl->session), - z_keyexpr( - GenerateToken::subscription( - node->context->actual_domain_id, - node->namespace_, - node->name, - rmw_subscription->topic_name, - sub_data->type_support->get_name(), - "reliable").c_str()), + z_keyexpr(liveliness_entity->keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -2133,10 +2154,26 @@ rmw_count_publishers( const char * topic_name, size_t * count) { - static_cast(node); - static_cast(topic_name); - static_cast(count); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, RMW_RET_INVALID_ARGUMENT); + int validation_result = RMW_TOPIC_VALID; + rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr); + if (RMW_RET_OK != ret) { + return ret; + } + if (RMW_TOPIC_VALID != validation_result) { + const char * reason = rmw_full_topic_name_validation_result_string(validation_result); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("topic_name argument is invalid: %s", reason); + return RMW_RET_INVALID_ARGUMENT; + } + RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.count_publishers(topic_name, count); } //============================================================================== @@ -2147,10 +2184,26 @@ rmw_count_subscribers( const char * topic_name, size_t * count) { - static_cast(node); - static_cast(topic_name); - static_cast(count); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, RMW_RET_INVALID_ARGUMENT); + int validation_result = RMW_TOPIC_VALID; + rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr); + if (RMW_RET_OK != ret) { + return ret; + } + if (RMW_TOPIC_VALID != validation_result) { + const char * reason = rmw_full_topic_name_validation_result_string(validation_result); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("topic_name argument is invalid: %s", reason); + return RMW_RET_INVALID_ARGUMENT; + } + RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.count_subscriptions(topic_name, count); } //==============================================================================