From c6bc79bc11d630fb7a3e2b9f95f1994e53509b2c Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 16 Nov 2023 15:30:28 +0800 Subject: [PATCH] Switch to liveliness tokens Signed-off-by: Yadunund --- .../DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 | 23 --- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 15 +- rmw_zenoh_cpp/src/rmw_init.cpp | 34 ++-- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 146 +++++++++++++----- 4 files changed, 140 insertions(+), 78 deletions(-) diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index b35834f7..506822d9 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -259,27 +259,4 @@ }, }, - /// Plugins configurations - /// Plugins are only loaded if present in the configuration. When starting - /// Once loaded, they may react to changes in the configuration made through the zenoh instance's adminspace. - plugins: { - - /// Configure the storage manager plugin - storage_manager: { - /// Configure the storages supported by the volumes - storages: { - ros2_lv: { - /// Storages always need to know what set of keys they must work with. These sets are defined by a key expression. - key_expr: "@ros2_lv/**", - /// Storages also need to know which volume will be used to actually store their key-value pairs. - /// The "memory" volume is always available, and doesn't require any per-storage options, so requesting "memory" by string is always sufficient. - volume: "memory", - /// A complete storage advertises itself as containing all the known keys matching the configured key expression. - /// If not configured, complete defaults to false. - complete: "true", - }, - }, - }, - }, - } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 421932ac..d25fe777 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -58,11 +58,10 @@ struct rmw_context_impl_s ///============================================================================== struct rmw_node_data_t { - // TODO(yadunund): Add a GraphCache object. - - // Map topic name to topic types. - std::unordered_set> publishers; - std::unordered_set> subscriptions; + // TODO(Yadunund): Do we need a token at the node level? Right now I have one + // for cases where a node may spin up but does not have any publishers or subscriptions. + // Liveliness token for the node. + zc_owned_liveliness_token_t token; }; ///============================================================================== @@ -71,6 +70,9 @@ struct rmw_publisher_data_t // An owned publisher. z_owned_publisher_t pub; + // Liveliness token for the publisher. + zc_owned_liveliness_token_t token; + // Type support fields const void * type_support_impl; const char * typesupport_identifier; @@ -113,6 +115,9 @@ struct rmw_subscription_data_t { z_owned_subscriber_t sub; + // Liveliness token for the subscription. + zc_owned_liveliness_token_t token; + const void * type_support_impl; const char * typesupport_identifier; MessageTypeSupport * type_support; diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 35d8fc1f..9bb5499d 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -241,18 +241,22 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) // Setup liveliness subscriptions for discovery. const std::string liveliness_str = GenerateToken::liveliness(context->actual_domain_id); - // Query the router to get graph information before this session was started. - // TODO(Yadunund): This will not be needed once the zenoh-c liveliness API is available. + // Query the router/liveliness participants to get graph information before this session was started. RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Sending Query '%s' to fetch discovery data from router...", + "Sending Query '%s' to fetch discovery data...", liveliness_str.c_str() ); z_owned_reply_channel_t channel = zc_reply_fifo_new(16); - z_get_options_t opts = z_get_options_default(); - z_get( - z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send), - &opts); // here, the send is moved and will be dropped by zenoh when adequate + zc_liveliness_get( + z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), + z_move(channel.send), NULL); + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. + // z_get_options_t opts = z_get_options_default(); + // z_get( + // z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send), + // &opts); // here, the send is moved and will be dropped by zenoh when adequate z_owned_reply_t reply = z_reply_null(); for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { if (z_reply_is_ok(&reply)) { @@ -277,14 +281,24 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) liveliness_str.c_str() ); - auto sub_options = z_subscriber_options_default(); - sub_options.reliability = Z_RELIABILITY_RELIABLE; + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. + // auto sub_options = z_subscriber_options_default(); + // sub_options.reliability = Z_RELIABILITY_RELIABLE; + // context->impl->graph_subscriber = z_declare_subscriber( + // z_loan(context->impl->session), + // z_keyexpr(liveliness_str.c_str()), + // z_move(callback), + // &sub_options); + auto sub_options = zc_liveliness_subscriber_options_null(); z_owned_closure_sample_t callback = z_closure(graph_sub_data_handler, nullptr, context->impl); - context->impl->graph_subscriber = z_declare_subscriber( + context->impl->graph_subscriber = zc_liveliness_declare_subscriber( z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), z_move(callback), &sub_options); + // TODO(Yadunund): Uncomment once linker issue is resolved. + // z_drop(z_move(sub_options)); auto undeclare_z_sub = rcpputils::make_scope_exit( [context]() { z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index af7ca85e..ea045fd6 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -20,6 +20,8 @@ #include #include +#include + #include "detail/guard_condition.hpp" #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" @@ -168,6 +170,7 @@ rmw_create_node( // zenohd is not running. // Put metadata into node->data. node->data = allocator->zero_allocate(1, sizeof(rmw_node_data_t), allocator->state); + rmw_node_data_t * node_data = static_cast(node->data); RMW_CHECK_FOR_NULL_WITH_MSG( node->data, "unable to allocate memory for node data", @@ -180,19 +183,43 @@ rmw_create_node( node->implementation_identifier = rmw_zenoh_identifier; node->context = context; + + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a new node is in town - const bool pub_result = PublishToken::put( - &node->context->impl->session, - GenerateToken::node(context->actual_domain_id, namespace_, name) + // const bool pub_result = PublishToken::put( + // &node->context->impl->session, + // GenerateToken::node(context->actual_domain_id, namespace_, name) + // ); + // if (!pub_result) { + // return nullptr; + // } + // Initialize liveliness token for the node to advertise that a new node is in town. + node_data->token = zc_liveliness_declare_token( + z_loan(node->context->impl->session), + z_keyexpr(GenerateToken::node(context->actual_domain_id, namespace_, name).c_str()), + NULL ); - if (!pub_result) { - return nullptr; - } + auto free_token = rcpputils::make_scope_exit( + [node]() { + if (node->data != nullptr) { + rmw_node_data_t * node_data = static_cast(node->data); + z_drop(z_move(node_data->token)); + } + }); + // TODO(Yadunund): Uncomment this after resolving build error. + // if (!z_check(node_data->token)) { + // RCUTILS_LOG_ERROR_NAMED( + // "rmw_zenoh_cpp", + // "Unable to create liveliness token for the node."); + // return nullptr; + // } free_node_data.cancel(); free_namespace.cancel(); free_name.cancel(); free_node.cancel(); + free_token.cancel(); return node; } @@ -204,20 +231,27 @@ rmw_destroy_node(rmw_node_t * node) RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a node has ridden off into the sunset - const bool del_result = PublishToken::del( - &node->context->impl->session, - GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) - ); - if (!del_result) { - return RMW_RET_ERROR; - } + // const bool del_result = PublishToken::del( + // &node->context->impl->session, + // GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) + // ); + // if (!del_result) { + // return RMW_RET_ERROR; + // } + + // Undeclare liveliness token for the node to advertise that the node has ridden off into the sunset. + rmw_node_data_t * node_data = static_cast(node->data); + z_drop(z_move(node_data->token)); rcutils_allocator_t * allocator = &node->context->options.allocator; @@ -516,21 +550,48 @@ rmw_create_publisher( z_undeclare_publisher(z_move(publisher_data->pub)); }); + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a new publisher is in town // TODO(Yadunund): Publish liveliness for the new publisher. - const bool pub_result = PublishToken::put( - &node->context->impl->session, - GenerateToken::publisher( - node->context->actual_domain_id, - node->namespace_, - node->name, - rmw_publisher->topic_name, - publisher_data->type_support->get_name(), - "reliable") + // const bool pub_result = PublishToken::put( + // &node->context->impl->session, + // GenerateToken::publisher( + // node->context->actual_domain_id, + // node->namespace_, + // node->name, + // rmw_publisher->topic_name, + // publisher_data->type_support->get_name(), + // "reliable") + // ); + // if (!pub_result) { + // return nullptr; + // } + publisher_data->token = zc_liveliness_declare_token( + z_loan(node->context->impl->session), + z_keyexpr( + GenerateToken::publisher( + node->context->actual_domain_id, + node->namespace_, + node->name, + rmw_publisher->topic_name, + publisher_data->type_support->get_name(), + "reliable").c_str()), + NULL ); - if (!pub_result) { - return nullptr; - } + auto free_token = rcpputils::make_scope_exit( + [publisher_data]() { + if (publisher_data != nullptr) { + z_drop(z_move(publisher_data->token)); + } + }); + // TODO(Yadunund): Uncomment this after resolving build error. + // if (!z_check(publisher_data->token)) { + // RCUTILS_LOG_ERROR_NAMED( + // "rmw_zenoh_cpp", + // "Unable to create liveliness token for the publisher."); + // return nullptr; + // } publisher_data->graph_cache_handle = node->context->impl->graph_cache.add_publisher( rmw_publisher->topic_name, node->name, node->namespace_, @@ -540,6 +601,7 @@ rmw_create_publisher( node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle); }); + free_token.cancel(); remove_from_graph_cache.cancel(); undeclare_z_publisher.cancel(); free_topic_name.cancel(); @@ -575,22 +637,26 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) auto publisher_data = static_cast(publisher->data); if (publisher_data != nullptr) { + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a publisher has ridden off into the sunset - const bool del_result = PublishToken::del( - &node->context->impl->session, - GenerateToken::publisher( - node->context->actual_domain_id, - node->namespace_, - node->name, - publisher->topic_name, - publisher_data->type_support->get_name(), - "reliable" - ) - ); - if (!del_result) { - // TODO(Yadunund): Should this really return an error? - return RMW_RET_ERROR; - } + // const bool del_result = PublishToken::del( + // &node->context->impl->session, + // GenerateToken::publisher( + // node->context->actual_domain_id, + // node->namespace_, + // node->name, + // publisher->topic_name, + // publisher_data->type_support->get_name(), + // "reliable" + // ) + // ); + // if (!del_result) { + // // TODO(Yadunund): Should this really return an error? + // return RMW_RET_ERROR; + // } + // TODO(Yadunund): Fix linker error. + z_drop(z_move(publisher_data->token)); node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle); RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, );