Skip to content

Commit

Permalink
Implement liveliness tokens for subscriptions and update graph
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Nov 16, 2023
1 parent 7e2f55a commit a0f47b1
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 7 deletions.
96 changes: 96 additions & 0 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ std::string GenerateToken::publisher(
return token;
}

///=============================================================================
std::string GenerateToken::subscription(
size_t domain_id,
const std::string & node_namespace,
const std::string & node_name,
const std::string & topic,
const std::string & type,
const std::string & qos)
{
std::string token = generate_base_token("MS", domain_id, node_namespace, node_name);
token += topic + "/" + type + "/" + qos;
printf("GenerateToken::Subscription: %s\n", token.c_str());
return token;
}

///=============================================================================
bool PublishToken::put(
z_owned_session_t * session,
Expand Down Expand Up @@ -301,6 +316,8 @@ void GraphCache::parse_put(const std::string & keyexpr)
if (!insertion.second && !node->pubs.empty()) {
// Node already exists so just append the publisher.
insertion.first->second->pubs.push_back(node->pubs[0]);
} else {
return;
}
}
// Bookkeeping
Expand All @@ -320,6 +337,39 @@ void GraphCache::parse_put(const std::string & keyexpr)
return;
} else if (entity == "MS") {
// Subscription
auto ns_it = graph_.find(node->ns);
if (ns_it == graph_.end()) {
// Potential edge case where a liveliness update for a node creation was missed.
// So we add the node here.
std::string ns = node->ns;
std::unordered_map<std::string, GraphNodePtr> map = {
{node->name, node}
};
graph_.insert(std::make_pair(std::move(ns), std::move(map)));
} else {
auto insertion = ns_it->second.insert(std::make_pair(node->name, node));
if (!insertion.second && !node->subs.empty()) {
// Node already exists so just append the publisher.
insertion.first->second->subs.push_back(node->subs[0]);
} else {
return;
}
}
// Bookkeeping
// TODO(Yadunund): Be more systematic about generating the key.
std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type;
auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr));
if (!insertion.second) {
// Such a topic already exists so we just increment its count.
++insertion.first->second->sub_count_;
} else {
insertion.first->second = std::make_unique<TopicStats>(0, 1);
}
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Added subscription %s to node /%s in graph.",
node->subs.at(0).topic.c_str(),
node->name.c_str());
return;
} else if (entity == "SS") {
// Service
} else if (entity == "SC") {
Expand Down Expand Up @@ -404,6 +454,52 @@ void GraphCache::parse_del(const std::string & keyexpr)
}
} else if (entity == "MS") {
// Subscription
if (node->subs.empty()) {
// This should never happen but we make sure _parse_token() has no error.
return;
}
auto ns_it = graph_.find(node->ns);
if (ns_it != graph_.end()) {
auto node_it = ns_it->second.find(node->name);
if (node_it != ns_it->second.end()) {
const auto found_node = node_it->second;
// Here we iterate throught the list of subscriptions and remove the one
// with matching name, type and qos.
// TODO(Yadunund): This can be more optimal than O(n) with some caching.
auto erase_it = found_node->subs.begin();
for (; erase_it != found_node->subs.end(); ++erase_it) {
const auto & sub = *erase_it;
if (sub.topic == node->subs.at(0).topic &&
sub.type == node->subs.at(0).type &&
sub.qos == node->subs.at(0).qos)
{
break;
}
}
if (erase_it != found_node->subs.end()) {
found_node->subs.erase(erase_it);
// Bookkeeping
// TODO(Yadunund): Be more systematic about generating the key.
std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type;
auto topic_it = graph_topics_.find(topic_key);
if (topic_it != graph_topics_.end()) {
if (topic_it->second->sub_count_ == 1 && topic_it->second->pub_count_ == 0) {
// The last subscription was removed so we can delete this entry.
graph_topics_.erase(topic_key);
} else {
// Else we just decrement the count.
--topic_it->second->sub_count_;
}
}
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Removed subscription %s from node /%s in the graph.",
node->subs.at(0).topic.c_str(),
node->name.c_str()
);
}
}
}
} else if (entity == "SS") {
// Service
} else if (entity == "SC") {
Expand Down
8 changes: 8 additions & 0 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ class GenerateToken
const std::string & topic,
const std::string & type,
const std::string & qos);

static std::string subscription(
size_t domain_id,
const std::string & node_namespace,
const std::string & node_name,
const std::string & topic,
const std::string & type,
const std::string & qos);
};

///=============================================================================
Expand Down
38 changes: 31 additions & 7 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ rmw_create_publisher(
auto free_token = rcpputils::make_scope_exit(
[publisher_data]() {
if (publisher_data != nullptr) {
z_drop(z_move(publisher_data->token));
zc_liveliness_undeclare_token(z_move(publisher_data->token));
}
});
if (!zc_liveliness_token_check(&publisher_data->token)) {
Expand Down Expand Up @@ -608,6 +608,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
{
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
Expand Down Expand Up @@ -1251,8 +1252,32 @@ rmw_create_subscription(
});

// Publish to the graph that a new subscription is in town
// TODO(Yadunund): Publish liveliness for the new subscription.
sub_data->token = zc_liveliness_declare_token(
z_loan(context_impl->session),
z_keyexpr(
GenerateToken::subscription(
node->context->actual_domain_id,
node->namespace_,
node->name,
rmw_subscription->topic_name,
sub_data->type_support->get_name(),
"reliable").c_str()),
NULL
);
auto free_token = rcpputils::make_scope_exit(
[sub_data]() {
if (sub_data != nullptr) {
zc_liveliness_undeclare_token(z_move(sub_data->token));
}
});
if (!zc_liveliness_token_check(&sub_data->token)) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the subscription.");
return nullptr;
}

free_token.cancel();
undeclare_z_sub.cancel();
free_topic_name.cancel();
destruct_msg_type_support.cancel();
Expand Down Expand Up @@ -1284,16 +1309,14 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)

rmw_ret_t ret = RMW_RET_OK;

// Publish to the graph that a subscription has ridden off into the sunset
// TODO(Yadunund): Publish liveliness for the deleted subscription.

rcutils_allocator_t * allocator = &node->context->options.allocator;

allocator->deallocate(const_cast<char *>(subscription->topic_name), allocator->state);

auto sub_data = static_cast<rmw_subscription_data_t *>(subscription->data);
if (sub_data != nullptr) {

// Publish to the graph that a subscription has ridden off into the sunset
zc_liveliness_undeclare_token(z_move(sub_data->token));

RMW_TRY_DESTRUCTOR(sub_data->type_support->~MessageTypeSupport(), MessageTypeSupport, );
allocator->deallocate(sub_data->type_support, allocator->state);

Expand All @@ -1305,6 +1328,7 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, );
allocator->deallocate(sub_data, allocator->state);
}
allocator->deallocate(const_cast<char *>(subscription->topic_name), allocator->state);
allocator->deallocate(subscription, allocator->state);

return ret;
Expand Down

0 comments on commit a0f47b1

Please sign in to comment.