diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index b90b40ae..a9e99463 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -156,6 +157,7 @@ void GraphCache::parse_put(const std::string & keyexpr) [&](const Entity & entity) -> std::shared_ptr { auto graph_node = std::make_shared(); + graph_node->id_ = entity.id(); graph_node->ns_ = entity.node_namespace(); graph_node->name_ = entity.node_name(); graph_node->enclave_ = entity.node_enclave(); @@ -187,29 +189,53 @@ void GraphCache::parse_put(const std::string & keyexpr) } // Add the node to the namespace if it did not exist and return. - NodeMap::iterator node_it = ns_it->second.find(entity.node_name()); - if (node_it == ns_it->second.end()) { - ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity))); - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added node /%s to an existing namespace %s in the graph.", - entity.node_name().c_str(), - entity.node_namespace().c_str()); + // Case 1: First time a node with this name is added to the namespace. + // Case 2: There are one or more nodes with the same name but the entity could + // represent a node with the same name but a unique id which would make it a + // new addition to the graph. + std::pair range = ns_it->second.equal_range( + entity.node_name()); + NodeMap::iterator node_it = std::find_if( + range.first, range.second, + [&entity](const std::pair & node_it) + { + return entity.id() == node_it.second->id_; + }); + if (node_it == range.second) { + // Either the first time a node with this name is added or with an existing + // name but unique id. + NodeMap::iterator insertion_it = + ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity))); + if (insertion_it != ns_it->second.end()) { + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "Added a new node /%s with id %s to an existing namespace %s in the graph.", + entity.node_name().c_str(), + entity.id().c_str(), + entity.node_namespace().c_str()); + } else { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to add a new node /%s with id %s an existing namespace %s in the graph. Report this bug.", + entity.node_name().c_str(), + entity.id().c_str(), + entity.node_namespace().c_str()); + } return; } + // Otherwise, the entity represents a node that already exists in the graph. + // Update topic info if required below. // Handles additions to an existing node in the graph. if (entity.type() == EntityType::Node) { - // The NN entity is implicitly handled above where we add the node. - // If control reaches here, then we received a duplicate entry for the same node. - // This could happen when we get() all the liveliness tokens when the node spins up and - // receive a MP token before an NN one. + // Creating a new node above would have also updated the graph with any topic info. return; } if (!entity.topic_info().has_value()) { // Likely an error with parsing the token. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info.", + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info. Report this bug.", keyexpr.c_str()); return; } @@ -228,11 +254,42 @@ void GraphCache::parse_del(const std::string & keyexpr) } const liveliness::Entity & entity = *valid_entity; + // Helper lambda to update graph_topics_. + auto update_graph_topics = + [&](const liveliness::TopicInfo topic_info, std::size_t pub_count, + std::size_t sub_count) -> void + { + GraphNode::TopicMap::iterator cache_topic_it = + graph_topics_.find(topic_info.name_); + if (cache_topic_it == graph_topics_.end()) { + // This should not happen. + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.", + topic_info.name_.c_str()); + } else { + GraphNode::TopicDataMap::iterator cache_topic_data_it = + cache_topic_it->second.find(topic_info.type_); + if (cache_topic_data_it != cache_topic_it->second.end()) { + // Decrement the relevant counters. If both counters are 0 remove from cache. + cache_topic_data_it->second->stats_.pub_count_ -= pub_count; + cache_topic_data_it->second->stats_.sub_count_ -= sub_count; + if (cache_topic_data_it->second->stats_.pub_count_ == 0 && + cache_topic_data_it->second->stats_.sub_count_ == 0) + { + cache_topic_it->second.erase(cache_topic_data_it); + } + // If the topic does not have any TopicData entries, erase the topic from the map. + if (cache_topic_it->second.empty()) { + graph_topics_.erase(cache_topic_it); + } + } + } + }; + // Helper lambda to append pub/subs to the GraphNode. // We capture by reference to update caches like graph_topics_ if update_cache is true. auto remove_topic_data = - [&](const Entity & entity, GraphNode & graph_node, - bool update_cache = false) -> void + [&](const Entity & entity, GraphNode & graph_node) -> void { if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { return; @@ -288,33 +345,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. - if (update_cache) { - GraphNode::TopicMap::iterator cache_topic_it = - graph_topics_.find(topic_info.name_); - if (cache_topic_it == graph_topics_.end()) { - // This should not happen. - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.", - topic_info.name_.c_str()); - } else { - GraphNode::TopicDataMap::iterator cache_topic_data_it = - cache_topic_it->second.find(topic_info.type_); - if (cache_topic_data_it != cache_topic_it->second.end()) { - // Decrement the relevant counters. If both counters are 0 remove from cache. - cache_topic_data_it->second->stats_.pub_count_ -= pub_count; - cache_topic_data_it->second->stats_.sub_count_ -= sub_count; - if (cache_topic_data_it->second->stats_.pub_count_ == 0 && - cache_topic_data_it->second->stats_.sub_count_ == 0) - { - cache_topic_it->second.erase(cache_topic_data_it); - } - // If the topic does not have any TopicData entries, erase the topic from the map. - if (cache_topic_it->second.empty()) { - graph_topics_.erase(cache_topic_it); - } - } - } - } + update_graph_topics(topic_info, pub_count, sub_count); RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", @@ -336,8 +367,22 @@ void GraphCache::parse_del(const std::string & keyexpr) } // If the node does not exist, ignore the request. - NodeMap::iterator node_it = ns_it->second.find(entity.node_name()); - if (node_it == ns_it->second.end()) { + std::pair range = ns_it->second.equal_range( + entity.node_name()); + NodeMap::iterator node_it = std::find_if( + range.first, range.second, + [&entity](const std::pair & node_it) + { + // An operator== overload is defined above. + return entity.id() == node_it.second->id_; + }); + if (node_it == range.second) { + // Node does not exist. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token to remove unknown node /%s from the graph. Ignoring...", + entity.node_name().c_str() + ); return; } @@ -351,13 +396,25 @@ void GraphCache::parse_del(const std::string & keyexpr) RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Received liveliness token to remove node /%s from the graph before all pub/subs for this " - "node have been removed. Report this issue.", + "node have been removed. Removing all pub/subs first...", entity.node_name().c_str() ); - // TODO(Yadunund): Iterate through the nodes pubs_ and subs_ and decrement topic count in - // graph_topics_. + auto remove_topics = + [&](const GraphNode::TopicMap & topic_map, const EntityType & entity_type) -> void { + std::size_t pub_count = entity_type == EntityType::Publisher ? 1 : 0; + std::size_t sub_count = !pub_count; + for (auto topic_it = topic_map.begin(); topic_it != topic_map.end(); ++topic_it) { + for (auto type_it = topic_it->second.begin(); type_it != topic_it->second.end(); + ++type_it) + { + update_graph_topics(type_it->second->info_, pub_count, sub_count); + } + } + }; + remove_topics(graph_node->pubs_, EntityType::Publisher); + remove_topics(graph_node->subs_, EntityType::Subscription); } - ns_it->second.erase(entity.node_name()); + ns_it->second.erase(node_it); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Removed node /%s from the graph.", @@ -368,14 +425,14 @@ void GraphCache::parse_del(const std::string & keyexpr) if (!entity.topic_info().has_value()) { // Likely an error with parsing the token. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData.", + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData. Report this bug.", keyexpr.c_str()); return; } // Update the graph based on the entity. - remove_topic_data(entity, *(node_it->second), true); + remove_topic_data(entity, *(node_it->second)); } ///============================================================================= @@ -667,6 +724,8 @@ rmw_ret_t GraphCache::get_entity_names_and_types_by_node( } // Check if node exists. + // Since NodeMap is a multimap, this will return the first node with the same + // name that is found. NodeMap::const_iterator node_it = ns_it->second.find(node_name); if (node_it == ns_it->second.end()) { return RMW_RET_OK; diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index c6143d8b..ca65c0f9 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -59,6 +59,7 @@ using TopicDataPtr = std::shared_ptr; // TODO(Yadunund): Expand to services and clients. struct GraphNode { + std::string id_; std::string ns_; std::string name_; // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? @@ -139,7 +140,8 @@ class GraphCache final node_n: */ - using NodeMap = std::unordered_map; + // We rely on a multimap to store nodes with duplicate names. + using NodeMap = std::multimap; using NamespaceMap = std::unordered_map; // Map namespace to a map of . NamespaceMap graph_ = {}; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 204f515d..b1b1b0f1 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -83,6 +83,18 @@ static const std::unordered_map str_to_entity = { {CLI_STR, EntityType::Client} }; +std::string zid_to_str(z_id_t id) +{ + std::stringstream ss; + ss << std::hex; + size_t i = 0; + for (; i < (sizeof(id.id) - 1); i++) { + ss << static_cast(id.id[i]) << "."; + } + ss << static_cast(id.id[i]); + return ss.str(); +} + } // namespace ///============================================================================= @@ -94,10 +106,12 @@ std::string subscription_token(size_t domain_id) ///============================================================================= Entity::Entity( + std::string id, EntityType type, NodeInfo node_info, std::optional topic_info) -: type_(std::move(type)), +: id_(std::move(id)), + type_(std::move(type)), node_info_(std::move(node_info)), topic_info_(std::move(topic_info)) { @@ -106,21 +120,26 @@ Entity::Entity( * * The liveliness token keyexprs are in the form: * - * //// + * ///// * * Where: * - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS. + * - A unique ID to identify this entity. Currently the id is the zenoh session's id with elements concatenated into a string using '.' as separator. * - The type of entity. This can be one of "NN" for a node, "MP" for a publisher, "MS" for a subscription, "SS" for a service server, or "SC" for a service client. * - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons. * - The ROS node name for this entity. * * For entities with topic infomation, the liveliness token keyexpr have additional fields: * - * /////// + * //////// + * - The ROS topic name for this entity. + * - The type for the topic. + * - The qos for the topic. */ std::stringstream token_ss; const std::string & ns = node_info_.ns_; - token_ss << ADMIN_SPACE << "/" << node_info_.domain_id_ << "/" << entity_to_str.at(type_) << ns; + token_ss << ADMIN_SPACE << "/" << node_info_.domain_id_ << "/" << id_ << "/" << entity_to_str.at( + type_) << ns; // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". // Hence we add an "_" to denote an empty namespace such that splitting the key // will always result in 5 parts. @@ -143,6 +162,7 @@ Entity::Entity( ///============================================================================= std::optional Entity::make( + z_id_t id, EntityType type, NodeInfo node_info, std::optional topic_info) @@ -160,7 +180,7 @@ std::optional Entity::make( return std::nullopt; } - Entity entity{std::move(type), std::move(node_info), std::move(topic_info)}; + Entity entity{zid_to_str(id), std::move(type), std::move(node_info), std::move(topic_info)}; return entity; } @@ -206,7 +226,7 @@ std::optional Entity::make(const std::string & keyexpr) // A token will contain at least 5 parts: // (ADMIN_SPACE, domain_id, entity_str, namespace, node_name). // Basic validation. - if (parts.size() < 5) { + if (parts.size() < 6) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Received invalid liveliness token"); @@ -229,7 +249,7 @@ std::optional Entity::make(const std::string & keyexpr) } // Get the entity, ie NN, MP, MS, SS, SC. - std::string & entity_str = parts[2]; + std::string & entity_str = parts[3]; std::unordered_map::const_iterator entity_it = str_to_entity.find(entity_str); if (entity_it == str_to_entity.end()) { @@ -241,30 +261,38 @@ std::optional Entity::make(const std::string & keyexpr) EntityType entity_type = entity_it->second; std::size_t domain_id = std::stoul(parts[1]); - std::string ns = parts[3] == "_" ? "/" : "/" + std::move(parts[3]); - std::string node_name = std::move(parts[4]); + std::string & id = parts[2]; + std::string ns = parts[4] == "_" ? "/" : "/" + std::move(parts[4]); + std::string node_name = std::move(parts[5]); std::optional topic_info = std::nullopt; // Populate topic_info if we have a token for an entity other than a node. if (entity_type != EntityType::Node) { - if (parts.size() < 8) { + if (parts.size() < 9) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Received liveliness token for non-node entity without required parameters."); return std::nullopt; } topic_info = TopicInfo{ - "/" + std::move(parts[5]), - std::move(parts[6]), - std::move(parts[7])}; + "/" + std::move(parts[6]), + std::move(parts[7]), + std::move(parts[8])}; } return Entity{ + std::move(id), std::move(entity_type), NodeInfo{std::move(domain_id), std::move(ns), std::move(node_name), ""}, std::move(topic_info)}; } +///============================================================================= +std::string Entity::id() const +{ + return this->id_; +} + ///============================================================================= EntityType Entity::type() const { diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 90f76e89..13e09cb4 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -79,6 +79,7 @@ class Entity /// fields are not present for the EntityType. // TODO(Yadunund): Find a way to better bundle the type and the associated data. static std::optional make( + z_id_t id, EntityType type, NodeInfo node_info, std::optional topic_info = std::nullopt); @@ -86,6 +87,8 @@ class Entity /// Make an Entity from a liveliness keyexpr. static std::optional make(const std::string & keyexpr); + std::string id() const; + /// Get the entity type. EntityType type() const; @@ -103,10 +106,12 @@ class Entity private: Entity( + std::string id, EntityType type, NodeInfo node_info, std::optional topic_info); + std::string id_; EntityType type_; NodeInfo node_info_; std::optional topic_info_; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 4cc1ad71..90f85860 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -196,6 +196,7 @@ rmw_create_node( // Initialize liveliness token for the node to advertise that a new node is in town. rmw_node_data_t * node_data = static_cast(node->data); const auto liveliness_entity = liveliness::Entity::make( + z_info_zid(z_loan(context->impl->session)), liveliness::EntityType::Node, liveliness::NodeInfo{context->actual_domain_id, namespace_, name, ""}); if (!liveliness_entity.has_value()) { @@ -605,6 +606,7 @@ rmw_create_publisher( // return nullptr; // } const auto liveliness_entity = liveliness::Entity::make( + z_info_zid(z_loan(node->context->impl->session)), liveliness::EntityType::Publisher, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_publisher->topic_name, @@ -1297,6 +1299,7 @@ rmw_create_subscription( // Publish to the graph that a new subscription is in town const auto liveliness_entity = liveliness::Entity::make( + z_info_zid(z_loan(node->context->impl->session)), liveliness::EntityType::Subscription, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_subscription->topic_name,