Skip to content

Commit

Permalink
Merge branch 'rolling' into francocipollone/service_support_via_query…
Browse files Browse the repository at this point in the history
…able
  • Loading branch information
Yadunund committed Jan 3, 2024
2 parents 336cd24 + 0663919 commit 37ddf24
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 65 deletions.
161 changes: 110 additions & 51 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <algorithm>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
Expand Down Expand Up @@ -156,6 +157,7 @@ void GraphCache::parse_put(const std::string & keyexpr)
[&](const Entity & entity) -> std::shared_ptr<GraphNode>
{
auto graph_node = std::make_shared<GraphNode>();
graph_node->id_ = entity.id();
graph_node->ns_ = entity.node_namespace();
graph_node->name_ = entity.node_name();
graph_node->enclave_ = entity.node_enclave();
Expand Down Expand Up @@ -187,29 +189,53 @@ void GraphCache::parse_put(const std::string & keyexpr)
}

// Add the node to the namespace if it did not exist and return.
NodeMap::iterator node_it = ns_it->second.find(entity.node_name());
if (node_it == ns_it->second.end()) {
ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity)));
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Added node /%s to an existing namespace %s in the graph.",
entity.node_name().c_str(),
entity.node_namespace().c_str());
// Case 1: First time a node with this name is added to the namespace.
// Case 2: There are one or more nodes with the same name but the entity could
// represent a node with the same name but a unique id which would make it a
// new addition to the graph.
std::pair<NodeMap::iterator, NodeMap::iterator> range = ns_it->second.equal_range(
entity.node_name());
NodeMap::iterator node_it = std::find_if(
range.first, range.second,
[&entity](const std::pair<std::string, GraphNodePtr> & node_it)
{
return entity.id() == node_it.second->id_;
});
if (node_it == range.second) {
// Either the first time a node with this name is added or with an existing
// name but unique id.
NodeMap::iterator insertion_it =
ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity)));
if (insertion_it != ns_it->second.end()) {
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"Added a new node /%s with id %s to an existing namespace %s in the graph.",
entity.node_name().c_str(),
entity.id().c_str(),
entity.node_namespace().c_str());
} else {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to add a new node /%s with id %s an existing namespace %s in the graph. Report this bug.",
entity.node_name().c_str(),
entity.id().c_str(),
entity.node_namespace().c_str());
}
return;
}
// Otherwise, the entity represents a node that already exists in the graph.
// Update topic info if required below.

// Handles additions to an existing node in the graph.
if (entity.type() == EntityType::Node) {
// The NN entity is implicitly handled above where we add the node.
// If control reaches here, then we received a duplicate entry for the same node.
// This could happen when we get() all the liveliness tokens when the node spins up and
// receive a MP token before an NN one.
// Creating a new node above would have also updated the graph with any topic info.
return;
}

if (!entity.topic_info().has_value()) {
// Likely an error with parsing the token.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info.",
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info. Report this bug.",
keyexpr.c_str());
return;
}
Expand All @@ -228,11 +254,42 @@ void GraphCache::parse_del(const std::string & keyexpr)
}
const liveliness::Entity & entity = *valid_entity;

// Helper lambda to update graph_topics_.
auto update_graph_topics =
[&](const liveliness::TopicInfo topic_info, std::size_t pub_count,
std::size_t sub_count) -> void
{
GraphNode::TopicMap::iterator cache_topic_it =
graph_topics_.find(topic_info.name_);
if (cache_topic_it == graph_topics_.end()) {
// This should not happen.
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.",
topic_info.name_.c_str());
} else {
GraphNode::TopicDataMap::iterator cache_topic_data_it =
cache_topic_it->second.find(topic_info.type_);
if (cache_topic_data_it != cache_topic_it->second.end()) {
// Decrement the relevant counters. If both counters are 0 remove from cache.
cache_topic_data_it->second->stats_.pub_count_ -= pub_count;
cache_topic_data_it->second->stats_.sub_count_ -= sub_count;
if (cache_topic_data_it->second->stats_.pub_count_ == 0 &&
cache_topic_data_it->second->stats_.sub_count_ == 0)
{
cache_topic_it->second.erase(cache_topic_data_it);
}
// If the topic does not have any TopicData entries, erase the topic from the map.
if (cache_topic_it->second.empty()) {
graph_topics_.erase(cache_topic_it);
}
}
}
};

// Helper lambda to append pub/subs to the GraphNode.
// We capture by reference to update caches like graph_topics_ if update_cache is true.
auto remove_topic_data =
[&](const Entity & entity, GraphNode & graph_node,
bool update_cache = false) -> void
[&](const Entity & entity, GraphNode & graph_node) -> void
{
if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) {
return;
Expand Down Expand Up @@ -288,33 +345,7 @@ void GraphCache::parse_del(const std::string & keyexpr)
}

// Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph.
if (update_cache) {
GraphNode::TopicMap::iterator cache_topic_it =
graph_topics_.find(topic_info.name_);
if (cache_topic_it == graph_topics_.end()) {
// This should not happen.
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.",
topic_info.name_.c_str());
} else {
GraphNode::TopicDataMap::iterator cache_topic_data_it =
cache_topic_it->second.find(topic_info.type_);
if (cache_topic_data_it != cache_topic_it->second.end()) {
// Decrement the relevant counters. If both counters are 0 remove from cache.
cache_topic_data_it->second->stats_.pub_count_ -= pub_count;
cache_topic_data_it->second->stats_.sub_count_ -= sub_count;
if (cache_topic_data_it->second->stats_.pub_count_ == 0 &&
cache_topic_data_it->second->stats_.sub_count_ == 0)
{
cache_topic_it->second.erase(cache_topic_data_it);
}
// If the topic does not have any TopicData entries, erase the topic from the map.
if (cache_topic_it->second.empty()) {
graph_topics_.erase(cache_topic_it);
}
}
}
}
update_graph_topics(topic_info, pub_count, sub_count);

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
Expand All @@ -336,8 +367,22 @@ void GraphCache::parse_del(const std::string & keyexpr)
}

// If the node does not exist, ignore the request.
NodeMap::iterator node_it = ns_it->second.find(entity.node_name());
if (node_it == ns_it->second.end()) {
std::pair<NodeMap::iterator, NodeMap::iterator> range = ns_it->second.equal_range(
entity.node_name());
NodeMap::iterator node_it = std::find_if(
range.first, range.second,
[&entity](const std::pair<std::string, GraphNodePtr> & node_it)
{
// An operator== overload is defined above.
return entity.id() == node_it.second->id_;
});
if (node_it == range.second) {
// Node does not exist.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token to remove unknown node /%s from the graph. Ignoring...",
entity.node_name().c_str()
);
return;
}

Expand All @@ -351,13 +396,25 @@ void GraphCache::parse_del(const std::string & keyexpr)
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token to remove node /%s from the graph before all pub/subs for this "
"node have been removed. Report this issue.",
"node have been removed. Removing all pub/subs first...",
entity.node_name().c_str()
);
// TODO(Yadunund): Iterate through the nodes pubs_ and subs_ and decrement topic count in
// graph_topics_.
auto remove_topics =
[&](const GraphNode::TopicMap & topic_map, const EntityType & entity_type) -> void {
std::size_t pub_count = entity_type == EntityType::Publisher ? 1 : 0;
std::size_t sub_count = !pub_count;
for (auto topic_it = topic_map.begin(); topic_it != topic_map.end(); ++topic_it) {
for (auto type_it = topic_it->second.begin(); type_it != topic_it->second.end();
++type_it)
{
update_graph_topics(type_it->second->info_, pub_count, sub_count);
}
}
};
remove_topics(graph_node->pubs_, EntityType::Publisher);
remove_topics(graph_node->subs_, EntityType::Subscription);
}
ns_it->second.erase(entity.node_name());
ns_it->second.erase(node_it);
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Removed node /%s from the graph.",
Expand All @@ -368,14 +425,14 @@ void GraphCache::parse_del(const std::string & keyexpr)

if (!entity.topic_info().has_value()) {
// Likely an error with parsing the token.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData.",
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData. Report this bug.",
keyexpr.c_str());
return;
}

// Update the graph based on the entity.
remove_topic_data(entity, *(node_it->second), true);
remove_topic_data(entity, *(node_it->second));
}

///=============================================================================
Expand Down Expand Up @@ -667,6 +724,8 @@ rmw_ret_t GraphCache::get_entity_names_and_types_by_node(
}

// Check if node exists.
// Since NodeMap is a multimap, this will return the first node with the same
// name that is found.
NodeMap::const_iterator node_it = ns_it->second.find(node_name);
if (node_it == ns_it->second.end()) {
return RMW_RET_OK;
Expand Down
4 changes: 3 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ using TopicDataPtr = std::shared_ptr<TopicData>;
// TODO(Yadunund): Expand to services and clients.
struct GraphNode
{
std::string id_;
std::string ns_;
std::string name_;
// TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node?
Expand Down Expand Up @@ -139,7 +140,8 @@ class GraphCache final
node_n:
*/

using NodeMap = std::unordered_map<std::string, GraphNodePtr>;
// We rely on a multimap to store nodes with duplicate names.
using NodeMap = std::multimap<std::string, GraphNodePtr>;
using NamespaceMap = std::unordered_map<std::string, NodeMap>;
// Map namespace to a map of <node_name, GraphNodePtr>.
NamespaceMap graph_ = {};
Expand Down
Loading

0 comments on commit 37ddf24

Please sign in to comment.