diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index af51eb8e..7ccdeccb 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -83,6 +83,21 @@ std::string GenerateToken::publisher( return token; } +///============================================================================= +std::string GenerateToken::subscription( + size_t domain_id, + const std::string & node_namespace, + const std::string & node_name, + const std::string & topic, + const std::string & type, + const std::string & qos) +{ + std::string token = generate_base_token("MS", domain_id, node_namespace, node_name); + token += topic + "/" + type + "/" + qos; + printf("GenerateToken::Subscription: %s\n", token.c_str()); + return token; +} + ///============================================================================= bool PublishToken::put( z_owned_session_t * session, @@ -301,6 +316,8 @@ void GraphCache::parse_put(const std::string & keyexpr) if (!insertion.second && !node->pubs.empty()) { // Node already exists so just append the publisher. insertion.first->second->pubs.push_back(node->pubs[0]); + } else { + return; } } // Bookkeeping @@ -320,6 +337,39 @@ void GraphCache::parse_put(const std::string & keyexpr) return; } else if (entity == "MS") { // Subscription + auto ns_it = graph_.find(node->ns); + if (ns_it == graph_.end()) { + // Potential edge case where a liveliness update for a node creation was missed. + // So we add the node here. + std::string ns = node->ns; + std::unordered_map map = { + {node->name, node} + }; + graph_.insert(std::make_pair(std::move(ns), std::move(map))); + } else { + auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); + if (!insertion.second && !node->subs.empty()) { + // Node already exists so just append the publisher. + insertion.first->second->subs.push_back(node->subs[0]); + } else { + return; + } + } + // Bookkeeping + // TODO(Yadunund): Be more systematic about generating the key. + std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; + auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); + if (!insertion.second) { + // Such a topic already exists so we just increment its count. + ++insertion.first->second->sub_count_; + } else { + insertion.first->second = std::make_unique(0, 1); + } + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Added subscription %s to node /%s in graph.", + node->subs.at(0).topic.c_str(), + node->name.c_str()); + return; } else if (entity == "SS") { // Service } else if (entity == "SC") { @@ -404,6 +454,52 @@ void GraphCache::parse_del(const std::string & keyexpr) } } else if (entity == "MS") { // Subscription + if (node->subs.empty()) { + // This should never happen but we make sure _parse_token() has no error. + return; + } + auto ns_it = graph_.find(node->ns); + if (ns_it != graph_.end()) { + auto node_it = ns_it->second.find(node->name); + if (node_it != ns_it->second.end()) { + const auto found_node = node_it->second; + // Here we iterate throught the list of subscriptions and remove the one + // with matching name, type and qos. + // TODO(Yadunund): This can be more optimal than O(n) with some caching. + auto erase_it = found_node->subs.begin(); + for (; erase_it != found_node->subs.end(); ++erase_it) { + const auto & sub = *erase_it; + if (sub.topic == node->subs.at(0).topic && + sub.type == node->subs.at(0).type && + sub.qos == node->subs.at(0).qos) + { + break; + } + } + if (erase_it != found_node->subs.end()) { + found_node->subs.erase(erase_it); + // Bookkeeping + // TODO(Yadunund): Be more systematic about generating the key. + std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; + auto topic_it = graph_topics_.find(topic_key); + if (topic_it != graph_topics_.end()) { + if (topic_it->second->sub_count_ == 1 && topic_it->second->pub_count_ == 0) { + // The last subscription was removed so we can delete this entry. + graph_topics_.erase(topic_key); + } else { + // Else we just decrement the count. + --topic_it->second->sub_count_; + } + } + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Removed subscription %s from node /%s in the graph.", + node->subs.at(0).topic.c_str(), + node->name.c_str() + ); + } + } + } } else if (entity == "SS") { // Service } else if (entity == "SC") { diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index c6ac63a5..a67458b1 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -50,6 +50,14 @@ class GenerateToken const std::string & topic, const std::string & type, const std::string & qos); + + static std::string subscription( + size_t domain_id, + const std::string & node_namespace, + const std::string & node_name, + const std::string & topic, + const std::string & type, + const std::string & qos); }; ///============================================================================= diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 2c8ca5f1..95c59d23 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -580,7 +580,7 @@ rmw_create_publisher( auto free_token = rcpputils::make_scope_exit( [publisher_data]() { if (publisher_data != nullptr) { - z_drop(z_move(publisher_data->token)); + zc_liveliness_undeclare_token(z_move(publisher_data->token)); } }); if (!zc_liveliness_token_check(&publisher_data->token)) { @@ -608,6 +608,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) { RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -1251,8 +1252,32 @@ rmw_create_subscription( }); // Publish to the graph that a new subscription is in town - // TODO(Yadunund): Publish liveliness for the new subscription. + sub_data->token = zc_liveliness_declare_token( + z_loan(context_impl->session), + z_keyexpr( + GenerateToken::subscription( + node->context->actual_domain_id, + node->namespace_, + node->name, + rmw_subscription->topic_name, + sub_data->type_support->get_name(), + "reliable").c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [sub_data]() { + if (sub_data != nullptr) { + zc_liveliness_undeclare_token(z_move(sub_data->token)); + } + }); + if (!zc_liveliness_token_check(&sub_data->token)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the subscription."); + return nullptr; + } + free_token.cancel(); undeclare_z_sub.cancel(); free_topic_name.cancel(); destruct_msg_type_support.cancel(); @@ -1284,16 +1309,14 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) rmw_ret_t ret = RMW_RET_OK; - // Publish to the graph that a subscription has ridden off into the sunset - // TODO(Yadunund): Publish liveliness for the deleted subscription. - rcutils_allocator_t * allocator = &node->context->options.allocator; - allocator->deallocate(const_cast(subscription->topic_name), allocator->state); - auto sub_data = static_cast(subscription->data); if (sub_data != nullptr) { + // Publish to the graph that a subscription has ridden off into the sunset + zc_liveliness_undeclare_token(z_move(sub_data->token)); + RMW_TRY_DESTRUCTOR(sub_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(sub_data->type_support, allocator->state); @@ -1305,6 +1328,7 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, ); allocator->deallocate(sub_data, allocator->state); } + allocator->deallocate(const_cast(subscription->topic_name), allocator->state); allocator->deallocate(subscription, allocator->state); return ret;