From adc0a2f9c424200abf689e1ba93b1264ff786d8a Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 23 Nov 2023 14:01:35 +0800 Subject: [PATCH 01/13] use std::find_if to find matching pubs Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 025b0047..8107a373 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -425,16 +426,13 @@ void GraphCache::parse_del(const std::string & keyexpr) // Here we iterate throught the list of publishers and remove the one // with matching name, type and qos. // TODO(Yadunund): This can be more optimal than O(n) with some caching. - auto erase_it = found_node->pubs.begin(); - for (; erase_it != found_node->pubs.end(); ++erase_it) { - const auto & pub = *erase_it; - if (pub.topic == node->pubs.at(0).topic && + auto erase_it = std::find_if( + found_node->pubs.begin(), found_node->pubs.end(), + [&node](const auto & pub) { + return pub.topic == node->pubs.at(0).topic && pub.type == node->pubs.at(0).type && - pub.qos == node->pubs.at(0).qos) - { - break; - } - } + pub.qos == node->pubs.at(0).qos; + }); if (erase_it != found_node->pubs.end()) { found_node->pubs.erase(erase_it); // Bookkeeping From d99e776566669b66e8b6a769d84fa67680198cbd Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 24 Nov 2023 17:13:04 +0800 Subject: [PATCH 02/13] Refactor to support multiple types Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 530 +++++++++++++---------- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 66 ++- 2 files changed, 355 insertions(+), 241 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 8107a373..739d478e 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -204,9 +204,52 @@ std::vector split_keyexpr( result.push_back(keyexpr.substr(delim_idx.back() + 1)); return result; } + +///============================================================================= +// An internal struct to bundle results of parsing a token. +struct TokenNode +{ + + struct TokenTopicData + { + std::string name_; + std::string type_; + std::string qos_; + + TokenTopicData( + std::string name, + std::string type, + std::string qos) + : name_(std::move(name)), + type_(std::move(type)), + qos_(std::move(qos)) + { + // Do nothing. + } + }; + + std::string ns_; + std::string name_; + std::string enclave_; + std::optional topic_data_; + + TokenNode( + std::string ns, + std::string name, + std::string enclave, + std::optional topic_data = std::nullopt) + : ns_(std::move(ns)), + name_(std::move(name)), + enclave_(std::move(enclave)), + topic_data_(std::move(topic_data)) + { + // Do nothing. + } +}; + ///============================================================================= // Convert a liveliness token into a -std::optional> _parse_token(const std::string & keyexpr) +std::optional> _parse_token(const std::string & keyexpr) { std::vector parts = split_keyexpr(keyexpr); // At minimum, a token will contain 5 parts (@ros2_lv, domain_id, entity, namespace, node_name). @@ -228,11 +271,20 @@ std::optional> _parse_token(const std::string // Get the entity, ie NN, MP, MS, SS, SC. std::string & entity = parts[2]; + if (entity != "NN" && entity != "MP" && entity != "MS" && entity != "SS" && entity != "SC") { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token with invalid entity [%s].", entity.c_str()); + return std::nullopt; + } - GraphNode node; + // TODO(Yadunund): Support enclaves. // Nodes with empty namespaces will contain a "_". - node.ns = parts[3] == "_" ? "/" : "/" + parts[3]; - node.name = std::move(parts[4]); + TokenNode node{ + parts[3] == "_" ? "/" : "/" + parts[3], + std::move(parts[4]), + "" + }; if (entity != "NN") { if (parts.size() < 8) { @@ -241,19 +293,13 @@ std::optional> _parse_token(const std::string "Received invalid liveliness token"); return std::nullopt; } - GraphNode::TopicData data; - data.topic = "/" + std::move(parts[5]); - data.type = std::move(parts[6]); - data.qos = std::move(parts[7]); - - if (entity == "MP") { - node.pubs.push_back(std::move(data)); - } else if (entity == "MS") { - node.subs.push_back(std::move(data)); - } else if (entity == "SS") { - // TODO(yadunund): Service server - } else if (entity == "SC") { - // TODO(yadunund): Service client + if (entity == "MP" || entity == "MS" || entity == "SS" || entity == "SC") { + TokenNode::TokenTopicData topic_data{ + "/" + std::move(parts[5]), + std::move(parts[6]), + std::move(parts[7]) + }; + node.topic_data_ = std::move(topic_data); } else { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -266,14 +312,26 @@ std::optional> _parse_token(const std::string } } // namespace + ///============================================================================= -GraphCache::TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) +TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) : pub_count_(pub_count), sub_count_(sub_count) { // Do nothing. } + +///============================================================================= +TopicData::TopicData( + std::string type, + std::string qos, + TopicStats stats) +: type_(std::move(type)), + qos_(std::move(qos)), + stats_(std::move(stats)) +{} + ///============================================================================= void GraphCache::parse_put(const std::string & keyexpr) { @@ -283,111 +341,129 @@ void GraphCache::parse_put(const std::string & keyexpr) return; } const std::string & entity = valid_token->first; - auto node = std::make_shared(std::move(valid_token->second)); - std::lock_guard lock(graph_mutex_); + const auto & token_node = valid_token->second; + + // Helper lambda to append pub/subs to the GraphNode. + // We capture by reference to update caches like graph_topics_ if update_cache is true. + auto add_topic_data = + [&](const TokenNode::TokenTopicData & topic_data, GraphNode & graph_node, + const std::string & entity, bool update_cache = false) -> void + { + if (entity != "MP" && entity != "MS") { + return; + } - if (entity == "NN") { - // Node - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // New namespace. - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(node->ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); + auto & topic_map = entity == "MP" ? graph_node.pubs_ : graph_node.subs_; + const std::string entity_desc = entity == "MP" ? "publisher" : "subscription"; + const std::size_t pub_count = entity == "MP" ? 1 : 0; + const std::size_t sub_count = !pub_count; + TopicDataPtr graph_topic_data = std::make_shared( + topic_data.type_, + topic_data.qos_, + TopicStats{pub_count, sub_count}); + + GraphNode::TopicDataSet topic_data_set = {graph_topic_data}; + auto insertion = topic_map.insert(std::make_pair(topic_data.name_, topic_data_set)); if (!insertion.second) { - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Unable to add duplicate node /%s to the graph.", - node->name.c_str()); + // A topic with the same name already exists in the node so we append the type. + auto type_insertion = insertion.first->second.insert(graph_topic_data); + if (!type_insertion.second) { + // We have another instance of a pub/sub over the same topic and type so we increment the counters. + auto & existing_graph_topic = *(type_insertion.first); + existing_graph_topic->stats_.pub_count_ += pub_count; + existing_graph_topic->stats_.pub_count_ += sub_count; + } } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added node /%s to the graph.", - node->name.c_str()); - return; - } else if (entity == "MP") { - // Publisher - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // Potential edge case where a liveliness update for a node creation was missed. - // So we add the node here. - std::string ns = node->ns; - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); - if (!insertion.second && !node->pubs.empty()) { - // Node already exists so just append the publisher. - insertion.first->second->pubs.push_back(node->pubs[0]); - } else { - return; + // Bookkeeping: We update graph_topic_ which keeps track of topics across all nodes in the graph. + if (update_cache) { + std::string topic_key = topic_data.name_ + "?" + topic_data.type_; + auto cache_insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); + if (cache_insertion.second) { + // First entry for this topic name + type combo so we create a new TopicStats instance. + cache_insertion.first->second = std::make_unique(pub_count, sub_count); + } else { + // Else we update the existing counters. + cache_insertion.first->second->pub_count_ += pub_count; + cache_insertion.first->second->sub_count_ += sub_count; + } } - } - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; - auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); - if (!insertion.second) { - // Such a topic already exists so we just increment its count. - ++insertion.first->second->pub_count_; - } else { - insertion.first->second = std::make_unique(1, 0); - } + + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "Added %s on topic %s with type %s and qos %s to node /%s.", + entity_desc.c_str(), + topic_data.name_.c_str(), + topic_data.type_.c_str(), + topic_data.qos_.c_str(), + graph_node.name_.c_str()); + }; + + // Helper lambdas to convert a TokenNode into a basic GraphNode. + auto make_graph_node = + [&](const TokenNode & token_node, const std::string & entity) -> std::shared_ptr + { + auto graph_node = std::make_shared(); + graph_node->ns_ = token_node.ns_; + graph_node->name_ = token_node.name_; + graph_node->enclave_ = token_node.enclave_; + + if (!token_node.topic_data_.has_value()) { + // Token was for a node. + return graph_node; + } + // Add pub/sub entries. + add_topic_data(token_node.topic_data_.value(), *graph_node, entity, true); + + return graph_node; + }; + + // Lock the graph mutex before accessing the graph. + std::lock_guard lock(graph_mutex_); + + // If the namespace did not exist, create it and add the node to the graph and return. + auto ns_it = graph_.find(token_node.ns_); + if (ns_it == graph_.end()) { + std::unordered_map node_map = { + {token_node.name_, make_graph_node(token_node, entity)}}; + graph_.insert(std::make_pair(token_node.ns_, std::move(node_map))); RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added publisher %s to node /%s in graph.", - node->pubs.at(0).topic.c_str(), - node->name.c_str()); + "rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.", + token_node.name_.c_str(), + token_node.ns_.c_str()); return; - } else if (entity == "MS") { - // Subscription - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // Potential edge case where a liveliness update for a node creation was missed. - // So we add the node here. - std::string ns = node->ns; - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); - if (!insertion.second && !node->subs.empty()) { - // Node already exists so just append the publisher. - insertion.first->second->subs.push_back(node->subs[0]); - } else { - return; - } - } - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; - auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); - if (!insertion.second) { - // Such a topic already exists so we just increment its count. - ++insertion.first->second->sub_count_; - } else { - insertion.first->second = std::make_unique(0, 1); - } + } + + // Add the node to the namespace if it did not exist and return. + auto node_it = ns_it->second.find(token_node.name_); + if (node_it == ns_it->second.end()) { + ns_it->second.insert(std::make_pair(token_node.name_, make_graph_node(token_node, entity))); RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added subscription %s to node /%s in graph.", - node->subs.at(0).topic.c_str(), - node->name.c_str()); + "rmw_zenoh_cpp", "Added node /%s to an existing namespace %s in the graph.", + token_node.name_.c_str(), + token_node.ns_.c_str()); return; - } else if (entity == "SS") { - // TODO(yadunund): Service server - } else if (entity == "SC") { - // TODO(yadunud): Service Client - } else { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received liveliness token with invalid entity type."); + } + + // Handles additions to an existing node in the graph. + if (entity == "NN") { + // The NN entity is implicitly handled above where we add the node. + // If control reaches here, then we received a duplicate entry for the same node. + // This could happen when we get() all the liveliness tokens when the node spins up and + // receive a MP token before an NN one. + return; + } + + if (!token_node.topic_data_.has_value()) { + // Likely an error with parsing the token. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Put token %s parsed without extracting TopicData.", + keyexpr.c_str()); return; } + + // Update the graph based on the entity. + add_topic_data(token_node.topic_data_.value(), *(node_it->second), entity, true); } ///============================================================================= @@ -399,122 +475,136 @@ void GraphCache::parse_del(const std::string & keyexpr) return; } const std::string & entity = valid_token->first; - auto node = std::make_shared(std::move(valid_token->second)); + const auto & token_node = valid_token->second; + + // Helper lambda to append pub/subs to the GraphNode. + // We capture by reference to update caches like graph_topics_ if update_cache is true. + auto remove_topic_data = + [&](const TokenNode::TokenTopicData & topic_data, GraphNode & graph_node, + const std::string & entity, bool update_cache = false) -> void + { + if (entity != "MP" && entity != "MS") { + return; + } + + auto & topic_map = entity == "MP" ? graph_node.pubs_ : graph_node.subs_; + const std::string entity_desc = entity == "MP" ? "publisher" : "subscription"; + const std::size_t pub_count = entity == "MP" ? 1 : 0; + const std::size_t sub_count = !pub_count; + + auto topic_it = topic_map.find(topic_data.name_); + if (topic_it == topic_map.end()) { + // Pub/sub not found. + return; + } + + auto & topic_data_set = topic_it->second; + // Search the unordered_set for the TopicData for this topic. + auto topic_data_it = std::find_if( + topic_data_set.begin(), topic_data_set.end(), + [&topic_data](const auto topic_data_ptr) { + return topic_data.type_ == topic_data_ptr->type_; + }); + if (topic_data_it == topic_data_set.end()) { + // Something is wrong. + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "TopicData not found for topic %s. Report this.", + topic_data.name_.c_str()); + return; + } + + // Decrement the relevant counters. Check if both counters are 0 and if so remove from graph_node. + auto & existing_topic_data = *topic_data_it; + existing_topic_data->stats_.pub_count_ -= pub_count; + existing_topic_data->stats_.sub_count_ -= sub_count; + if (existing_topic_data->stats_.pub_count_ == 0 && + existing_topic_data->stats_.sub_count_ == 0) + { + topic_data_set.erase(topic_data_it); + } + // If the topic does not have any TopicData entries, erase the topic from the map. + if (topic_data_set.empty()) { + topic_map.erase(topic_data.name_); + } + + // Bookkeeping: We update graph_topic_ which keeps track of topics across all nodes in the graph. + if (update_cache) { + std::string topic_key = topic_data.name_ + "?" + topic_data.type_; + auto cache_topic_it = graph_topics_.find(topic_key); + if (cache_topic_it == graph_topics_.end()) { + // This should not happen. + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.", + topic_key.c_str()); + } else { + // Decrement the relevant counters. Check if both counters are 0 and if so remove from cache. + cache_topic_it->second->pub_count_ -= pub_count; + cache_topic_it->second->sub_count_ -= sub_count; + if (cache_topic_it->second->pub_count_ == 0 && cache_topic_it->second->sub_count_ == 0) { + graph_topics_.erase(topic_key); + } + } + } + + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "Removed %s on topic %s with type %s and qos %s to node /%s.", + entity_desc.c_str(), + topic_data.name_.c_str(), + topic_data.type_.c_str(), + topic_data.qos_.c_str(), + graph_node.name_.c_str()); + }; + + // Lock the graph mutex before accessing the graph. std::lock_guard lock(graph_mutex_); + + // If namespace does not exist, ignore the request. + auto ns_it = graph_.find(token_node.ns_); + if (ns_it == graph_.end()) { + return; + } + + // If the node does not exist, ignore the request. + auto node_it = ns_it->second.find(token_node.name_); + if (node_it == ns_it->second.end()) { + return; + } + if (entity == "NN") { // Node - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - ns_it->second.erase(node->name); + // The liveliness tokens to remove pub/subs should be received before the one to remove a node + // given the reliability QoS for liveliness subs. However, if we find any pubs/subs present in the node below, + // we should update the count in graph_topics_. + const auto graph_node = node_it->second; + if (!graph_node->pubs_.empty() || !graph_node->subs_.empty()) { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token to remove node /%s from the graph before all pub/subs for this node have been removed. " + "Report this issue.", + token_node.name_.c_str() + ); + // TODO(Yadunund): Iterate through the nodes pubs_ and subs_ and decrement topic count in graph_topics_. } + ns_it->second.erase(token_node.name_); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Removed node /%s from the graph.", - node->name.c_str() + token_node.name_.c_str() ); - } else if (entity == "MP") { - // Publisher - if (node->pubs.empty()) { - // This should never happen but we make sure _parse_token() has no error. - return; - } - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - auto node_it = ns_it->second.find(node->name); - if (node_it != ns_it->second.end()) { - const auto found_node = node_it->second; - // Here we iterate throught the list of publishers and remove the one - // with matching name, type and qos. - // TODO(Yadunund): This can be more optimal than O(n) with some caching. - auto erase_it = std::find_if( - found_node->pubs.begin(), found_node->pubs.end(), - [&node](const auto & pub) { - return pub.topic == node->pubs.at(0).topic && - pub.type == node->pubs.at(0).type && - pub.qos == node->pubs.at(0).qos; - }); - if (erase_it != found_node->pubs.end()) { - found_node->pubs.erase(erase_it); - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; - auto topic_it = graph_topics_.find(topic_key); - if (topic_it != graph_topics_.end()) { - if (topic_it->second->pub_count_ == 1 && topic_it->second->sub_count_ == 0) { - // The last publisher was removed so we can delete this entry. - graph_topics_.erase(topic_key); - } else { - // Else we just decrement the count. - --topic_it->second->pub_count_; - } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Removed publisher %s from node /%s in the graph.", - node->pubs.at(0).topic.c_str(), - node->name.c_str() - ); - } - } - } - } else if (entity == "MS") { - // Subscription - if (node->subs.empty()) { - // This should never happen but we make sure _parse_token() has no error. - return; - } - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - auto node_it = ns_it->second.find(node->name); - if (node_it != ns_it->second.end()) { - const auto found_node = node_it->second; - // Here we iterate throught the list of subscriptions and remove the one - // with matching name, type and qos. - // TODO(Yadunund): This can be more optimal than O(n) with some caching. - auto erase_it = found_node->subs.begin(); - for (; erase_it != found_node->subs.end(); ++erase_it) { - const auto & sub = *erase_it; - if (sub.topic == node->subs.at(0).topic && - sub.type == node->subs.at(0).type && - sub.qos == node->subs.at(0).qos) - { - break; - } - } - if (erase_it != found_node->subs.end()) { - found_node->subs.erase(erase_it); - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; - auto topic_it = graph_topics_.find(topic_key); - if (topic_it != graph_topics_.end()) { - if (topic_it->second->sub_count_ == 1 && topic_it->second->pub_count_ == 0) { - // The last subscription was removed so we can delete this entry. - graph_topics_.erase(topic_key); - } else { - // Else we just decrement the count. - --topic_it->second->sub_count_; - } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Removed subscription %s from node /%s in the graph.", - node->subs.at(0).topic.c_str(), - node->name.c_str() - ); - } - } - } - } else if (entity == "SS") { - // TODO(yadunund): Service server - } else if (entity == "SC") { - // TODO(yadunund): Service client - } else { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received liveliness token with invalid entity type."); return; } + + if (!token_node.topic_data_.has_value()) { + // Likely an error with parsing the token. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData.", + keyexpr.c_str()); + return; + } + + // Update the graph based on the entity. + remove_topic_data(token_node.topic_data_.value(), *(node_it->second), entity, true); } ///============================================================================= @@ -602,7 +692,7 @@ rmw_ret_t GraphCache::get_node_names( const std::string & ns = ns_it->first; for (auto node_it = ns_it->second.begin(); node_it != ns_it->second.end(); ++node_it) { const auto node = node_it->second; - node_names->data[j] = rcutils_strdup(node->name.c_str(), *allocator); + node_names->data[j] = rcutils_strdup(node->name_.c_str(), *allocator); if (!node_names->data[j]) { return RMW_RET_BAD_ALLOC; } @@ -613,7 +703,7 @@ rmw_ret_t GraphCache::get_node_names( } if (enclaves) { enclaves->data[j] = rcutils_strdup( - node->enclave.c_str(), *allocator); + node->enclave_.c_str(), *allocator); if (!enclaves->data[j]) { return RMW_RET_BAD_ALLOC; } diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index f2de0018..b651a014 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "rcutils/allocator.h" @@ -75,23 +76,54 @@ class PublishToken const std::string & token); }; +///============================================================================= +struct TopicStats +{ + std::size_t pub_count_; + std::size_t sub_count_; + + // Constructor which initializes counters to 0. + TopicStats(std::size_t pub_count, std::size_t sub_count); +}; +using TopicStatsPtr = std::unique_ptr; + +///============================================================================= +struct TopicData +{ + std::string type_; + std::string qos_; + TopicStats stats_; + + TopicData( + std::string type, + std::string qos, + TopicStats stats); +}; +using TopicDataPtr = std::shared_ptr; + ///============================================================================= // TODO(Yadunund): Expand to services and clients. struct GraphNode { - struct TopicData + std::string ns_; + std::string name_; + // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? + std::string enclave_; + + // Hash topic data using "type" string to only support unique topic_types. + // TODO(Yadunund): Should we also factor the "qos" into the cache? + struct TopicDataHash { - std::string topic; - std::string type; - std::string qos; + std::size_t operator()(const TopicDataPtr & data) const + { + return std::hash{}(data->type_); + } }; - - std::string ns; - std::string name; - // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? - std::string enclave; - std::vector pubs; - std::vector subs; + // Map topic name to a set of TopicData to support multiple types per topic. + using TopicDataSet = std::unordered_set; + using TopicMap = std::unordered_map; + TopicMap pubs_ = {}; + TopicMap subs_ = {}; }; using GraphNodePtr = std::shared_ptr; @@ -140,16 +172,8 @@ class GraphCache final // Map namespace to a map of . std::unordered_map> graph_ = {}; - // Optimize topic lookups mapping "topic_name?topic_type" keys to their pub/sub counts. - struct TopicStats - { - std::size_t pub_count_; - std::size_t sub_count_; - - // Constructor which initialized counters to 0. - TopicStats(std::size_t pub_count, std::size_t sub_count); - }; - using TopicStatsPtr = std::unique_ptr; + // Optimize topic lookups across the graph by mapping "topic_name?topic_type" keys to their pub/sub counts. + // TODO(Yadunund): Consider storing a set of NodePtrs for each key. std::unordered_map graph_topics_ = {}; mutable std::mutex graph_mutex_; From 208f3ef67d17726f3bb69b4d44e725b42d2a5448 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Mon, 27 Nov 2023 16:17:42 +0530 Subject: [PATCH 03/13] Refactor liveliness token generation Signed-off-by: Yadunund --- rmw_zenoh_cpp/CMakeLists.txt | 1 + rmw_zenoh_cpp/src/detail/graph_cache.cpp | 419 +++--------------- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 58 +-- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 342 ++++++++++++++ rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 137 ++++++ rmw_zenoh_cpp/src/rmw_init.cpp | 3 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 62 ++- 7 files changed, 599 insertions(+), 423 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/liveliness_utils.cpp create mode 100644 rmw_zenoh_cpp/src/detail/liveliness_utils.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index d3f93e72..24b70723 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -28,6 +28,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/identifier.cpp src/detail/graph_cache.cpp src/detail/guard_condition.cpp + src/detail/liveliness_utils.cpp src/detail/message_type_support.cpp src/detail/rmw_data_types.cpp src/detail/service_type_support.cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 739d478e..3710ac77 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -33,285 +33,8 @@ #include "graph_cache.hpp" ///============================================================================= -std::string GenerateToken::liveliness(size_t domain_id) -{ - std::string token = "@ros2_lv/" + std::to_string(domain_id) + "/**"; - return token; -} - -///============================================================================= -/** - * Generate a liveliness token for the particular entity. - * - * The liveliness tokens are in the form: - * - * @ros2_lv//// - * - * Where: - * - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS. - * - The type of entity. This can be one of "NN" for a node, "MP" for a publisher, "MS" for a subscription, "SS" for a service server, or "SC" for a service client. - * - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons. - * - The ROS node name for this entity. - */ -static std::string generate_base_token( - const std::string & entity, - size_t domain_id, - const std::string & namespace_, - const std::string & name) -{ - std::stringstream token_ss; - token_ss << "@ros2_lv/" << domain_id << "/" << entity << namespace_; - // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". - // Hence we add an "_" to denote an empty namespace such that splitting the key - // will always result in 5 parts. - if (namespace_ == "/") { - token_ss << "_/"; - } else { - token_ss << "/"; - } - // Finally append node name. - token_ss << name; - return token_ss.str(); -} - -///============================================================================= -std::string GenerateToken::node( - size_t domain_id, - const std::string & namespace_, - const std::string & name) -{ - return generate_base_token("NN", domain_id, namespace_, name); -} - -///============================================================================= -std::string GenerateToken::publisher( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos) -{ - std::string token = generate_base_token("MP", domain_id, node_namespace, node_name); - token += topic + "/" + type + "/" + qos; - return token; -} - -///============================================================================= -std::string GenerateToken::subscription( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos) -{ - std::string token = generate_base_token("MS", domain_id, node_namespace, node_name); - token += topic + "/" + type + "/" + qos; - return token; -} - -///============================================================================= -bool PublishToken::put( - z_owned_session_t * session, - const std::string & token) -{ - if (!z_session_check(session)) { - RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); - return false; - } - - // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. - z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); - auto drop_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { - RCUTILS_SET_ERROR_MSG("invalid keyexpression generation for liveliness publication."); - return false; - } - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending PUT on %s", token.c_str()); - z_put_options_t options = z_put_options_default(); - options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); - if (z_put(z_loan(*session), z_keyexpr(token.c_str()), nullptr, 0, &options) < 0) { - RCUTILS_SET_ERROR_MSG("unable to publish liveliness for node creation"); - return false; - } - - return true; -} - -///============================================================================= -bool PublishToken::del( - z_owned_session_t * session, - const std::string & token) -{ - if (!z_session_check(session)) { - RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); - return false; - } - - // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. - z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); - auto drop_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { - RCUTILS_SET_ERROR_MSG("invalid key-expression generation for liveliness publication."); - return false; - } - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending DELETE on %s", token.c_str()); - const z_delete_options_t options = z_delete_options_default(); - if (z_delete(z_loan(*session), z_loan(keyexpr), &options) < 0) { - RCUTILS_SET_ERROR_MSG("failed to delete liveliness key"); - return false; - } - - return true; -} - -///============================================================================= -namespace -{ -std::vector split_keyexpr( - const std::string & keyexpr, - const char delim = '/') -{ - std::vector delim_idx = {}; - // Insert -1 for starting position to make the split easier when using substr. - delim_idx.push_back(-1); - std::size_t idx = 0; - for (auto it = keyexpr.begin(); it != keyexpr.end(); ++it) { - if (*it == delim) { - delim_idx.push_back(idx); - } - ++idx; - } - std::vector result = {}; - try { - for (std::size_t i = 1; i < delim_idx.size(); ++i) { - const auto & prev_idx = delim_idx.at(i - 1); - const auto & idx = delim_idx.at(i); - result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); - } - } catch (const std::exception & e) { - printf("%s\n", e.what()); - return {}; - } - // Finally add the last substr. - result.push_back(keyexpr.substr(delim_idx.back() + 1)); - return result; -} - -///============================================================================= -// An internal struct to bundle results of parsing a token. -struct TokenNode -{ - - struct TokenTopicData - { - std::string name_; - std::string type_; - std::string qos_; - - TokenTopicData( - std::string name, - std::string type, - std::string qos) - : name_(std::move(name)), - type_(std::move(type)), - qos_(std::move(qos)) - { - // Do nothing. - } - }; - - std::string ns_; - std::string name_; - std::string enclave_; - std::optional topic_data_; - - TokenNode( - std::string ns, - std::string name, - std::string enclave, - std::optional topic_data = std::nullopt) - : ns_(std::move(ns)), - name_(std::move(name)), - enclave_(std::move(enclave)), - topic_data_(std::move(topic_data)) - { - // Do nothing. - } -}; - -///============================================================================= -// Convert a liveliness token into a -std::optional> _parse_token(const std::string & keyexpr) -{ - std::vector parts = split_keyexpr(keyexpr); - // At minimum, a token will contain 5 parts (@ros2_lv, domain_id, entity, namespace, node_name). - // Basic validation. - if (parts.size() < 5) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); - return std::nullopt; - } - for (const std::string & p : parts) { - if (p.empty()) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); - return std::nullopt; - } - } - - // Get the entity, ie NN, MP, MS, SS, SC. - std::string & entity = parts[2]; - if (entity != "NN" && entity != "MP" && entity != "MS" && entity != "SS" && entity != "SC") { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received liveliness token with invalid entity [%s].", entity.c_str()); - return std::nullopt; - } - - // TODO(Yadunund): Support enclaves. - // Nodes with empty namespaces will contain a "_". - TokenNode node{ - parts[3] == "_" ? "/" : "/" + parts[3], - std::move(parts[4]), - "" - }; - - if (entity != "NN") { - if (parts.size() < 8) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); - return std::nullopt; - } - if (entity == "MP" || entity == "MS" || entity == "SS" || entity == "SC") { - TokenNode::TokenTopicData topic_data{ - "/" + std::move(parts[5]), - std::move(parts[6]), - std::move(parts[7]) - }; - node.topic_data_ = std::move(topic_data); - } else { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Invalid entity [%s] in liveliness token", entity.c_str()); - return std::nullopt; - } - } - - return std::make_pair(std::move(entity), std::move(node)); -} -} // namespace - +using Entity = liveliness::Entity; +using EntityType = liveliness::EntityType; ///============================================================================= TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) @@ -321,49 +44,46 @@ TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) // Do nothing. } - ///============================================================================= TopicData::TopicData( - std::string type, - std::string qos, + TopicInfo info, TopicStats stats) -: type_(std::move(type)), - qos_(std::move(qos)), +: info_(std::move(info)), stats_(std::move(stats)) {} ///============================================================================= void GraphCache::parse_put(const std::string & keyexpr) { - auto valid_token = _parse_token(keyexpr); - if (!valid_token.has_value()) { + auto valid_entity = liveliness::Entity::make(keyexpr); + if (!valid_entity.has_value()) { // Error message has already been logged. return; } - const std::string & entity = valid_token->first; - const auto & token_node = valid_token->second; + const auto & entity = *valid_entity; // Helper lambda to append pub/subs to the GraphNode. // We capture by reference to update caches like graph_topics_ if update_cache is true. auto add_topic_data = - [&](const TokenNode::TokenTopicData & topic_data, GraphNode & graph_node, - const std::string & entity, bool update_cache = false) -> void + [&](const Entity & entity, GraphNode & graph_node, + bool update_cache = false) -> void { - if (entity != "MP" && entity != "MS") { + if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { return; } - auto & topic_map = entity == "MP" ? graph_node.pubs_ : graph_node.subs_; - const std::string entity_desc = entity == "MP" ? "publisher" : "subscription"; - const std::size_t pub_count = entity == "MP" ? 1 : 0; + auto & topic_map = entity.type() == + EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; + const std::string entity_desc = entity.type() == + EntityType::Publisher ? "publisher" : "subscription"; + const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0; const std::size_t sub_count = !pub_count; TopicDataPtr graph_topic_data = std::make_shared( - topic_data.type_, - topic_data.qos_, + entity.topic_info().value(), TopicStats{pub_count, sub_count}); GraphNode::TopicDataSet topic_data_set = {graph_topic_data}; - auto insertion = topic_map.insert(std::make_pair(topic_data.name_, topic_data_set)); + auto insertion = topic_map.insert(std::make_pair(entity.topic_info()->name_, topic_data_set)); if (!insertion.second) { // A topic with the same name already exists in the node so we append the type. auto type_insertion = insertion.first->second.insert(graph_topic_data); @@ -377,7 +97,7 @@ void GraphCache::parse_put(const std::string & keyexpr) // Bookkeeping: We update graph_topic_ which keeps track of topics across all nodes in the graph. if (update_cache) { - std::string topic_key = topic_data.name_ + "?" + topic_data.type_; + std::string topic_key = entity.topic_info()->name_ + "?" + entity.topic_info()->type_; auto cache_insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); if (cache_insertion.second) { // First entry for this topic name + type combo so we create a new TopicStats instance. @@ -393,27 +113,27 @@ void GraphCache::parse_put(const std::string & keyexpr) "rmw_zenoh_cpp", "Added %s on topic %s with type %s and qos %s to node /%s.", entity_desc.c_str(), - topic_data.name_.c_str(), - topic_data.type_.c_str(), - topic_data.qos_.c_str(), + entity.topic_info()->name_.c_str(), + entity.topic_info()->type_.c_str(), + entity.topic_info()->qos_.c_str(), graph_node.name_.c_str()); }; - // Helper lambdas to convert a TokenNode into a basic GraphNode. + // Helper lambdas to convert an Entity into a GraphNode. auto make_graph_node = - [&](const TokenNode & token_node, const std::string & entity) -> std::shared_ptr + [&](const Entity & entity) -> std::shared_ptr { auto graph_node = std::make_shared(); - graph_node->ns_ = token_node.ns_; - graph_node->name_ = token_node.name_; - graph_node->enclave_ = token_node.enclave_; + graph_node->ns_ = entity.node_info().ns_; + graph_node->name_ = entity.node_info().name_; + graph_node->enclave_ = entity.node_info().enclave_; - if (!token_node.topic_data_.has_value()) { + if (!entity.topic_info().has_value()) { // Token was for a node. return graph_node; } // Add pub/sub entries. - add_topic_data(token_node.topic_data_.value(), *graph_node, entity, true); + add_topic_data(entity, *graph_node, true); return graph_node; }; @@ -422,31 +142,31 @@ void GraphCache::parse_put(const std::string & keyexpr) std::lock_guard lock(graph_mutex_); // If the namespace did not exist, create it and add the node to the graph and return. - auto ns_it = graph_.find(token_node.ns_); + auto ns_it = graph_.find(entity.node_info().ns_); if (ns_it == graph_.end()) { std::unordered_map node_map = { - {token_node.name_, make_graph_node(token_node, entity)}}; - graph_.insert(std::make_pair(token_node.ns_, std::move(node_map))); + {entity.node_info().name_, make_graph_node(entity)}}; + graph_.insert(std::make_pair(entity.node_info().ns_, std::move(node_map))); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.", - token_node.name_.c_str(), - token_node.ns_.c_str()); + entity.node_info().name_.c_str(), + entity.node_info().ns_.c_str()); return; } // Add the node to the namespace if it did not exist and return. - auto node_it = ns_it->second.find(token_node.name_); + auto node_it = ns_it->second.find(entity.node_info().name_); if (node_it == ns_it->second.end()) { - ns_it->second.insert(std::make_pair(token_node.name_, make_graph_node(token_node, entity))); + ns_it->second.insert(std::make_pair(entity.node_info().name_, make_graph_node(entity))); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Added node /%s to an existing namespace %s in the graph.", - token_node.name_.c_str(), - token_node.ns_.c_str()); + entity.node_info().name_.c_str(), + entity.node_info().ns_.c_str()); return; } // Handles additions to an existing node in the graph. - if (entity == "NN") { + if (entity.type() == EntityType::Node) { // The NN entity is implicitly handled above where we add the node. // If control reaches here, then we received a duplicate entry for the same node. // This could happen when we get() all the liveliness tokens when the node spins up and @@ -454,45 +174,46 @@ void GraphCache::parse_put(const std::string & keyexpr) return; } - if (!token_node.topic_data_.has_value()) { + if (!entity.topic_info().has_value()) { // Likely an error with parsing the token. RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Put token %s parsed without extracting TopicData.", + "rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info.", keyexpr.c_str()); return; } // Update the graph based on the entity. - add_topic_data(token_node.topic_data_.value(), *(node_it->second), entity, true); + add_topic_data(entity, *(node_it->second), true); } ///============================================================================= void GraphCache::parse_del(const std::string & keyexpr) { - auto valid_token = _parse_token(keyexpr); - if (!valid_token.has_value()) { + auto valid_entity = liveliness::Entity::make(keyexpr); + if (!valid_entity.has_value()) { // Error message has already been logged. return; } - const std::string & entity = valid_token->first; - const auto & token_node = valid_token->second; + const auto & entity = *valid_entity; // Helper lambda to append pub/subs to the GraphNode. // We capture by reference to update caches like graph_topics_ if update_cache is true. auto remove_topic_data = - [&](const TokenNode::TokenTopicData & topic_data, GraphNode & graph_node, - const std::string & entity, bool update_cache = false) -> void + [&](const Entity & entity, GraphNode & graph_node, + bool update_cache = false) -> void { - if (entity != "MP" && entity != "MS") { + if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { return; } - auto & topic_map = entity == "MP" ? graph_node.pubs_ : graph_node.subs_; - const std::string entity_desc = entity == "MP" ? "publisher" : "subscription"; - const std::size_t pub_count = entity == "MP" ? 1 : 0; + auto & topic_map = entity.type() == + EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; + const std::string entity_desc = entity.type() == + EntityType::Publisher ? "publisher" : "subscription"; + const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0; const std::size_t sub_count = !pub_count; - auto topic_it = topic_map.find(topic_data.name_); + auto topic_it = topic_map.find(entity.topic_info()->name_); if (topic_it == topic_map.end()) { // Pub/sub not found. return; @@ -502,14 +223,14 @@ void GraphCache::parse_del(const std::string & keyexpr) // Search the unordered_set for the TopicData for this topic. auto topic_data_it = std::find_if( topic_data_set.begin(), topic_data_set.end(), - [&topic_data](const auto topic_data_ptr) { - return topic_data.type_ == topic_data_ptr->type_; + [&entity](const auto topic_data_ptr) { + return entity.topic_info()->type_ == topic_data_ptr->info_.type_; }); if (topic_data_it == topic_data_set.end()) { // Something is wrong. RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "TopicData not found for topic %s. Report this.", - topic_data.name_.c_str()); + entity.topic_info()->name_.c_str()); return; } @@ -524,12 +245,12 @@ void GraphCache::parse_del(const std::string & keyexpr) } // If the topic does not have any TopicData entries, erase the topic from the map. if (topic_data_set.empty()) { - topic_map.erase(topic_data.name_); + topic_map.erase(entity.topic_info()->name_); } // Bookkeeping: We update graph_topic_ which keeps track of topics across all nodes in the graph. if (update_cache) { - std::string topic_key = topic_data.name_ + "?" + topic_data.type_; + std::string topic_key = entity.topic_info()->name_ + "?" + entity.topic_info()->type_; auto cache_topic_it = graph_topics_.find(topic_key); if (cache_topic_it == graph_topics_.end()) { // This should not happen. @@ -550,9 +271,9 @@ void GraphCache::parse_del(const std::string & keyexpr) "rmw_zenoh_cpp", "Removed %s on topic %s with type %s and qos %s to node /%s.", entity_desc.c_str(), - topic_data.name_.c_str(), - topic_data.type_.c_str(), - topic_data.qos_.c_str(), + entity.topic_info()->name_.c_str(), + entity.topic_info()->type_.c_str(), + entity.topic_info()->qos_.c_str(), graph_node.name_.c_str()); }; @@ -560,18 +281,18 @@ void GraphCache::parse_del(const std::string & keyexpr) std::lock_guard lock(graph_mutex_); // If namespace does not exist, ignore the request. - auto ns_it = graph_.find(token_node.ns_); + auto ns_it = graph_.find(entity.node_info().ns_); if (ns_it == graph_.end()) { return; } // If the node does not exist, ignore the request. - auto node_it = ns_it->second.find(token_node.name_); + auto node_it = ns_it->second.find(entity.node_info().name_); if (node_it == ns_it->second.end()) { return; } - if (entity == "NN") { + if (entity.type() == EntityType::Node) { // Node // The liveliness tokens to remove pub/subs should be received before the one to remove a node // given the reliability QoS for liveliness subs. However, if we find any pubs/subs present in the node below, @@ -582,20 +303,20 @@ void GraphCache::parse_del(const std::string & keyexpr) "rmw_zenoh_cpp", "Received liveliness token to remove node /%s from the graph before all pub/subs for this node have been removed. " "Report this issue.", - token_node.name_.c_str() + entity.node_info().name_.c_str() ); // TODO(Yadunund): Iterate through the nodes pubs_ and subs_ and decrement topic count in graph_topics_. } - ns_it->second.erase(token_node.name_); + ns_it->second.erase(entity.node_info().name_); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Removed node /%s from the graph.", - token_node.name_.c_str() + entity.node_info().name_.c_str() ); return; } - if (!token_node.topic_data_.has_value()) { + if (!entity.topic_info().has_value()) { // Likely an error with parsing the token. RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData.", @@ -604,7 +325,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } // Update the graph based on the entity. - remove_topic_data(token_node.topic_data_.value(), *(node_it->second), entity, true); + remove_topic_data(entity, *(node_it->second), true); } ///============================================================================= @@ -780,7 +501,7 @@ rmw_ret_t GraphCache::get_topic_names_and_types( // TODO(Yadunund): Be more systematic about this. // TODO(clalancette): Rather than doing the splitting here, should we store // it in graph_topics_ already split? - std::vector parts = split_keyexpr(it.first, '?'); + std::vector parts = liveliness::split_keyexpr(it.first, '?'); if (parts.size() < 2) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index b651a014..e4b94b1e 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -15,8 +15,6 @@ #ifndef DETAIL__GRAPH_CACHE_HPP_ #define DETAIL__GRAPH_CACHE_HPP_ -#include - #include #include #include @@ -25,6 +23,8 @@ #include #include +#include "liveliness_utils.hpp" + #include "rcutils/allocator.h" #include "rcutils/types.h" @@ -32,50 +32,6 @@ #include "rmw/names_and_types.h" -///============================================================================= -class GenerateToken -{ -public: - static std::string liveliness(size_t domain_id); - - /// Returns a string with key-expression @ros2_lv/domain_id/N/namespace/name - static std::string node( - size_t domain_id, - const std::string & namespace_, - const std::string & name); - - static std::string publisher( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos); - - static std::string subscription( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos); -}; - -///============================================================================= -/// Helper utilities to put/delete tokens until liveliness is supported in the -/// zenoh-c bindings. -class PublishToken -{ -public: - static bool put( - z_owned_session_t * session, - const std::string & token); - - static bool del( - z_owned_session_t * session, - const std::string & token); -}; - ///============================================================================= struct TopicStats { @@ -86,17 +42,15 @@ struct TopicStats TopicStats(std::size_t pub_count, std::size_t sub_count); }; using TopicStatsPtr = std::unique_ptr; - +using TopicInfo = liveliness::TopicInfo; ///============================================================================= struct TopicData { - std::string type_; - std::string qos_; + TopicInfo info_; TopicStats stats_; TopicData( - std::string type, - std::string qos, + TopicInfo info, TopicStats stats); }; using TopicDataPtr = std::shared_ptr; @@ -116,7 +70,7 @@ struct GraphNode { std::size_t operator()(const TopicDataPtr & data) const { - return std::hash{}(data->type_); + return std::hash{}(data->info_.type_); } }; // Map topic name to a set of TopicData to support multiple types per topic. diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp new file mode 100644 index 00000000..316cd1a6 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -0,0 +1,342 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "liveliness_utils.hpp" + +#include + +#include "rcpputils/scope_exit.hpp" +#include "rcutils/logging_macros.h" + + +namespace liveliness +{ + +///============================================================================= +NodeInfo::NodeInfo( + std::size_t domain_id, + std::string ns, + std::string name, + std::string enclave) +: domain_id_(std::move(domain_id)), + ns_(std::move(ns)), + name_(std::move(name)), + enclave_(std::move(enclave)) +{ + // Do nothing. +} + +///============================================================================= +TopicInfo::TopicInfo( + std::string name, + std::string type, + std::string qos) +: name_(std::move(name)), + type_(std::move(type)), + qos_(std::move(qos)) +{ + // Do nothing. +} + +///============================================================================= +namespace +{ + +/// The admin space used to prefix the liveliness tokens. +static const std::string ADMIN_SPACE = "@ros2_lv"; +static const std::string NODE_STR = "NN"; +static const std::string PUB_STR = "MP"; +static const std::string SUB_STR = "MS"; +static const std::string SRV_STR = "SS"; +static const std::string CLI_STR = "SC"; + +static const std::unordered_map entity_to_str = { + {EntityType::Node, NODE_STR}, + {EntityType::Publisher, PUB_STR}, + {EntityType::Subscription, SUB_STR}, + {EntityType::Service, SRV_STR}, + {EntityType::Client, CLI_STR} +}; + +static const std::unordered_map str_to_entity = { + {NODE_STR, EntityType::Node}, + {PUB_STR, EntityType::Publisher}, + {SUB_STR, EntityType::Subscription}, + {SRV_STR, EntityType::Service}, + {CLI_STR, EntityType::Client} +}; + +} // namespace + +///============================================================================= +std::string subscription_token(size_t domain_id) +{ + std::string token = ADMIN_SPACE + "/" + std::to_string(domain_id) + "/**"; + return token; +} + +///============================================================================= +Entity::Entity( + EntityType type, + NodeInfo node_info, + std::optional topic_info) +: type_(std::move(type)), + node_info_(std::move(node_info)), + topic_info_(std::move(topic_info)) +{ + /** + * Set the liveliness token for the particular entity. + * + * The liveliness token keyexprs are in the form: + * + * //// + * + * Where: + * - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS. + * - The type of entity. This can be one of "NN" for a node, "MP" for a publisher, "MS" for a subscription, "SS" for a service server, or "SC" for a service client. + * - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons. + * - The ROS node name for this entity. + * + * For entities with topic infomation, the liveliness token keyexpr have additional fields: + * + * /////// + */ + std::stringstream token_ss; + const auto & ns = node_info_.ns_; + token_ss << ADMIN_SPACE << "/" << node_info_.domain_id_ << "/" << entity_to_str.at(type_) << ns; + // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". + // Hence we add an "_" to denote an empty namespace such that splitting the key + // will always result in 5 parts. + if (ns == "/") { + token_ss << "_/"; + } else { + token_ss << "/"; + } + // Finally append node name. + token_ss << node_info_.name_; + // If this entity has a topic info, append it to the token. + if (topic_info_.has_value()) { + const auto & topic_info = this->topic_info_.value(); + // Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/". + token_ss << topic_info.name_ + "/" + topic_info.type_ + "/" + topic_info.qos_; + } + + this->keyexpr_ = token_ss.str(); +} + +///============================================================================= +std::optional Entity::make( + EntityType type, + NodeInfo node_info, + std::optional topic_info) +{ + if (entity_to_str.find(type) == entity_to_str.end()) { + RCUTILS_SET_ERROR_MSG("Invalid entity type."); + return std::nullopt; + } + if (node_info.ns_.empty() || node_info.name_.empty()) { + RCUTILS_SET_ERROR_MSG("Invalid node_info for entity."); + return std::nullopt; + } + if (type != EntityType::Node && !topic_info.has_value()) { + RCUTILS_SET_ERROR_MSG("Invalid topic_info for entity."); + return std::nullopt; + } + + Entity entity{std::move(type), std::move(node_info), std::move(topic_info)}; + return std::move(entity); +} + +///============================================================================= +std::optional Entity::make(const std::string & keyexpr) +{ + + std::vector parts = split_keyexpr(keyexpr); + // At minimum, a token will contain 5 parts (ADMIN_SPACE, domain_id, entity_str, namespace, node_name). + // Basic validation. + if (parts.size() < 5) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received invalid liveliness token"); + return std::nullopt; + } + for (const std::string & p : parts) { + if (p.empty()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received invalid liveliness token"); + return std::nullopt; + } + } + + if (parts[0] != ADMIN_SPACE) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token with invalid admin space."); + return std::nullopt; + } + + // Get the entity, ie NN, MP, MS, SS, SC. + std::string & entity_str = parts[2]; + const auto entity_it = str_to_entity.find(entity_str); + if (entity_it == str_to_entity.end()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token with invalid entity."); + return std::nullopt; + } + + EntityType entity_type = entity_it->second; + std::size_t domain_id = std::stoul(parts[1]); + std::string ns = parts[3] == "_" ? "/" : "/" + std::move(parts[3]); + std::string node_name = std::move(parts[4]); + std::optional topic_info = std::nullopt; + + // Populate topic_info if we have a token for an entity other than a node. + if (entity_type != EntityType::Node) { + if (parts.size() < 8) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token for non-node entity without required parameters."); + return std::nullopt; + } + topic_info = TopicInfo{ + "/" + std::move(parts[5]), + std::move(parts[6]), + std::move(parts[7])}; + } + + return Entity{ + std::move(entity_type), + NodeInfo{std::move(domain_id), std::move(ns), std::move(node_name), ""}, + std::move(topic_info)}; +} + +///============================================================================= +const EntityType & Entity::type() const +{ + return this->type_; +} + +///============================================================================= +const NodeInfo & Entity::node_info() const +{ + return this->node_info_; +} + +///============================================================================= +const std::optional & Entity::topic_info() const +{ + return this->topic_info_; +} + +///============================================================================= +const std::string & Entity::keyexpr() const +{ + return this->keyexpr_; +} + +///============================================================================= +bool PublishToken::put( + z_owned_session_t * session, + const std::string & token) +{ + if (!z_session_check(session)) { + RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); + return false; + } + + // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. + z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); + auto drop_keyexpr = rcpputils::make_scope_exit( + [&keyexpr]() { + z_drop(z_move(keyexpr)); + }); + if (!z_keyexpr_check(&keyexpr)) { + RCUTILS_SET_ERROR_MSG("invalid keyexpression generation for liveliness publication."); + return false; + } + RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending PUT on %s", token.c_str()); + z_put_options_t options = z_put_options_default(); + options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); + if (z_put(z_loan(*session), z_keyexpr(token.c_str()), nullptr, 0, &options) < 0) { + RCUTILS_SET_ERROR_MSG("unable to publish liveliness for node creation"); + return false; + } + + return true; +} + +///============================================================================= +bool PublishToken::del( + z_owned_session_t * session, + const std::string & token) +{ + if (!z_session_check(session)) { + RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); + return false; + } + + // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. + z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); + auto drop_keyexpr = rcpputils::make_scope_exit( + [&keyexpr]() { + z_drop(z_move(keyexpr)); + }); + if (!z_keyexpr_check(&keyexpr)) { + RCUTILS_SET_ERROR_MSG("invalid key-expression generation for liveliness publication."); + return false; + } + RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending DELETE on %s", token.c_str()); + const z_delete_options_t options = z_delete_options_default(); + if (z_delete(z_loan(*session), z_loan(keyexpr), &options) < 0) { + RCUTILS_SET_ERROR_MSG("failed to delete liveliness key"); + return false; + } + + return true; +} + +///============================================================================= +std::vector split_keyexpr( + const std::string & keyexpr, + const char delim) +{ + std::vector delim_idx = {}; + // Insert -1 for starting position to make the split easier when using substr. + delim_idx.push_back(-1); + std::size_t idx = 0; + for (auto it = keyexpr.begin(); it != keyexpr.end(); ++it) { + if (*it == delim) { + delim_idx.push_back(idx); + } + ++idx; + } + std::vector result = {}; + try { + for (std::size_t i = 1; i < delim_idx.size(); ++i) { + const auto & prev_idx = delim_idx.at(i - 1); + const auto & idx = delim_idx.at(i); + result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); + } + } catch (const std::exception & e) { + printf("%s\n", e.what()); + return {}; + } + // Finally add the last substr. + result.push_back(keyexpr.substr(delim_idx.back() + 1)); + return result; +} + +} // namespace liveliness diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp new file mode 100644 index 00000000..1a1d58fb --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -0,0 +1,137 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__LIVELINESS_UTILS_HPP_ +#define DETAIL__LIVELINESS_UTILS_HPP_ + +#include + +#include +#include +#include + + +namespace liveliness +{ + +///============================================================================= +struct NodeInfo +{ + + std::size_t domain_id_; + std::string ns_; + std::string name_; + std::string enclave_; + + NodeInfo( + std::size_t domain_id, + std::string ns, + std::string name, + std::string enclave); +}; + +///============================================================================= +struct TopicInfo +{ + std::string name_; + std::string type_; + std::string qos_; + + TopicInfo( + std::string name, + std::string type, + std::string qos); +}; + +///============================================================================= +/// Retuns the keyexpr for liveliness subscription. +std::string subscription_token(size_t domain_id); + +///============================================================================= +enum class EntityType : uint8_t +{ + Invalid = 0, + Node, + Publisher, + Subscription, + Service, + Client +}; + +///============================================================================= +// An struct to bundle results of parsing a token. +// TODO(Yadunund): Consider using variadic templates to pass args instead of +// relying on optional fields. +class Entity +{ +public: + /// Make an Entity from datatypes. This will return nullopt if the required + /// fields are not present for the EntityType. + // TODO(Yadunund): Find a way to better bundle the type and the associated data. + static std::optional make( + EntityType type, + NodeInfo node_info, + std::optional topic_info = std::nullopt); + + /// Make an Entity from a liveliness keyexpr. + static std::optional make(const std::string & keyexpr); + + /// Get the entity type. + const EntityType & type() const; + + /// Get the node_info. + const NodeInfo & node_info() const; + + /// Get the topic_info. + const std::optional & topic_info() const; + + /// Get the liveliness keyexpr for this entity. + const std::string & keyexpr() const; + +private: + Entity( + EntityType type, + NodeInfo node_info, + std::optional topic_info); + + EntityType type_; + NodeInfo node_info_; + std::optional topic_info_; + std::string keyexpr_; +}; + +///============================================================================= +/// Helper utilities to put/delete tokens until liveliness is supported in the +/// zenoh-c bindings. +class PublishToken +{ +public: + static bool put( + z_owned_session_t * session, + const std::string & token); + + static bool del( + z_owned_session_t * session, + const std::string & token); +}; + +///============================================================================= +/// Split a liveliness token's expression into parts based on a delimiter. +std::vector split_keyexpr( + const std::string & keyexpr, + const char delim = '/'); + +} // namespace liveliness + +#endif // DETAIL__LIVELINESS_UTILS_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 51f22e3c..1e6da94c 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -18,6 +18,7 @@ #include "detail/guard_condition.hpp" #include "detail/identifier.hpp" +#include "detail/liveliness_utils.hpp" #include "detail/rmw_data_types.hpp" #include "detail/zenoh_config.hpp" #include "detail/zenoh_router_check.hpp" @@ -246,7 +247,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) }); // Setup liveliness subscriptions for discovery. - const std::string liveliness_str = GenerateToken::liveliness(context->actual_domain_id); + const std::string liveliness_str = liveliness::subscription_token(context->actual_domain_id); // Query router/liveliness participants to get graph information before this session was started. RCUTILS_LOG_WARN_NAMED( diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 4de21163..23973046 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -25,6 +25,7 @@ #include "detail/guard_condition.hpp" #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" +#include "detail/liveliness_utils.hpp" #include "detail/message_type_support.hpp" #include "detail/rmw_data_types.hpp" #include "detail/serialization_format.hpp" @@ -187,16 +188,25 @@ rmw_create_node( // Publish to the graph that a new node is in town // const bool pub_result = PublishToken::put( // &node->context->impl->session, - // GenerateToken::node(context->actual_domain_id, namespace_, name) + // liveliness::GenerateToken::node(context->actual_domain_id, namespace_, name) // ); // if (!pub_result) { // return nullptr; // } // Initialize liveliness token for the node to advertise that a new node is in town. rmw_node_data_t * node_data = static_cast(node->data); + const auto liveliness_entity = liveliness::Entity::make( + liveliness::EntityType::Node, + liveliness::NodeInfo{context->actual_domain_id, namespace_, name, ""}); + if (!liveliness_entity.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the node."); + return nullptr; + } node_data->token = zc_liveliness_declare_token( z_loan(node->context->impl->session), - z_keyexpr(GenerateToken::node(context->actual_domain_id, namespace_, name).c_str()), + z_keyexpr(liveliness_entity->keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -241,7 +251,7 @@ rmw_destroy_node(rmw_node_t * node) // Publish to the graph that a node has ridden off into the sunset // const bool del_result = PublishToken::del( // &node->context->impl->session, - // GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) + // liveliness::GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) // ); // if (!del_result) { // return RMW_RET_ERROR; @@ -555,7 +565,7 @@ rmw_create_publisher( // TODO(Yadunund): Publish liveliness for the new publisher. // const bool pub_result = PublishToken::put( // &node->context->impl->session, - // GenerateToken::publisher( + // liveliness::GenerateToken::publisher( // node->context->actual_domain_id, // node->namespace_, // node->name, @@ -566,16 +576,21 @@ rmw_create_publisher( // if (!pub_result) { // return nullptr; // } + const auto liveliness_entity = liveliness::Entity::make( + liveliness::EntityType::Publisher, + liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, + liveliness::TopicInfo{rmw_publisher->topic_name, + publisher_data->type_support->get_name(), "reliable"} + ); + if (!liveliness_entity.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the publisher."); + return nullptr; + } publisher_data->token = zc_liveliness_declare_token( z_loan(node->context->impl->session), - z_keyexpr( - GenerateToken::publisher( - node->context->actual_domain_id, - node->namespace_, - node->name, - rmw_publisher->topic_name, - publisher_data->type_support->get_name(), - "reliable").c_str()), + z_keyexpr(liveliness_entity->keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -632,7 +647,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) // Publish to the graph that a publisher has ridden off into the sunset // const bool del_result = PublishToken::del( // &node->context->impl->session, - // GenerateToken::publisher( + // liveliness::GenerateToken::publisher( // node->context->actual_domain_id, // node->namespace_, // node->name, @@ -1253,16 +1268,21 @@ rmw_create_subscription( }); // Publish to the graph that a new subscription is in town + const auto liveliness_entity = liveliness::Entity::make( + liveliness::EntityType::Publisher, + liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, + liveliness::TopicInfo{rmw_subscription->topic_name, + sub_data->type_support->get_name(), "reliable"} + ); + if (!liveliness_entity.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the subscription."); + return nullptr; + } sub_data->token = zc_liveliness_declare_token( z_loan(context_impl->session), - z_keyexpr( - GenerateToken::subscription( - node->context->actual_domain_id, - node->namespace_, - node->name, - rmw_subscription->topic_name, - sub_data->type_support->get_name(), - "reliable").c_str()), + z_keyexpr(liveliness_entity->keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( From 7268001ba67089711cd4acd3dfe23b3572431b9e Mon Sep 17 00:00:00 2001 From: Yadunund Date: Mon, 27 Nov 2023 16:34:48 +0530 Subject: [PATCH 04/13] Map topic types to TopicData Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 29 ++++++++++++------------ rmw_zenoh_cpp/src/detail/graph_cache.hpp | 17 ++++---------- 2 files changed, 20 insertions(+), 26 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 3710ac77..596299ba 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -82,14 +82,19 @@ void GraphCache::parse_put(const std::string & keyexpr) entity.topic_info().value(), TopicStats{pub_count, sub_count}); - GraphNode::TopicDataSet topic_data_set = {graph_topic_data}; - auto insertion = topic_map.insert(std::make_pair(entity.topic_info()->name_, topic_data_set)); + GraphNode::TopicDataMap topic_data_map = { + {graph_topic_data->info_.type_, graph_topic_data}}; + auto insertion = topic_map.insert(std::make_pair(entity.topic_info()->name_, topic_data_map)); if (!insertion.second) { // A topic with the same name already exists in the node so we append the type. - auto type_insertion = insertion.first->second.insert(graph_topic_data); + auto type_insertion = + insertion.first->second.insert( + std::make_pair( + graph_topic_data->info_.type_, + graph_topic_data)); if (!type_insertion.second) { // We have another instance of a pub/sub over the same topic and type so we increment the counters. - auto & existing_graph_topic = *(type_insertion.first); + auto & existing_graph_topic = type_insertion.first->second; existing_graph_topic->stats_.pub_count_ += pub_count; existing_graph_topic->stats_.pub_count_ += sub_count; } @@ -219,14 +224,10 @@ void GraphCache::parse_del(const std::string & keyexpr) return; } - auto & topic_data_set = topic_it->second; + auto & topic_data_map = topic_it->second; // Search the unordered_set for the TopicData for this topic. - auto topic_data_it = std::find_if( - topic_data_set.begin(), topic_data_set.end(), - [&entity](const auto topic_data_ptr) { - return entity.topic_info()->type_ == topic_data_ptr->info_.type_; - }); - if (topic_data_it == topic_data_set.end()) { + auto topic_data_it = topic_data_map.find(entity.topic_info()->type_); + if (topic_data_it == topic_data_map.end()) { // Something is wrong. RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "TopicData not found for topic %s. Report this.", @@ -235,16 +236,16 @@ void GraphCache::parse_del(const std::string & keyexpr) } // Decrement the relevant counters. Check if both counters are 0 and if so remove from graph_node. - auto & existing_topic_data = *topic_data_it; + auto & existing_topic_data = topic_data_it->second; existing_topic_data->stats_.pub_count_ -= pub_count; existing_topic_data->stats_.sub_count_ -= sub_count; if (existing_topic_data->stats_.pub_count_ == 0 && existing_topic_data->stats_.sub_count_ == 0) { - topic_data_set.erase(topic_data_it); + topic_data_map.erase(topic_data_it); } // If the topic does not have any TopicData entries, erase the topic from the map. - if (topic_data_set.empty()) { + if (topic_data_map.empty()) { topic_map.erase(entity.topic_info()->name_); } diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index e4b94b1e..77217599 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -43,6 +43,7 @@ struct TopicStats }; using TopicStatsPtr = std::unique_ptr; using TopicInfo = liveliness::TopicInfo; + ///============================================================================= struct TopicData { @@ -64,18 +65,10 @@ struct GraphNode // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? std::string enclave_; - // Hash topic data using "type" string to only support unique topic_types. - // TODO(Yadunund): Should we also factor the "qos" into the cache? - struct TopicDataHash - { - std::size_t operator()(const TopicDataPtr & data) const - { - return std::hash{}(data->info_.type_); - } - }; - // Map topic name to a set of TopicData to support multiple types per topic. - using TopicDataSet = std::unordered_set; - using TopicMap = std::unordered_map; + // Map topic type to TopicData + using TopicDataMap = std::unordered_map; + // Map topic name to TopicDataMap + using TopicMap = std::unordered_map; TopicMap pubs_ = {}; TopicMap subs_ = {}; }; From 64a023d8f34c518554778be14c131bda1f81de7a Mon Sep 17 00:00:00 2001 From: Yadunund Date: Mon, 27 Nov 2023 23:09:28 +0800 Subject: [PATCH 05/13] Store map of types in graph_topics_ Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 106 +++++++++++++---------- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 6 +- 2 files changed, 62 insertions(+), 50 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 596299ba..c87d664a 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -102,15 +102,31 @@ void GraphCache::parse_put(const std::string & keyexpr) // Bookkeeping: We update graph_topic_ which keeps track of topics across all nodes in the graph. if (update_cache) { - std::string topic_key = entity.topic_info()->name_ + "?" + entity.topic_info()->type_; - auto cache_insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); - if (cache_insertion.second) { - // First entry for this topic name + type combo so we create a new TopicStats instance. - cache_insertion.first->second = std::make_unique(pub_count, sub_count); + auto cache_topic_it = graph_topics_.find(entity.topic_info()->name_); + if (cache_topic_it == graph_topics_.end()) { + // First time this topic name is added to the graph. + auto topic_data_ptr = std::make_shared( + entity.topic_info().value(), + TopicStats{pub_count, sub_count} + ); + graph_topics_[entity.topic_info()->name_] = GraphNode::TopicDataMap{ + {entity.topic_info()->type_, topic_data_ptr} + }; } else { - // Else we update the existing counters. - cache_insertion.first->second->pub_count_ += pub_count; - cache_insertion.first->second->sub_count_ += sub_count; + // If a TopicData entry for the same type exists in the topic map, update pub/sub counts or + // else create an new TopicData. + auto topic_data_insertion = + cache_topic_it->second.insert(std::make_pair(entity.topic_info()->type_, nullptr)); + if (topic_data_insertion.second) { + // A TopicData for the topic_type does not exist. + topic_data_insertion.first->second = std::make_shared( + entity.topic_info().value(), + TopicStats{pub_count, sub_count}); + } else { + // Update the existing counters. + topic_data_insertion.first->second->stats_.pub_count_ += pub_count; + topic_data_insertion.first->second->stats_.sub_count_ += sub_count; + } } } @@ -251,20 +267,25 @@ void GraphCache::parse_del(const std::string & keyexpr) // Bookkeeping: We update graph_topic_ which keeps track of topics across all nodes in the graph. if (update_cache) { - std::string topic_key = entity.topic_info()->name_ + "?" + entity.topic_info()->type_; - auto cache_topic_it = graph_topics_.find(topic_key); + auto cache_topic_it = graph_topics_.find(entity.topic_info()->name_); if (cache_topic_it == graph_topics_.end()) { // This should not happen. RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.", - topic_key.c_str()); + entity.topic_info()->name_.c_str()); } else { - // Decrement the relevant counters. Check if both counters are 0 and if so remove from cache. - cache_topic_it->second->pub_count_ -= pub_count; - cache_topic_it->second->sub_count_ -= sub_count; - if (cache_topic_it->second->pub_count_ == 0 && cache_topic_it->second->sub_count_ == 0) { - graph_topics_.erase(topic_key); + auto cache_topic_data_it = cache_topic_it->second.find(entity.topic_info()->type_); + if (cache_topic_data_it != cache_topic_it->second.end()) { + // Decrement the relevant counters. Check if both counters are 0 and if so remove from cache. + cache_topic_data_it->second->stats_.pub_count_ -= pub_count; + cache_topic_data_it->second->stats_.sub_count_ -= sub_count; + if (cache_topic_data_it->second->stats_.pub_count_ == 0 && + cache_topic_data_it->second->stats_.sub_count_ == 0) + { + graph_topics_.erase(entity.topic_info()->name_); + } } + } } @@ -496,40 +517,33 @@ rmw_ret_t GraphCache::get_topic_names_and_types( }); // Fill topic names and types. - std::size_t j = 0; - for (const auto & it : graph_topics_) { - // Split based on "?". - // TODO(Yadunund): Be more systematic about this. - // TODO(clalancette): Rather than doing the splitting here, should we store - // it in graph_topics_ already split? - std::vector parts = liveliness::split_keyexpr(it.first, '?'); - if (parts.size() < 2) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Invalid topic_key %s", it.first.c_str()); - return RMW_RET_INVALID_ARGUMENT; - } - - topic_names_and_types->names.data[j] = rcutils_strdup(parts[0].c_str(), *allocator); - if (!topic_names_and_types->names.data[j]) { + std::size_t index = 0; + for (const auto & item : graph_topics_) { + topic_names_and_types->names.data[index] = rcutils_strdup(item.first.c_str(), *allocator); + if (!topic_names_and_types->names.data[index]) { return RMW_RET_BAD_ALLOC; } - - // TODO(clalancette): This won't work if there are multiple types on the same topic - rcutils_ret_t rcutils_ret = rcutils_string_array_init( - &topic_names_and_types->types[j], 1, allocator); - if (RCUTILS_RET_OK != rcutils_ret) { - RMW_SET_ERROR_MSG(rcutils_get_error_string().str); - return RMW_RET_BAD_ALLOC; + { + rcutils_ret_t rcutils_ret = rcutils_string_array_init( + &topic_names_and_types->types[index], + item.second.size(), + allocator); + if (RCUTILS_RET_OK != rcutils_ret) { + RMW_SET_ERROR_MSG(rcutils_get_error_string().str); + return RMW_RET_BAD_ALLOC; + } } - - topic_names_and_types->types[j].data[0] = rcutils_strdup( - _demangle_if_ros_type(parts[1]).c_str(), *allocator); - if (!topic_names_and_types->types[j].data[0]) { - return RMW_RET_BAD_ALLOC; + size_t type_index = 0; + for (const auto & type : item.second) { + char * type_name = rcutils_strdup(_demangle_if_ros_type(type.first).c_str(), *allocator); + if (!type_name) { + RMW_SET_ERROR_MSG("failed to allocate memory for type name"); + return RMW_RET_BAD_ALLOC; + } + topic_names_and_types->types[index].data[type_index] = type_name; + ++type_index; } - - ++j; + ++index; } cleanup_names_and_types.cancel(); diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 77217599..7ed00705 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -41,7 +41,6 @@ struct TopicStats // Constructor which initializes counters to 0. TopicStats(std::size_t pub_count, std::size_t sub_count); }; -using TopicStatsPtr = std::unique_ptr; using TopicInfo = liveliness::TopicInfo; ///============================================================================= @@ -119,9 +118,8 @@ class GraphCache final // Map namespace to a map of . std::unordered_map> graph_ = {}; - // Optimize topic lookups across the graph by mapping "topic_name?topic_type" keys to their pub/sub counts. - // TODO(Yadunund): Consider storing a set of NodePtrs for each key. - std::unordered_map graph_topics_ = {}; + // Optimize topic lookups across the graph. + GraphNode::TopicMap graph_topics_ = {}; mutable std::mutex graph_mutex_; }; From c4fa36c4e5350b46b28ff272217270bf757ac3d2 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Mon, 27 Nov 2023 23:47:33 +0800 Subject: [PATCH 06/13] fix bug in deleting and counting topics Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 48 ++++++++++++++++++++++- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 12 +++++- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 50 ++++++++++++++++++++---- 3 files changed, 99 insertions(+), 11 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index c87d664a..1ab1c780 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -282,7 +282,11 @@ void GraphCache::parse_del(const std::string & keyexpr) if (cache_topic_data_it->second->stats_.pub_count_ == 0 && cache_topic_data_it->second->stats_.sub_count_ == 0) { - graph_topics_.erase(entity.topic_info()->name_); + cache_topic_it->second.erase(cache_topic_data_it); + } + // If the topic does not have any TopicData entries, erase the topic from the map. + if (cache_topic_it->second.empty()) { + graph_topics_.erase(cache_topic_it); } } @@ -495,7 +499,7 @@ _demangle_if_ros_type(const std::string & dds_type_string) rmw_ret_t GraphCache::get_topic_names_and_types( rcutils_allocator_t * allocator, bool no_demangle, - rmw_names_and_types_t * topic_names_and_types) + rmw_names_and_types_t * topic_names_and_types) const { static_cast(no_demangle); RCUTILS_CHECK_ALLOCATOR_WITH_MSG( @@ -550,3 +554,43 @@ rmw_ret_t GraphCache::get_topic_names_and_types( return RMW_RET_OK; } + +///============================================================================= +rmw_ret_t GraphCache::count_publishers( + const rmw_node_t * node, + const char * topic_name, + size_t * count) const +{ + static_cast(node); + *count = 0; + auto topic_it = graph_topics_.find(topic_name); + if (topic_it == graph_topics_.end()) { + return RMW_RET_OK; + } + for (const auto & it : topic_it->second) { + // Iterate through all the types and increment count. + *count += it.second->stats_.pub_count_; + } + + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t GraphCache::count_subscriptions( + const rmw_node_t * node, + const char * topic_name, + size_t * count) const +{ + static_cast(node); + *count = 0; + auto topic_it = graph_topics_.find(topic_name); + if (topic_it == graph_topics_.end()) { + return RMW_RET_OK; + } + for (const auto & it : topic_it->second) { + // Iterate through all the types and increment count. + *count += it.second->stats_.sub_count_; + } + + return RMW_RET_OK; +} diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 7ed00705..d9010711 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -91,7 +91,17 @@ class GraphCache final rmw_ret_t get_topic_names_and_types( rcutils_allocator_t * allocator, bool no_demangle, - rmw_names_and_types_t * topic_names_and_types); + rmw_names_and_types_t * topic_names_and_types) const; + + rmw_ret_t count_publishers( + const rmw_node_t * node, + const char * topic_name, + size_t * count) const; + + rmw_ret_t count_subscriptions( + const rmw_node_t * node, + const char * topic_name, + size_t * count) const; private: /* diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 23973046..a18972e6 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2153,10 +2153,27 @@ rmw_count_publishers( const char * topic_name, size_t * count) { - static_cast(node); - static_cast(topic_name); - static_cast(count); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, RMW_RET_INVALID_ARGUMENT); + int validation_result = RMW_TOPIC_VALID; + rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr); + if (RMW_RET_OK != ret) { + return ret; + } + if (RMW_TOPIC_VALID != validation_result) { + const char * reason = rmw_full_topic_name_validation_result_string(validation_result); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("topic_name argument is invalid: %s", reason); + return RMW_RET_INVALID_ARGUMENT; + } + RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.count_publishers( + node, topic_name, count); } //============================================================================== @@ -2167,10 +2184,27 @@ rmw_count_subscribers( const char * topic_name, size_t * count) { - static_cast(node); - static_cast(topic_name); - static_cast(count); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, RMW_RET_INVALID_ARGUMENT); + int validation_result = RMW_TOPIC_VALID; + rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr); + if (RMW_RET_OK != ret) { + return ret; + } + if (RMW_TOPIC_VALID != validation_result) { + const char * reason = rmw_full_topic_name_validation_result_string(validation_result); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("topic_name argument is invalid: %s", reason); + return RMW_RET_INVALID_ARGUMENT; + } + RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.count_subscriptions( + node, topic_name, count); } //============================================================================== From 6bc2f39ff8d484740714178f24f20e08fcf4740b Mon Sep 17 00:00:00 2001 From: Yadunund Date: Mon, 27 Nov 2023 23:51:58 +0800 Subject: [PATCH 07/13] Cleanup liveliness_utils Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 67 ++++++++++--------- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 6 -- 2 files changed, 36 insertions(+), 37 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 316cd1a6..e3ca8cd0 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -15,6 +15,7 @@ #include "liveliness_utils.hpp" #include +#include #include "rcpputils/scope_exit.hpp" #include "rcutils/logging_macros.h" @@ -158,6 +159,41 @@ std::optional Entity::make( return std::move(entity); } +namespace +{ +///============================================================================= +std::vector split_keyexpr( + const std::string & keyexpr, + const char delim = '/') +{ + std::vector delim_idx = {}; + // Insert -1 for starting position to make the split easier when using substr. + delim_idx.push_back(-1); + std::size_t idx = 0; + for (auto it = keyexpr.begin(); it != keyexpr.end(); ++it) { + if (*it == delim) { + delim_idx.push_back(idx); + } + ++idx; + } + std::vector result = {}; + try { + for (std::size_t i = 1; i < delim_idx.size(); ++i) { + const auto & prev_idx = delim_idx.at(i - 1); + const auto & idx = delim_idx.at(i); + result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); + } + } catch (const std::exception & e) { + printf("%s\n", e.what()); + return {}; + } + // Finally add the last substr. + result.push_back(keyexpr.substr(delim_idx.back() + 1)); + return result; +} + +} // namespace + ///============================================================================= std::optional Entity::make(const std::string & keyexpr) { @@ -308,35 +344,4 @@ bool PublishToken::del( return true; } -///============================================================================= -std::vector split_keyexpr( - const std::string & keyexpr, - const char delim) -{ - std::vector delim_idx = {}; - // Insert -1 for starting position to make the split easier when using substr. - delim_idx.push_back(-1); - std::size_t idx = 0; - for (auto it = keyexpr.begin(); it != keyexpr.end(); ++it) { - if (*it == delim) { - delim_idx.push_back(idx); - } - ++idx; - } - std::vector result = {}; - try { - for (std::size_t i = 1; i < delim_idx.size(); ++i) { - const auto & prev_idx = delim_idx.at(i - 1); - const auto & idx = delim_idx.at(i); - result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); - } - } catch (const std::exception & e) { - printf("%s\n", e.what()); - return {}; - } - // Finally add the last substr. - result.push_back(keyexpr.substr(delim_idx.back() + 1)); - return result; -} - } // namespace liveliness diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 1a1d58fb..a1a99d9d 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -126,12 +126,6 @@ class PublishToken const std::string & token); }; -///============================================================================= -/// Split a liveliness token's expression into parts based on a delimiter. -std::vector split_keyexpr( - const std::string & keyexpr, - const char delim = '/'); - } // namespace liveliness #endif // DETAIL__LIVELINESS_UTILS_HPP_ From 7f6fa1f4d69f1c585045f3f699e363e470ffba94 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Mon, 27 Nov 2023 15:55:45 +0000 Subject: [PATCH 08/13] Style cleanups. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 27 ++++++++-------- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 31 +++++++++++-------- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 5 ++- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 3 +- 4 files changed, 36 insertions(+), 30 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 1ab1c780..c80a4421 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -93,14 +93,15 @@ void GraphCache::parse_put(const std::string & keyexpr) graph_topic_data->info_.type_, graph_topic_data)); if (!type_insertion.second) { - // We have another instance of a pub/sub over the same topic and type so we increment the counters. + // We have another instance of a pub/sub over the same topic and type so we increment + // the counters. auto & existing_graph_topic = type_insertion.first->second; existing_graph_topic->stats_.pub_count_ += pub_count; existing_graph_topic->stats_.pub_count_ += sub_count; } } - // Bookkeeping: We update graph_topic_ which keeps track of topics across all nodes in the graph. + // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. if (update_cache) { auto cache_topic_it = graph_topics_.find(entity.topic_info()->name_); if (cache_topic_it == graph_topics_.end()) { @@ -113,8 +114,8 @@ void GraphCache::parse_put(const std::string & keyexpr) {entity.topic_info()->type_, topic_data_ptr} }; } else { - // If a TopicData entry for the same type exists in the topic map, update pub/sub counts or - // else create an new TopicData. + // If a TopicData entry for the same type exists in the topic map, update pub/sub counts + // or else create an new TopicData. auto topic_data_insertion = cache_topic_it->second.insert(std::make_pair(entity.topic_info()->type_, nullptr)); if (topic_data_insertion.second) { @@ -251,7 +252,7 @@ void GraphCache::parse_del(const std::string & keyexpr) return; } - // Decrement the relevant counters. Check if both counters are 0 and if so remove from graph_node. + // Decrement the relevant counters. If both counters are 0 remove from graph_node. auto & existing_topic_data = topic_data_it->second; existing_topic_data->stats_.pub_count_ -= pub_count; existing_topic_data->stats_.sub_count_ -= sub_count; @@ -265,7 +266,7 @@ void GraphCache::parse_del(const std::string & keyexpr) topic_map.erase(entity.topic_info()->name_); } - // Bookkeeping: We update graph_topic_ which keeps track of topics across all nodes in the graph. + // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. if (update_cache) { auto cache_topic_it = graph_topics_.find(entity.topic_info()->name_); if (cache_topic_it == graph_topics_.end()) { @@ -276,7 +277,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } else { auto cache_topic_data_it = cache_topic_it->second.find(entity.topic_info()->type_); if (cache_topic_data_it != cache_topic_it->second.end()) { - // Decrement the relevant counters. Check if both counters are 0 and if so remove from cache. + // Decrement the relevant counters. If both counters are 0 remove from cache. cache_topic_data_it->second->stats_.pub_count_ -= pub_count; cache_topic_data_it->second->stats_.sub_count_ -= sub_count; if (cache_topic_data_it->second->stats_.pub_count_ == 0 && @@ -289,7 +290,6 @@ void GraphCache::parse_del(const std::string & keyexpr) graph_topics_.erase(cache_topic_it); } } - } } @@ -321,17 +321,18 @@ void GraphCache::parse_del(const std::string & keyexpr) if (entity.type() == EntityType::Node) { // Node // The liveliness tokens to remove pub/subs should be received before the one to remove a node - // given the reliability QoS for liveliness subs. However, if we find any pubs/subs present in the node below, - // we should update the count in graph_topics_. + // given the reliability QoS for liveliness subs. However, if we find any pubs/subs present in + // the node below, we should update the count in graph_topics_. const auto graph_node = node_it->second; if (!graph_node->pubs_.empty() || !graph_node->subs_.empty()) { RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Received liveliness token to remove node /%s from the graph before all pub/subs for this node have been removed. " - "Report this issue.", + "Received liveliness token to remove node /%s from the graph before all pub/subs for this " + "node have been removed. Report this issue.", entity.node_info().name_.c_str() ); - // TODO(Yadunund): Iterate through the nodes pubs_ and subs_ and decrement topic count in graph_topics_. + // TODO(Yadunund): Iterate through the nodes pubs_ and subs_ and decrement topic count in + // graph_topics_. } ns_it->second.erase(entity.node_info().name_); RCUTILS_LOG_WARN_NAMED( diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index e3ca8cd0..589335df 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -14,7 +14,12 @@ #include "liveliness_utils.hpp" +#include #include +#include +#include +#include +#include #include #include "rcpputils/scope_exit.hpp" @@ -55,12 +60,12 @@ namespace { /// The admin space used to prefix the liveliness tokens. -static const std::string ADMIN_SPACE = "@ros2_lv"; -static const std::string NODE_STR = "NN"; -static const std::string PUB_STR = "MP"; -static const std::string SUB_STR = "MS"; -static const std::string SRV_STR = "SS"; -static const std::string CLI_STR = "SC"; +static const char ADMIN_SPACE[] = "@ros2_lv"; +static const char NODE_STR[] = "NN"; +static const char PUB_STR[] = "MP"; +static const char SUB_STR[] = "MS"; +static const char SRV_STR[] = "SS"; +static const char CLI_STR[] = "SC"; static const std::unordered_map entity_to_str = { {EntityType::Node, NODE_STR}, @@ -78,12 +83,12 @@ static const std::unordered_map str_to_entity = { {CLI_STR, EntityType::Client} }; -} // namespace +} // namespace ///============================================================================= std::string subscription_token(size_t domain_id) { - std::string token = ADMIN_SPACE + "/" + std::to_string(domain_id) + "/**"; + std::string token = std::string(ADMIN_SPACE) + "/" + std::to_string(domain_id) + "/**"; return token; } @@ -156,7 +161,7 @@ std::optional Entity::make( } Entity entity{std::move(type), std::move(node_info), std::move(topic_info)}; - return std::move(entity); + return entity; } namespace @@ -192,14 +197,14 @@ std::vector split_keyexpr( return result; } -} // namespace +} // namespace ///============================================================================= std::optional Entity::make(const std::string & keyexpr) { - std::vector parts = split_keyexpr(keyexpr); - // At minimum, a token will contain 5 parts (ADMIN_SPACE, domain_id, entity_str, namespace, node_name). + // A token will contain at least 5 parts: + // (ADMIN_SPACE, domain_id, entity_str, namespace, node_name). // Basic validation. if (parts.size() < 5) { RCUTILS_LOG_ERROR_NAMED( @@ -344,4 +349,4 @@ bool PublishToken::del( return true; } -} // namespace liveliness +} // namespace liveliness diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index a1a99d9d..5f07ea4b 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -19,7 +19,7 @@ #include #include -#include +#include namespace liveliness @@ -28,7 +28,6 @@ namespace liveliness ///============================================================================= struct NodeInfo { - std::size_t domain_id_; std::string ns_; std::string name_; @@ -126,6 +125,6 @@ class PublishToken const std::string & token); }; -} // namespace liveliness +} // namespace liveliness #endif // DETAIL__LIVELINESS_UTILS_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index a18972e6..ed957fdd 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -251,7 +251,8 @@ rmw_destroy_node(rmw_node_t * node) // Publish to the graph that a node has ridden off into the sunset // const bool del_result = PublishToken::del( // &node->context->impl->session, - // liveliness::GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) + // liveliness::GenerateToken::node(node->context->actual_domain_id, node->namespace_, + // node->name) // ); // if (!del_result) { // return RMW_RET_ERROR; From 7c79ff1f17707eda7f9ae1396439c441fefcd7e7 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Mon, 27 Nov 2023 16:55:23 +0000 Subject: [PATCH 09/13] Cleanup some of the code in the liveliness. Also fix a couple of bugs where we were mixing up subscriptions and publications, leading to incorrect counts. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 67 +++++++++++------------- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 7 +-- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 8 ++- 3 files changed, 35 insertions(+), 47 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index c80a4421..b4e82dd9 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -46,7 +46,7 @@ TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) ///============================================================================= TopicData::TopicData( - TopicInfo info, + liveliness::TopicInfo info, TopicStats stats) : info_(std::move(info)), stats_(std::move(stats)) @@ -63,10 +63,9 @@ void GraphCache::parse_put(const std::string & keyexpr) const auto & entity = *valid_entity; // Helper lambda to append pub/subs to the GraphNode. - // We capture by reference to update caches like graph_topics_ if update_cache is true. + // We capture by reference to update graph_topics_ auto add_topic_data = - [&](const Entity & entity, GraphNode & graph_node, - bool update_cache = false) -> void + [this](const Entity & entity, GraphNode & graph_node) -> void { if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { return; @@ -97,37 +96,35 @@ void GraphCache::parse_put(const std::string & keyexpr) // the counters. auto & existing_graph_topic = type_insertion.first->second; existing_graph_topic->stats_.pub_count_ += pub_count; - existing_graph_topic->stats_.pub_count_ += sub_count; + existing_graph_topic->stats_.sub_count_ += sub_count; } } - // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. - if (update_cache) { - auto cache_topic_it = graph_topics_.find(entity.topic_info()->name_); - if (cache_topic_it == graph_topics_.end()) { - // First time this topic name is added to the graph. - auto topic_data_ptr = std::make_shared( + // Bookkeeping: Update graph_topics_ which keeps track of topics across all nodes in the graph + auto cache_topic_it = graph_topics_.find(entity.topic_info()->name_); + if (cache_topic_it == graph_topics_.end()) { + // First time this topic name is added to the graph. + auto topic_data_ptr = std::make_shared( + entity.topic_info().value(), + TopicStats{pub_count, sub_count} + ); + graph_topics_[entity.topic_info()->name_] = GraphNode::TopicDataMap{ + {entity.topic_info()->type_, topic_data_ptr} + }; + } else { + // If a TopicData entry for the same type exists in the topic map, update pub/sub counts + // or else create an new TopicData. + auto topic_data_insertion = + cache_topic_it->second.insert(std::make_pair(entity.topic_info()->type_, nullptr)); + if (topic_data_insertion.second) { + // A TopicData for the topic_type does not exist. + topic_data_insertion.first->second = std::make_shared( entity.topic_info().value(), - TopicStats{pub_count, sub_count} - ); - graph_topics_[entity.topic_info()->name_] = GraphNode::TopicDataMap{ - {entity.topic_info()->type_, topic_data_ptr} - }; + TopicStats{pub_count, sub_count}); } else { - // If a TopicData entry for the same type exists in the topic map, update pub/sub counts - // or else create an new TopicData. - auto topic_data_insertion = - cache_topic_it->second.insert(std::make_pair(entity.topic_info()->type_, nullptr)); - if (topic_data_insertion.second) { - // A TopicData for the topic_type does not exist. - topic_data_insertion.first->second = std::make_shared( - entity.topic_info().value(), - TopicStats{pub_count, sub_count}); - } else { - // Update the existing counters. - topic_data_insertion.first->second->stats_.pub_count_ += pub_count; - topic_data_insertion.first->second->stats_.sub_count_ += sub_count; - } + // Update the existing counters. + topic_data_insertion.first->second->stats_.pub_count_ += pub_count; + topic_data_insertion.first->second->stats_.sub_count_ += sub_count; } } @@ -141,7 +138,7 @@ void GraphCache::parse_put(const std::string & keyexpr) graph_node.name_.c_str()); }; - // Helper lambdas to convert an Entity into a GraphNode. + // Helper lambda to convert an Entity into a GraphNode. auto make_graph_node = [&](const Entity & entity) -> std::shared_ptr { @@ -155,7 +152,7 @@ void GraphCache::parse_put(const std::string & keyexpr) return graph_node; } // Add pub/sub entries. - add_topic_data(entity, *graph_node, true); + add_topic_data(entity, *graph_node); return graph_node; }; @@ -205,7 +202,7 @@ void GraphCache::parse_put(const std::string & keyexpr) } // Update the graph based on the entity. - add_topic_data(entity, *(node_it->second), true); + add_topic_data(entity, *(node_it->second)); } ///============================================================================= @@ -558,11 +555,9 @@ rmw_ret_t GraphCache::get_topic_names_and_types( ///============================================================================= rmw_ret_t GraphCache::count_publishers( - const rmw_node_t * node, const char * topic_name, size_t * count) const { - static_cast(node); *count = 0; auto topic_it = graph_topics_.find(topic_name); if (topic_it == graph_topics_.end()) { @@ -578,11 +573,9 @@ rmw_ret_t GraphCache::count_publishers( ///============================================================================= rmw_ret_t GraphCache::count_subscriptions( - const rmw_node_t * node, const char * topic_name, size_t * count) const { - static_cast(node); *count = 0; auto topic_it = graph_topics_.find(topic_name); if (topic_it == graph_topics_.end()) { diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index d9010711..bec68400 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -41,16 +41,15 @@ struct TopicStats // Constructor which initializes counters to 0. TopicStats(std::size_t pub_count, std::size_t sub_count); }; -using TopicInfo = liveliness::TopicInfo; ///============================================================================= struct TopicData { - TopicInfo info_; + liveliness::TopicInfo info_; TopicStats stats_; TopicData( - TopicInfo info, + liveliness::TopicInfo info, TopicStats stats); }; using TopicDataPtr = std::shared_ptr; @@ -94,12 +93,10 @@ class GraphCache final rmw_names_and_types_t * topic_names_and_types) const; rmw_ret_t count_publishers( - const rmw_node_t * node, const char * topic_name, size_t * count) const; rmw_ret_t count_subscriptions( - const rmw_node_t * node, const char * topic_name, size_t * count) const; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index ed957fdd..4fa0ebbb 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1270,7 +1270,7 @@ rmw_create_subscription( // Publish to the graph that a new subscription is in town const auto liveliness_entity = liveliness::Entity::make( - liveliness::EntityType::Publisher, + liveliness::EntityType::Subscription, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_subscription->topic_name, sub_data->type_support->get_name(), "reliable"} @@ -2173,8 +2173,7 @@ rmw_count_publishers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.count_publishers( - node, topic_name, count); + return node->context->impl->graph_cache.count_publishers(topic_name, count); } //============================================================================== @@ -2204,8 +2203,7 @@ rmw_count_subscribers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.count_subscriptions( - node, topic_name, count); + return node->context->impl->graph_cache.count_subscriptions(topic_name, count); } //============================================================================== From 178cc38062f7631d31672a31e4012f9c82ff19bf Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Mon, 27 Nov 2023 18:02:24 +0000 Subject: [PATCH 10/13] Remove Entity::node_info. Because it returned a reference, it is difficult to tell the lifetime of it. It would also be hard to add in locking in the future. Since we are reaching through it in most cases, just return some data directly. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 34 +++++++++---------- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 20 ++++++++--- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 7 ++-- 3 files changed, 37 insertions(+), 24 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index b4e82dd9..40293ad8 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -143,9 +143,9 @@ void GraphCache::parse_put(const std::string & keyexpr) [&](const Entity & entity) -> std::shared_ptr { auto graph_node = std::make_shared(); - graph_node->ns_ = entity.node_info().ns_; - graph_node->name_ = entity.node_info().name_; - graph_node->enclave_ = entity.node_info().enclave_; + graph_node->ns_ = entity.node_namespace(); + graph_node->name_ = entity.node_name(); + graph_node->enclave_ = entity.node_enclave(); if (!entity.topic_info().has_value()) { // Token was for a node. @@ -161,26 +161,26 @@ void GraphCache::parse_put(const std::string & keyexpr) std::lock_guard lock(graph_mutex_); // If the namespace did not exist, create it and add the node to the graph and return. - auto ns_it = graph_.find(entity.node_info().ns_); + auto ns_it = graph_.find(entity.node_namespace()); if (ns_it == graph_.end()) { std::unordered_map node_map = { - {entity.node_info().name_, make_graph_node(entity)}}; - graph_.insert(std::make_pair(entity.node_info().ns_, std::move(node_map))); + {entity.node_name(), make_graph_node(entity)}}; + graph_.insert(std::make_pair(entity.node_namespace(), std::move(node_map))); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.", - entity.node_info().name_.c_str(), - entity.node_info().ns_.c_str()); + entity.node_name().c_str(), + entity.node_namespace().c_str()); return; } // Add the node to the namespace if it did not exist and return. - auto node_it = ns_it->second.find(entity.node_info().name_); + auto node_it = ns_it->second.find(entity.node_name()); if (node_it == ns_it->second.end()) { - ns_it->second.insert(std::make_pair(entity.node_info().name_, make_graph_node(entity))); + ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity))); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Added node /%s to an existing namespace %s in the graph.", - entity.node_info().name_.c_str(), - entity.node_info().ns_.c_str()); + entity.node_name().c_str(), + entity.node_namespace().c_str()); return; } @@ -304,13 +304,13 @@ void GraphCache::parse_del(const std::string & keyexpr) std::lock_guard lock(graph_mutex_); // If namespace does not exist, ignore the request. - auto ns_it = graph_.find(entity.node_info().ns_); + auto ns_it = graph_.find(entity.node_namespace()); if (ns_it == graph_.end()) { return; } // If the node does not exist, ignore the request. - auto node_it = ns_it->second.find(entity.node_info().name_); + auto node_it = ns_it->second.find(entity.node_name()); if (node_it == ns_it->second.end()) { return; } @@ -326,16 +326,16 @@ void GraphCache::parse_del(const std::string & keyexpr) "rmw_zenoh_cpp", "Received liveliness token to remove node /%s from the graph before all pub/subs for this " "node have been removed. Report this issue.", - entity.node_info().name_.c_str() + entity.node_name().c_str() ); // TODO(Yadunund): Iterate through the nodes pubs_ and subs_ and decrement topic count in // graph_topics_. } - ns_it->second.erase(entity.node_info().name_); + ns_it->second.erase(entity.node_name()); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Removed node /%s from the graph.", - entity.node_info().name_.c_str() + entity.node_name().c_str() ); return; } diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 589335df..0e3a108c 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -119,7 +119,7 @@ Entity::Entity( * /////// */ std::stringstream token_ss; - const auto & ns = node_info_.ns_; + const std::string & ns = node_info_.ns_; token_ss << ADMIN_SPACE << "/" << node_info_.domain_id_ << "/" << entity_to_str.at(type_) << ns; // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". // Hence we add an "_" to denote an empty namespace such that splitting the key @@ -230,7 +230,8 @@ std::optional Entity::make(const std::string & keyexpr) // Get the entity, ie NN, MP, MS, SS, SC. std::string & entity_str = parts[2]; - const auto entity_it = str_to_entity.find(entity_str); + std::unordered_map::const_iterator entity_it = + str_to_entity.find(entity_str); if (entity_it == str_to_entity.end()) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -270,10 +271,19 @@ const EntityType & Entity::type() const return this->type_; } -///============================================================================= -const NodeInfo & Entity::node_info() const +std::string Entity::node_namespace() const +{ + return this->node_info_.ns_; +} + +std::string Entity::node_name() const +{ + return this->node_info_.name_; +} + +std::string Entity::node_enclave() const { - return this->node_info_; + return this->node_info_.enclave_; } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 5f07ea4b..529ef092 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -89,8 +89,11 @@ class Entity /// Get the entity type. const EntityType & type() const; - /// Get the node_info. - const NodeInfo & node_info() const; + std::string node_namespace() const; + + std::string node_name() const; + + std::string node_enclave() const; /// Get the topic_info. const std::optional & topic_info() const; From b26a26835e1c41cccf67970b362348f2895643be Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Mon, 27 Nov 2023 18:17:55 +0000 Subject: [PATCH 11/13] Remove some uses of auto. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 92 ++++++++++--------- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 5 +- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 6 +- 3 files changed, 55 insertions(+), 48 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 40293ad8..cc1c436a 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -55,12 +55,12 @@ TopicData::TopicData( ///============================================================================= void GraphCache::parse_put(const std::string & keyexpr) { - auto valid_entity = liveliness::Entity::make(keyexpr); + std::optional valid_entity = liveliness::Entity::make(keyexpr); if (!valid_entity.has_value()) { // Error message has already been logged. return; } - const auto & entity = *valid_entity; + const liveliness::Entity & entity = *valid_entity; // Helper lambda to append pub/subs to the GraphNode. // We capture by reference to update graph_topics_ @@ -71,7 +71,7 @@ void GraphCache::parse_put(const std::string & keyexpr) return; } - auto & topic_map = entity.type() == + GraphNode::TopicMap & topic_map = entity.type() == EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; const std::string entity_desc = entity.type() == EntityType::Publisher ? "publisher" : "subscription"; @@ -83,10 +83,11 @@ void GraphCache::parse_put(const std::string & keyexpr) GraphNode::TopicDataMap topic_data_map = { {graph_topic_data->info_.type_, graph_topic_data}}; - auto insertion = topic_map.insert(std::make_pair(entity.topic_info()->name_, topic_data_map)); + std::pair insertion = + topic_map.insert(std::make_pair(entity.topic_info()->name_, topic_data_map)); if (!insertion.second) { // A topic with the same name already exists in the node so we append the type. - auto type_insertion = + std::pair type_insertion = insertion.first->second.insert( std::make_pair( graph_topic_data->info_.type_, @@ -94,14 +95,14 @@ void GraphCache::parse_put(const std::string & keyexpr) if (!type_insertion.second) { // We have another instance of a pub/sub over the same topic and type so we increment // the counters. - auto & existing_graph_topic = type_insertion.first->second; + TopicDataPtr & existing_graph_topic = type_insertion.first->second; existing_graph_topic->stats_.pub_count_ += pub_count; existing_graph_topic->stats_.sub_count_ += sub_count; } } // Bookkeeping: Update graph_topics_ which keeps track of topics across all nodes in the graph - auto cache_topic_it = graph_topics_.find(entity.topic_info()->name_); + GraphNode::TopicMap::iterator cache_topic_it = graph_topics_.find(entity.topic_info()->name_); if (cache_topic_it == graph_topics_.end()) { // First time this topic name is added to the graph. auto topic_data_ptr = std::make_shared( @@ -114,7 +115,7 @@ void GraphCache::parse_put(const std::string & keyexpr) } else { // If a TopicData entry for the same type exists in the topic map, update pub/sub counts // or else create an new TopicData. - auto topic_data_insertion = + std::pair topic_data_insertion = cache_topic_it->second.insert(std::make_pair(entity.topic_info()->type_, nullptr)); if (topic_data_insertion.second) { // A TopicData for the topic_type does not exist. @@ -161,9 +162,9 @@ void GraphCache::parse_put(const std::string & keyexpr) std::lock_guard lock(graph_mutex_); // If the namespace did not exist, create it and add the node to the graph and return. - auto ns_it = graph_.find(entity.node_namespace()); + NamespaceMap::iterator ns_it = graph_.find(entity.node_namespace()); if (ns_it == graph_.end()) { - std::unordered_map node_map = { + NodeMap node_map = { {entity.node_name(), make_graph_node(entity)}}; graph_.insert(std::make_pair(entity.node_namespace(), std::move(node_map))); RCUTILS_LOG_WARN_NAMED( @@ -174,7 +175,7 @@ void GraphCache::parse_put(const std::string & keyexpr) } // Add the node to the namespace if it did not exist and return. - auto node_it = ns_it->second.find(entity.node_name()); + NodeMap::iterator node_it = ns_it->second.find(entity.node_name()); if (node_it == ns_it->second.end()) { ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity))); RCUTILS_LOG_WARN_NAMED( @@ -208,12 +209,12 @@ void GraphCache::parse_put(const std::string & keyexpr) ///============================================================================= void GraphCache::parse_del(const std::string & keyexpr) { - auto valid_entity = liveliness::Entity::make(keyexpr); + std::optional valid_entity = liveliness::Entity::make(keyexpr); if (!valid_entity.has_value()) { // Error message has already been logged. return; } - const auto & entity = *valid_entity; + const liveliness::Entity & entity = *valid_entity; // Helper lambda to append pub/subs to the GraphNode. // We capture by reference to update caches like graph_topics_ if update_cache is true. @@ -225,22 +226,23 @@ void GraphCache::parse_del(const std::string & keyexpr) return; } - auto & topic_map = entity.type() == + GraphNode::TopicMap & topic_map = entity.type() == EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; const std::string entity_desc = entity.type() == EntityType::Publisher ? "publisher" : "subscription"; const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0; const std::size_t sub_count = !pub_count; - auto topic_it = topic_map.find(entity.topic_info()->name_); + GraphNode::TopicMap::iterator topic_it = topic_map.find(entity.topic_info()->name_); if (topic_it == topic_map.end()) { // Pub/sub not found. return; } - auto & topic_data_map = topic_it->second; + GraphNode::TopicDataMap & topic_data_map = topic_it->second; // Search the unordered_set for the TopicData for this topic. - auto topic_data_it = topic_data_map.find(entity.topic_info()->type_); + GraphNode::TopicDataMap::iterator topic_data_it = + topic_data_map.find(entity.topic_info()->type_); if (topic_data_it == topic_data_map.end()) { // Something is wrong. RCUTILS_LOG_ERROR_NAMED( @@ -250,7 +252,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } // Decrement the relevant counters. If both counters are 0 remove from graph_node. - auto & existing_topic_data = topic_data_it->second; + TopicDataPtr & existing_topic_data = topic_data_it->second; existing_topic_data->stats_.pub_count_ -= pub_count; existing_topic_data->stats_.sub_count_ -= sub_count; if (existing_topic_data->stats_.pub_count_ == 0 && @@ -265,14 +267,16 @@ void GraphCache::parse_del(const std::string & keyexpr) // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. if (update_cache) { - auto cache_topic_it = graph_topics_.find(entity.topic_info()->name_); + GraphNode::TopicMap::iterator cache_topic_it = + graph_topics_.find(entity.topic_info()->name_); if (cache_topic_it == graph_topics_.end()) { // This should not happen. RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.", entity.topic_info()->name_.c_str()); } else { - auto cache_topic_data_it = cache_topic_it->second.find(entity.topic_info()->type_); + GraphNode::TopicDataMap::iterator cache_topic_data_it = + cache_topic_it->second.find(entity.topic_info()->type_); if (cache_topic_data_it != cache_topic_it->second.end()) { // Decrement the relevant counters. If both counters are 0 remove from cache. cache_topic_data_it->second->stats_.pub_count_ -= pub_count; @@ -304,13 +308,13 @@ void GraphCache::parse_del(const std::string & keyexpr) std::lock_guard lock(graph_mutex_); // If namespace does not exist, ignore the request. - auto ns_it = graph_.find(entity.node_namespace()); + NamespaceMap::iterator ns_it = graph_.find(entity.node_namespace()); if (ns_it == graph_.end()) { return; } // If the node does not exist, ignore the request. - auto node_it = ns_it->second.find(entity.node_name()); + NodeMap::iterator node_it = ns_it->second.find(entity.node_name()); if (node_it == ns_it->second.end()) { return; } @@ -320,7 +324,7 @@ void GraphCache::parse_del(const std::string & keyexpr) // The liveliness tokens to remove pub/subs should be received before the one to remove a node // given the reliability QoS for liveliness subs. However, if we find any pubs/subs present in // the node below, we should update the count in graph_topics_. - const auto graph_node = node_it->second; + const GraphNodePtr graph_node = node_it->second; if (!graph_node->pubs_.empty() || !graph_node->subs_.empty()) { RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", @@ -376,8 +380,8 @@ rmw_ret_t GraphCache::get_node_names( allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); size_t nodes_number = 0; - for (auto it = graph_.begin(); it != graph_.end(); ++it) { - nodes_number += it->second.size(); + for (const std::pair & it : graph_) { + nodes_number += it.second.size(); } rcutils_ret_t rcutils_ret = @@ -433,10 +437,12 @@ rmw_ret_t GraphCache::get_node_names( // Fill node names, namespaces and enclaves. std::size_t j = 0; - for (auto ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) { + for (NamespaceMap::const_iterator ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) { const std::string & ns = ns_it->first; - for (auto node_it = ns_it->second.begin(); node_it != ns_it->second.end(); ++node_it) { - const auto node = node_it->second; + for (NodeMap::const_iterator node_it = ns_it->second.begin(); node_it != ns_it->second.end(); + ++node_it) + { + const GraphNodePtr node = node_it->second; node_names->data[j] = rcutils_strdup(node->name_.c_str(), *allocator); if (!node_names->data[j]) { return RMW_RET_BAD_ALLOC; @@ -520,7 +526,7 @@ rmw_ret_t GraphCache::get_topic_names_and_types( // Fill topic names and types. std::size_t index = 0; - for (const auto & item : graph_topics_) { + for (const std::pair & item : graph_topics_) { topic_names_and_types->names.data[index] = rcutils_strdup(item.first.c_str(), *allocator); if (!topic_names_and_types->names.data[index]) { return RMW_RET_BAD_ALLOC; @@ -536,7 +542,7 @@ rmw_ret_t GraphCache::get_topic_names_and_types( } } size_t type_index = 0; - for (const auto & type : item.second) { + for (const std::pair & type : item.second) { char * type_name = rcutils_strdup(_demangle_if_ros_type(type.first).c_str(), *allocator); if (!type_name) { RMW_SET_ERROR_MSG("failed to allocate memory for type name"); @@ -559,13 +565,12 @@ rmw_ret_t GraphCache::count_publishers( size_t * count) const { *count = 0; - auto topic_it = graph_topics_.find(topic_name); - if (topic_it == graph_topics_.end()) { - return RMW_RET_OK; - } - for (const auto & it : topic_it->second) { - // Iterate through all the types and increment count. - *count += it.second->stats_.pub_count_; + + if (graph_topics_.count(topic_name) != 0) { + for (const std::pair & it : graph_topics_.at(topic_name)) { + // Iterate through all the types and increment count. + *count += it.second->stats_.pub_count_; + } } return RMW_RET_OK; @@ -577,13 +582,12 @@ rmw_ret_t GraphCache::count_subscriptions( size_t * count) const { *count = 0; - auto topic_it = graph_topics_.find(topic_name); - if (topic_it == graph_topics_.end()) { - return RMW_RET_OK; - } - for (const auto & it : topic_it->second) { - // Iterate through all the types and increment count. - *count += it.second->stats_.sub_count_; + + if (graph_topics_.count(topic_name) != 0) { + for (const std::pair & it : graph_topics_.at(topic_name)) { + // Iterate through all the types and increment count. + *count += it.second->stats_.sub_count_; + } } return RMW_RET_OK; diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index bec68400..de78dd96 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -122,8 +122,11 @@ class GraphCache final namespace_2: node_n: */ + + using NodeMap = std::unordered_map; + using NamespaceMap = std::unordered_map; // Map namespace to a map of . - std::unordered_map> graph_ = {}; + NamespaceMap graph_ = {}; // Optimize topic lookups across the graph. GraphNode::TopicMap graph_topics_ = {}; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 0e3a108c..1f575fb6 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -175,7 +175,7 @@ std::vector split_keyexpr( // Insert -1 for starting position to make the split easier when using substr. delim_idx.push_back(-1); std::size_t idx = 0; - for (auto it = keyexpr.begin(); it != keyexpr.end(); ++it) { + for (std::string::const_iterator it = keyexpr.begin(); it != keyexpr.end(); ++it) { if (*it == delim) { delim_idx.push_back(idx); } @@ -184,8 +184,8 @@ std::vector split_keyexpr( std::vector result = {}; try { for (std::size_t i = 1; i < delim_idx.size(); ++i) { - const auto & prev_idx = delim_idx.at(i - 1); - const auto & idx = delim_idx.at(i); + const size_t prev_idx = delim_idx[i - 1]; + const size_t idx = delim_idx[i]; result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); } } catch (const std::exception & e) { From 4488b976fa205a1ec2c8f98b538b49d96934d185 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Tue, 28 Nov 2023 11:49:44 +0800 Subject: [PATCH 12/13] Return copies from Entity Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 62 ++++++++++++------- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 6 +- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 6 +- 3 files changed, 47 insertions(+), 27 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index cc1c436a..4fb501c2 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -71,6 +71,16 @@ void GraphCache::parse_put(const std::string & keyexpr) return; } + if (!entity.topic_info().has_value()) { + // This should not happen as add_topic_data() is called after validating the existence + // of topic_info. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "add_topic_data() called without valid TopicInfo. Report this."); + return; + } + + const liveliness::TopicInfo topic_info = entity.topic_info().value(); GraphNode::TopicMap & topic_map = entity.type() == EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; const std::string entity_desc = entity.type() == @@ -78,13 +88,13 @@ void GraphCache::parse_put(const std::string & keyexpr) const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0; const std::size_t sub_count = !pub_count; TopicDataPtr graph_topic_data = std::make_shared( - entity.topic_info().value(), + topic_info, TopicStats{pub_count, sub_count}); GraphNode::TopicDataMap topic_data_map = { {graph_topic_data->info_.type_, graph_topic_data}}; std::pair insertion = - topic_map.insert(std::make_pair(entity.topic_info()->name_, topic_data_map)); + topic_map.insert(std::make_pair(topic_info.name_, topic_data_map)); if (!insertion.second) { // A topic with the same name already exists in the node so we append the type. std::pair type_insertion = @@ -102,25 +112,25 @@ void GraphCache::parse_put(const std::string & keyexpr) } // Bookkeeping: Update graph_topics_ which keeps track of topics across all nodes in the graph - GraphNode::TopicMap::iterator cache_topic_it = graph_topics_.find(entity.topic_info()->name_); + GraphNode::TopicMap::iterator cache_topic_it = graph_topics_.find(topic_info.name_); if (cache_topic_it == graph_topics_.end()) { // First time this topic name is added to the graph. auto topic_data_ptr = std::make_shared( - entity.topic_info().value(), + topic_info, TopicStats{pub_count, sub_count} ); - graph_topics_[entity.topic_info()->name_] = GraphNode::TopicDataMap{ - {entity.topic_info()->type_, topic_data_ptr} + graph_topics_[topic_info.name_] = GraphNode::TopicDataMap{ + {topic_info.type_, topic_data_ptr} }; } else { // If a TopicData entry for the same type exists in the topic map, update pub/sub counts // or else create an new TopicData. std::pair topic_data_insertion = - cache_topic_it->second.insert(std::make_pair(entity.topic_info()->type_, nullptr)); + cache_topic_it->second.insert(std::make_pair(topic_info.type_, nullptr)); if (topic_data_insertion.second) { // A TopicData for the topic_type does not exist. topic_data_insertion.first->second = std::make_shared( - entity.topic_info().value(), + topic_info, TopicStats{pub_count, sub_count}); } else { // Update the existing counters. @@ -133,9 +143,9 @@ void GraphCache::parse_put(const std::string & keyexpr) "rmw_zenoh_cpp", "Added %s on topic %s with type %s and qos %s to node /%s.", entity_desc.c_str(), - entity.topic_info()->name_.c_str(), - entity.topic_info()->type_.c_str(), - entity.topic_info()->qos_.c_str(), + topic_info.name_.c_str(), + topic_info.type_.c_str(), + topic_info.qos_.c_str(), graph_node.name_.c_str()); }; @@ -226,6 +236,16 @@ void GraphCache::parse_del(const std::string & keyexpr) return; } + if (!entity.topic_info().has_value()) { + // This should not happen as add_topic_data() is called after validating the existence + // of topic_info. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "remove_topic_data() called without valid TopicInfo. Report this."); + return; + } + + const liveliness::TopicInfo topic_info = entity.topic_info().value(); GraphNode::TopicMap & topic_map = entity.type() == EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; const std::string entity_desc = entity.type() == @@ -233,7 +253,7 @@ void GraphCache::parse_del(const std::string & keyexpr) const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0; const std::size_t sub_count = !pub_count; - GraphNode::TopicMap::iterator topic_it = topic_map.find(entity.topic_info()->name_); + GraphNode::TopicMap::iterator topic_it = topic_map.find(topic_info.name_); if (topic_it == topic_map.end()) { // Pub/sub not found. return; @@ -242,12 +262,12 @@ void GraphCache::parse_del(const std::string & keyexpr) GraphNode::TopicDataMap & topic_data_map = topic_it->second; // Search the unordered_set for the TopicData for this topic. GraphNode::TopicDataMap::iterator topic_data_it = - topic_data_map.find(entity.topic_info()->type_); + topic_data_map.find(topic_info.type_); if (topic_data_it == topic_data_map.end()) { // Something is wrong. RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "TopicData not found for topic %s. Report this.", - entity.topic_info()->name_.c_str()); + topic_info.name_.c_str()); return; } @@ -262,21 +282,21 @@ void GraphCache::parse_del(const std::string & keyexpr) } // If the topic does not have any TopicData entries, erase the topic from the map. if (topic_data_map.empty()) { - topic_map.erase(entity.topic_info()->name_); + topic_map.erase(topic_info.name_); } // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. if (update_cache) { GraphNode::TopicMap::iterator cache_topic_it = - graph_topics_.find(entity.topic_info()->name_); + graph_topics_.find(topic_info.name_); if (cache_topic_it == graph_topics_.end()) { // This should not happen. RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.", - entity.topic_info()->name_.c_str()); + topic_info.name_.c_str()); } else { GraphNode::TopicDataMap::iterator cache_topic_data_it = - cache_topic_it->second.find(entity.topic_info()->type_); + cache_topic_it->second.find(topic_info.type_); if (cache_topic_data_it != cache_topic_it->second.end()) { // Decrement the relevant counters. If both counters are 0 remove from cache. cache_topic_data_it->second->stats_.pub_count_ -= pub_count; @@ -298,9 +318,9 @@ void GraphCache::parse_del(const std::string & keyexpr) "rmw_zenoh_cpp", "Removed %s on topic %s with type %s and qos %s to node /%s.", entity_desc.c_str(), - entity.topic_info()->name_.c_str(), - entity.topic_info()->type_.c_str(), - entity.topic_info()->qos_.c_str(), + topic_info.name_.c_str(), + topic_info.type_.c_str(), + topic_info.qos_.c_str(), graph_node.name_.c_str()); }; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 1f575fb6..204f515d 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -266,7 +266,7 @@ std::optional Entity::make(const std::string & keyexpr) } ///============================================================================= -const EntityType & Entity::type() const +EntityType Entity::type() const { return this->type_; } @@ -287,13 +287,13 @@ std::string Entity::node_enclave() const } ///============================================================================= -const std::optional & Entity::topic_info() const +std::optional Entity::topic_info() const { return this->topic_info_; } ///============================================================================= -const std::string & Entity::keyexpr() const +std::string Entity::keyexpr() const { return this->keyexpr_; } diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 529ef092..90f76e89 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -87,7 +87,7 @@ class Entity static std::optional make(const std::string & keyexpr); /// Get the entity type. - const EntityType & type() const; + EntityType type() const; std::string node_namespace() const; @@ -96,10 +96,10 @@ class Entity std::string node_enclave() const; /// Get the topic_info. - const std::optional & topic_info() const; + std::optional topic_info() const; /// Get the liveliness keyexpr for this entity. - const std::string & keyexpr() const; + std::string keyexpr() const; private: Entity( From 357f6588fab1f2f9192e7be418d9f1c3b4956da5 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 1 Dec 2023 15:20:23 +0900 Subject: [PATCH 13/13] Address feedback Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 4fb501c2..b47c14e5 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -115,13 +115,7 @@ void GraphCache::parse_put(const std::string & keyexpr) GraphNode::TopicMap::iterator cache_topic_it = graph_topics_.find(topic_info.name_); if (cache_topic_it == graph_topics_.end()) { // First time this topic name is added to the graph. - auto topic_data_ptr = std::make_shared( - topic_info, - TopicStats{pub_count, sub_count} - ); - graph_topics_[topic_info.name_] = GraphNode::TopicDataMap{ - {topic_info.type_, topic_data_ptr} - }; + graph_topics_[topic_info.name_] = std::move(topic_data_map); } else { // If a TopicData entry for the same type exists in the topic map, update pub/sub counts // or else create an new TopicData. @@ -176,7 +170,7 @@ void GraphCache::parse_put(const std::string & keyexpr) if (ns_it == graph_.end()) { NodeMap node_map = { {entity.node_name(), make_graph_node(entity)}}; - graph_.insert(std::make_pair(entity.node_namespace(), std::move(node_map))); + graph_.emplace(std::make_pair(entity.node_namespace(), std::move(node_map))); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.", entity.node_name().c_str(),