diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index d2145006..4ecfe360 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -113,10 +113,11 @@ install( ) add_executable(rmw_zenohd - src/zenohd/main.cpp - src/detail/zenoh_config.cpp src/detail/liveliness_utils.cpp src/detail/logging.cpp + src/detail/qos.cpp + src/detail/zenoh_config.cpp + src/zenohd/main.cpp ) target_link_libraries(rmw_zenohd diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 00e78be6..e27acef6 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -341,7 +341,7 @@ void GraphCache::parse_put( if (ignore_from_current_session && is_entity_local(*entity)) { RMW_ZENOH_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", - "Ignoring parse_put for %s from the same session.\n", entity->keyexpr().c_str()); + "Ignoring parse_put for %s from the same session.\n", entity->liveliness_keyexpr().c_str()); return; } @@ -403,6 +403,20 @@ void GraphCache::parse_put( // Otherwise, the entity represents a node that already exists in the graph. // Update topic info if required below. update_topic_maps_for_put(node_it->second, entity); + + // If the newly added entity is a publisher with transient_local qos durability, + // we trigger any registered querying subscriber callbacks. + if (entity->type() == liveliness::EntityType::Publisher && + entity->topic_info().has_value() && + entity->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) + { + auto sub_cbs_it = querying_subs_cbs_.find(entity->topic_info()->topic_keyexpr_); + if (sub_cbs_it != querying_subs_cbs_.end()) { + for (const auto & cb : sub_cbs_it->second) { + cb(entity->zid()); + } + } + } } ///============================================================================= @@ -559,7 +573,7 @@ void GraphCache::parse_del( if (ignore_from_current_session && is_entity_local(*entity)) { RMW_ZENOH_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", - "Ignoring parse_del for %s from the same session.\n", entity->keyexpr().c_str()); + "Ignoring parse_del for %s from the same session.\n", entity->liveliness_keyexpr().c_str()); return; } // Lock the graph mutex before accessing the graph. @@ -1315,4 +1329,18 @@ std::unique_ptr GraphCache::take_event_status( status_to_take.current_count_change = 0; return result; } + +///============================================================================= +void GraphCache::set_querying_subscriber_callback( + const std::string & keyexpr, + QueryingSubscriberCallback cb) +{ + auto cb_it = querying_subs_cbs_.find(keyexpr); + if (cb_it == querying_subs_cbs_.end()) { + querying_subs_cbs_[keyexpr] = std::move(std::vector{}); + cb_it = querying_subs_cbs_.find(keyexpr); + } + cb_it->second.push_back(std::move(cb)); +} + } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 8d2b6802..664f2461 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -99,6 +99,12 @@ using GraphNodePtr = std::shared_ptr; class GraphCache final { public: + /// @brief Signature for a function that will be invoked by the GraphCache when a QoS + /// event is detected. + using GraphCacheEventCallback = std::function)>; + /// Callback to be triggered when a publication cache is detected in the ROS Graph. + using QueryingSubscriberCallback = std::function; + /// @brief Constructor /// @param id The id of the zenoh session that is building the graph cache. /// This is used to infer which entities originated from the current session @@ -169,10 +175,6 @@ class GraphCache final const char * service_type, bool * is_available) const; - /// @brief Signature for a function that will be invoked by the GraphCache when a QoS - /// event is detected. - using GraphCacheEventCallback = std::function)>; - /// Set a qos event callback for an entity from the current session. /// @note The callback will be removed when the entity is removed from the graph. void set_qos_event_callback( @@ -183,6 +185,10 @@ class GraphCache final /// Returns true if the entity is a publisher or client. False otherwise. static bool is_entity_pub(const liveliness::Entity & entity); + void set_querying_subscriber_callback( + const std::string & keyexpr, + QueryingSubscriberCallback cb); + private: // Helper function to convert an Entity into a GraphNode. // Note: this will update bookkeeping variables in GraphCache. @@ -281,6 +287,8 @@ class GraphCache final using GraphEventCallbackMap = std::unordered_map; // EventCallbackMap for each type of event we support in rmw_zenoh_cpp. GraphEventCallbackMap event_callbacks_; + // Map keyexpressions to QueryingSubscriberCallback. + std::unordered_map> querying_subs_cbs_; // Counters to track changes to event statues for each topic. std::unordered_map> event_statuses_; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index acd71e58..5b86d073 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -19,11 +19,13 @@ #include #include #include +#include #include #include #include #include "logging_macros.hpp" +#include "qos.hpp" #include "rcpputils/scope_exit.hpp" @@ -47,8 +49,27 @@ NodeInfo::NodeInfo( // Do nothing. } +namespace +{ +// Helper function to create a copy of a string after removing any +// leading or trailing slashes. +std::string strip_slashes(const std::string & str) +{ + std::string ret = str; + std::size_t start = 0; + std::size_t end = str.length() - 1; + if (str[0] == '/') { + ++start; + } + if (str[end] == '/') { + --end; + } + return ret.substr(start, end - start + 1); +} +} // namespace ///============================================================================= TopicInfo::TopicInfo( + std::size_t domain_id, std::string name, std::string type, std::string type_hash, @@ -58,7 +79,13 @@ TopicInfo::TopicInfo( type_hash_(std::move(type_hash)), qos_(std::move(qos)) { - // Do nothing. + topic_keyexpr_ = std::to_string(domain_id); + topic_keyexpr_ += "/"; + topic_keyexpr_ += strip_slashes(name_); + topic_keyexpr_ += "/"; + topic_keyexpr_ += type_; + topic_keyexpr_ += "/"; + topic_keyexpr_ += type_hash_; } ///============================================================================= @@ -167,40 +194,102 @@ std::vector split_keyexpr( result.push_back(keyexpr.substr(start)); return result; } + +///============================================================================= +// Helper function to convert string to size_t. +// The function is templated to enable conversion to size_t or std::size_t. +template +std::optional str_to_size_t(const std::string & str, const T default_value) +{ + if (str.empty()) { + return default_value; + } + errno = 0; + char * endptr; + // TODO(Yadunund): strtoul returns an unsigned long, not size_t. + // Depending on the architecture and platform, these may not be the same size. + // Further, if the incoming str is a signed integer, storing it in a size_t is incorrect. + // We should fix this piece of code to deal with both of those situations. + size_t num = strtoul(str.c_str(), &endptr, 10); + if (endptr == str.c_str()) { + // No values were converted, this is an error + RMW_SET_ERROR_MSG("no valid numbers available"); + return std::nullopt; + } else if (*endptr != '\0') { + // There was junk after the number + RMW_SET_ERROR_MSG("non-numeric values"); + return std::nullopt; + } else if (errno != 0) { + // Some other error occurred, which may include overflow or underflow + RMW_SET_ERROR_MSG( + "an undefined error occurred while getting the number, this may be an overflow"); + return std::nullopt; + } + return num; +} } // namespace ///============================================================================= // TODO(Yadunund): Rely on maps to retrieve strings. -std::string qos_to_keyexpr(rmw_qos_profile_t qos) +std::string qos_to_keyexpr(const rmw_qos_profile_t & qos) { std::string keyexpr = ""; + const rmw_qos_profile_t & default_qos = QoS::get().default_qos(); + // Reliability. - keyexpr += std::to_string(qos.reliability); + if (qos.reliability != default_qos.reliability) { + keyexpr += std::to_string(qos.reliability); + } keyexpr += QOS_DELIMITER; + // Durability. - keyexpr += std::to_string(qos.durability); + if (qos.durability != default_qos.durability) { + keyexpr += std::to_string(qos.durability); + } keyexpr += QOS_DELIMITER; + // History. - keyexpr += std::to_string(qos.history); + if (qos.history != default_qos.history) { + keyexpr += std::to_string(qos.history); + } keyexpr += QOS_COMPONENT_DELIMITER; - keyexpr += std::to_string(qos.depth); + if (qos.depth != default_qos.depth) { + keyexpr += std::to_string(qos.depth); + } keyexpr += QOS_DELIMITER; + // Deadline. - keyexpr += std::to_string(qos.deadline.sec); + if (qos.deadline.sec != default_qos.deadline.sec) { + keyexpr += std::to_string(qos.deadline.sec); + } keyexpr += QOS_COMPONENT_DELIMITER; - keyexpr += std::to_string(qos.deadline.nsec); + if (qos.deadline.nsec != default_qos.deadline.nsec) { + keyexpr += std::to_string(qos.deadline.nsec); + } keyexpr += QOS_DELIMITER; + // Lifespan. - keyexpr += std::to_string(qos.lifespan.sec); + if (qos.lifespan.sec != default_qos.lifespan.sec) { + keyexpr += std::to_string(qos.lifespan.sec); + } keyexpr += QOS_COMPONENT_DELIMITER; - keyexpr += std::to_string(qos.lifespan.nsec); + if (qos.lifespan.nsec != default_qos.lifespan.nsec) { + keyexpr += std::to_string(qos.lifespan.nsec); + } keyexpr += QOS_DELIMITER; + // Liveliness. - keyexpr += std::to_string(qos.liveliness); + if (qos.liveliness != default_qos.liveliness) { + keyexpr += std::to_string(qos.liveliness); + } keyexpr += QOS_COMPONENT_DELIMITER; - keyexpr += std::to_string(qos.liveliness_lease_duration.sec); + if (qos.liveliness_lease_duration.sec != default_qos.liveliness_lease_duration.sec) { + keyexpr += std::to_string(qos.liveliness_lease_duration.sec); + } keyexpr += QOS_COMPONENT_DELIMITER; - keyexpr += std::to_string(qos.liveliness_lease_duration.nsec); + if (qos.liveliness_lease_duration.nsec != default_qos.liveliness_lease_duration.nsec) { + keyexpr += std::to_string(qos.liveliness_lease_duration.nsec); + } return keyexpr; } @@ -208,7 +297,9 @@ std::string qos_to_keyexpr(rmw_qos_profile_t qos) ///============================================================================= std::optional keyexpr_to_qos(const std::string & keyexpr) { + const rmw_qos_profile_t & default_qos = QoS::get().default_qos(); rmw_qos_profile_t qos; + const std::vector parts = split_keyexpr(keyexpr, QOS_DELIMITER); if (parts.size() < 6) { return std::nullopt; @@ -232,46 +323,29 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) } try { - qos.history = str_to_qos_history.at(history_parts[0]); - qos.reliability = str_to_qos_reliability.at(parts[0]); - qos.durability = str_to_qos_durability.at(parts[1]); - qos.liveliness = str_to_qos_liveliness.at(liveliness_parts[0]); + qos.history = history_parts[0].empty() ? default_qos.history : str_to_qos_history.at( + history_parts[0]); + qos.reliability = parts[0].empty() ? default_qos.reliability : str_to_qos_reliability.at( + parts[0]); + qos.durability = parts[1].empty() ? default_qos.durability : str_to_qos_durability.at(parts[1]); + qos.liveliness = + liveliness_parts[0].empty() ? default_qos.liveliness : str_to_qos_liveliness.at( + liveliness_parts[0]); } catch (const std::exception & e) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Error setting QoS values from strings: %s", e.what()); return std::nullopt; } - - // Helper function to convert string to size_t. - auto str_to_size_t = - [](const std::string & str) -> std::optional - { - errno = 0; - char * endptr; - size_t num = strtoul(str.c_str(), &endptr, 10); - if (endptr == str.c_str()) { - // No values were converted, this is an error - RMW_SET_ERROR_MSG("no valid numbers available"); - return std::nullopt; - } else if (*endptr != '\0') { - // There was junk after the number - RMW_SET_ERROR_MSG("non-numeric values"); - return std::nullopt; - } else if (errno != 0) { - // Some other error occurred, which may include overflow or underflow - RMW_SET_ERROR_MSG( - "an undefined error occurred while getting the number, this may be an overflow"); - return std::nullopt; - } - return num; - }; - - const auto maybe_depth = str_to_size_t(history_parts[1]); - const auto maybe_deadline_s = str_to_size_t(deadline_parts[0]); - const auto maybe_deadline_ns = str_to_size_t(deadline_parts[1]); - const auto maybe_lifespan_s = str_to_size_t(lifespan_parts[0]); - const auto maybe_lifespan_ns = str_to_size_t(lifespan_parts[1]); - const auto maybe_liveliness_s = str_to_size_t(liveliness_parts[1]); - const auto maybe_liveliness_ns = str_to_size_t(liveliness_parts[2]); + const auto maybe_depth = str_to_size_t(history_parts[1], default_qos.depth); + const auto maybe_deadline_s = str_to_size_t(deadline_parts[0], default_qos.deadline.sec); + const auto maybe_deadline_ns = str_to_size_t(deadline_parts[1], default_qos.deadline.nsec); + const auto maybe_lifespan_s = str_to_size_t(lifespan_parts[0], default_qos.lifespan.sec); + const auto maybe_lifespan_ns = str_to_size_t(lifespan_parts[1], default_qos.lifespan.nsec); + const auto maybe_liveliness_s = str_to_size_t( + liveliness_parts[1], + default_qos.liveliness_lease_duration.sec); + const auto maybe_liveliness_ns = str_to_size_t( + liveliness_parts[2], + default_qos.liveliness_lease_duration.nsec); if (maybe_depth == std::nullopt || maybe_deadline_s == std::nullopt || maybe_deadline_ns == std::nullopt || @@ -354,7 +428,7 @@ Entity::Entity( for (std::size_t i = 0; i < KEYEXPR_INDEX_MAX + 1; ++i) { bool last = false; if (!keyexpr_parts[i].empty()) { - this->keyexpr_ += std::move(keyexpr_parts[i]); + this->liveliness_keyexpr_ += std::move(keyexpr_parts[i]); } if (i == KEYEXPR_INDEX_MAX || keyexpr_parts[i + 1].empty()) { last = true; @@ -363,9 +437,9 @@ Entity::Entity( break; } // Append the delimiter unless it is the last component. - this->keyexpr_ += KEYEXPR_DELIMITER; + this->liveliness_keyexpr_ += KEYEXPR_DELIMITER; } - this->guid_ = std::hash{}(this->keyexpr_); + this->guid_ = std::hash{}(this->liveliness_keyexpr_); } ///============================================================================= @@ -472,6 +546,7 @@ std::shared_ptr Entity::make(const std::string & keyexpr) return nullptr; } topic_info = TopicInfo{ + domain_id, demangle_name(std::move(parts[KeyexprIndex::TopicName])), demangle_name(std::move(parts[KeyexprIndex::TopicType])), demangle_name(std::move(parts[KeyexprIndex::TopicTypeHash])), @@ -541,9 +616,9 @@ std::optional Entity::topic_info() const } ///============================================================================= -std::string Entity::keyexpr() const +std::string Entity::liveliness_keyexpr() const { - return this->keyexpr_; + return this->liveliness_keyexpr_; } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 13a682e7..6fa09833 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -50,9 +50,11 @@ struct TopicInfo std::string name_; std::string type_; std::string type_hash_; + std::string topic_keyexpr_; rmw_qos_profile_t qos_; TopicInfo( + std::size_t domain_id, std::string name, std::string type, std::string type_hash, @@ -162,7 +164,7 @@ class Entity std::optional topic_info() const; /// Get the liveliness keyexpr for this entity. - std::string keyexpr() const; + std::string liveliness_keyexpr() const; // Two entities are equal if their guids are equal. bool operator==(const Entity & other) const; @@ -183,7 +185,7 @@ class Entity EntityType type_; NodeInfo node_info_; std::optional topic_info_; - std::string keyexpr_; + std::string liveliness_keyexpr_; }; ///============================================================================= @@ -218,7 +220,7 @@ std::string demangle_name(const std::string & input); * * See rmw/types.h for the values of each policy enum. */ -std::string qos_to_keyexpr(rmw_qos_profile_t qos); +std::string qos_to_keyexpr(const rmw_qos_profile_t & qos); ///============================================================================= /// Convert a rmw_qos_profile_t from a keyexpr. Return std::nullopt if invalid. diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 3de285a5..d279eabc 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -58,50 +58,6 @@ namespace { -//============================================================================== -// Helper function to create a copy of a string after removing any -// leading or trailing slashes. -std::string strip_slashes(const char * const str) -{ - std::string ret = std::string(str); - const std::size_t len = strlen(str); - std::size_t start = 0; - std::size_t end = len - 1; - if (str[0] == '/') { - ++start; - } - if (str[end] == '/') { - --end; - } - return ret.substr(start, end - start + 1); -} - -//============================================================================== -// A function that generates a key expression for message transport of the format -// /// -// In particular, Zenoh keys cannot start or end with a /, so this function -// will strip them out. -// Performance note: at present, this function allocates a new string and copies -// the old string into it. If this becomes a performance problem, we could consider -// modifying the topic_name in place. But this means we need to be much more -// careful about who owns the string. -z_owned_keyexpr_t ros_topic_name_to_zenoh_key( - const std::size_t domain_id, - const char * const topic_name, - const char * const topic_type, - const char * const type_hash) -{ - std::string keyexpr_str = std::to_string(domain_id); - keyexpr_str += "/"; - keyexpr_str += strip_slashes(topic_name); - keyexpr_str += "/"; - keyexpr_str += topic_type; - keyexpr_str += "/"; - keyexpr_str += type_hash; - - return z_keyexpr_new(keyexpr_str.c_str()); -} - //============================================================================== const rosidl_message_type_support_t * find_message_type_support( const rosidl_message_type_support_t * type_supports) @@ -307,12 +263,13 @@ rmw_create_node( if (node_data->entity == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the node."); + "Unable to generate keyexpr for liveliness token for the node %s.", + name); return nullptr; } node_data->token = zc_liveliness_declare_token( z_loan(context->impl->session), - z_keyexpr(node_data->entity->keyexpr().c_str()), + z_keyexpr(node_data->entity->liveliness_keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -614,11 +571,32 @@ rmw_create_publisher( allocator->deallocate(type_hash_c_str, allocator->state); }); - z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( - node->context->actual_domain_id, - topic_name, - publisher_data->type_support->get_name(), - type_hash_c_str); + const z_id_t zid = z_info_zid(z_loan(node->context->impl->session)); + + publisher_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( + zid, + std::to_string(node_data->id), + std::to_string( + context_impl->get_next_entity_id()), + rmw_zenoh_cpp::liveliness::EntityType::Publisher, + rmw_zenoh_cpp::liveliness::NodeInfo{ + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + rmw_zenoh_cpp::liveliness::TopicInfo{ + node->context->actual_domain_id, + rmw_publisher->topic_name, + publisher_data->type_support->get_name(), + type_hash_c_str, + publisher_data->adapted_qos_profile} + ); + if (publisher_data->entity == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the publisher %s.", + rmw_publisher->topic_name); + return nullptr; + } + z_owned_keyexpr_t keyexpr = z_keyexpr_new( + publisher_data->entity->topic_info()->topic_keyexpr_.c_str()); auto always_free_ros_keyexpr = rcpputils::make_scope_exit( [&keyexpr]() { z_keyexpr_drop(z_move(keyexpr)); @@ -633,6 +611,18 @@ rmw_create_publisher( ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); pub_cache_opts.history = publisher_data->adapted_qos_profile.depth; pub_cache_opts.queryable_complete = true; + // Set the queryable_prefix to the session id so that querying subscribers can specify this + // session id to obtain latest data from this specific publication caches when querying over + // the same keyexpression. + // When such a prefix is added to the PublicationCache, it listens to queries with this extra + // prefix (allowing to be queried in a unique way), but still replies with the original + // publications' key expressions. + z_owned_keyexpr_t queryable_prefix = z_keyexpr_new(publisher_data->entity->zid().c_str()); + auto always_free_queryable_prefix = rcpputils::make_scope_exit( + [&queryable_prefix]() { + z_keyexpr_drop(z_move(queryable_prefix)); + }); + pub_cache_opts.queryable_prefix = z_loan(queryable_prefix); publisher_data->pub_cache = ze_declare_publication_cache( z_loan(context_impl->session), z_loan(keyexpr), @@ -673,29 +663,9 @@ rmw_create_publisher( z_undeclare_publisher(z_move(publisher_data->pub)); }); - publisher_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), - std::to_string(node_data->id), - std::to_string( - context_impl->get_next_entity_id()), - rmw_zenoh_cpp::liveliness::EntityType::Publisher, - rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, - rmw_zenoh_cpp::liveliness::TopicInfo{ - rmw_publisher->topic_name, - publisher_data->type_support->get_name(), - type_hash_c_str, - publisher_data->adapted_qos_profile} - ); - if (publisher_data->entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the publisher."); - return nullptr; - } publisher_data->token = zc_liveliness_declare_token( z_loan(node->context->impl->session), - z_keyexpr(publisher_data->entity->keyexpr().c_str()), + z_keyexpr(publisher_data->entity->liveliness_keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -1447,12 +1417,30 @@ rmw_create_subscription( // Everything above succeeded and is setup properly. Now declare a subscriber // with Zenoh; after this, callbacks may come in at any time. + sub_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( + z_info_zid(z_loan(node->context->impl->session)), + std::to_string(node_data->id), + std::to_string( + context_impl->get_next_entity_id()), + rmw_zenoh_cpp::liveliness::EntityType::Subscription, + rmw_zenoh_cpp::liveliness::NodeInfo{ + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + rmw_zenoh_cpp::liveliness::TopicInfo{ + node->context->actual_domain_id, + rmw_subscription->topic_name, + sub_data->type_support->get_name(), + type_hash_c_str, + sub_data->adapted_qos_profile} + ); + if (sub_data->entity == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the subscription %s.", + rmw_subscription->topic_name); + return nullptr; + } z_owned_closure_sample_t callback = z_closure(rmw_zenoh_cpp::sub_data_handler, nullptr, sub_data); - z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( - node->context->actual_domain_id, - topic_name, - sub_data->type_support->get_name(), - type_hash_c_str); + z_owned_keyexpr_t keyexpr = z_keyexpr_new(sub_data->entity->topic_info()->topic_keyexpr_.c_str()); auto always_free_ros_keyexpr = rcpputils::make_scope_exit( [&keyexpr]() { z_keyexpr_drop(z_move(keyexpr)); @@ -1473,8 +1461,18 @@ rmw_create_subscription( if (sub_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default(); - // Target all complete publication caches which are queryables. - sub_options.query_target = Z_QUERY_TARGET_ALL_COMPLETE; + // Make the initial query to hit all the PublicationCaches, using a query_selector with + // '*' in place of the queryable_prefix of each PublicationCache + const std::string selector = "*/" + + sub_data->entity->topic_info()->topic_keyexpr_; + sub_options.query_selector = z_keyexpr(selector.c_str()); + // Tell the PublicationCache's Queryable that the query accepts any key expression as a reply. + // By default a query accepts only replies that matches its query selector. + // This allows us to selectively query certain PublicationCaches when defining the + // set_querying_subscriber_callback below. + sub_options.query_accept_replies = ZCU_REPLY_KEYEXPR_ANY; + // As this initial query is now using a "*", the query target is not COMPLETE. + sub_options.query_target = Z_QUERY_TARGET_ALL; // We set consolidation to none as we need to receive transient local messages // from a number of publishers. Eg: To receive TF data published over /tf_static // by various publishers. @@ -1492,6 +1490,34 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return nullptr; } + // Register the querying subscriber with the graph cache to get latest + // messages from publishers that were discovered after their first publication. + context_impl->graph_cache->set_querying_subscriber_callback( + sub_data->entity->topic_info()->topic_keyexpr_, + [sub_data](const std::string & queryable_prefix) -> void + { + if (sub_data == nullptr) { + return; + } + const std::string selector = queryable_prefix + + "/" + + sub_data->entity->topic_info()->topic_keyexpr_; + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "QueryingSubscriberCallback triggered over %s.", + selector.c_str() + ); + z_get_options_t opts = z_get_options_default(); + opts.timeout_ms = std::numeric_limits::max(); + opts.consolidation = z_query_consolidation_latest(); + opts.accept_replies = ZCU_REPLY_KEYEXPR_ANY; + ze_querying_subscriber_get( + z_loan(std::get(sub_data->sub)), + z_keyexpr(selector.c_str()), + &opts + ); + } + ); } else { // Create a regular subscriber for all other durability settings. z_subscriber_options_t sub_options = z_subscriber_options_default(); @@ -1524,30 +1550,10 @@ rmw_create_subscription( } }); - // Publish to the graph that a new subscription is in town - sub_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), - std::to_string(node_data->id), - std::to_string( - context_impl->get_next_entity_id()), - rmw_zenoh_cpp::liveliness::EntityType::Subscription, - rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, - rmw_zenoh_cpp::liveliness::TopicInfo{ - rmw_subscription->topic_name, - sub_data->type_support->get_name(), - type_hash_c_str, - sub_data->adapted_qos_profile} - ); - if (sub_data->entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the subscription."); - return nullptr; - } + // Publish to the graph that a new subscription is in town. sub_data->token = zc_liveliness_declare_token( z_loan(context_impl->session), - z_keyexpr(sub_data->entity->keyexpr().c_str()), + z_keyexpr(sub_data->entity->liveliness_keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -2271,20 +2277,6 @@ rmw_create_client( allocator->deallocate(type_hash_c_str, allocator->state); }); - client_data->keyexpr = ros_topic_name_to_zenoh_key( - node->context->actual_domain_id, - rmw_client->service_name, - service_type.c_str(), - type_hash_c_str); - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [client_data]() { - z_keyexpr_drop(z_move(client_data->keyexpr)); - }); - if (!z_keyexpr_check(&client_data->keyexpr)) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( z_info_zid(z_loan(node->context->impl->session)), std::to_string(node_data->id), @@ -2294,6 +2286,7 @@ rmw_create_client( rmw_zenoh_cpp::liveliness::NodeInfo{ node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, rmw_zenoh_cpp::liveliness::TopicInfo{ + node->context->actual_domain_id, rmw_client->service_name, std::move(service_type), type_hash_c_str, @@ -2302,12 +2295,24 @@ rmw_create_client( if (client_data->entity == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the client."); + "Unable to generate keyexpr for liveliness token for the client %s.", + rmw_client->service_name); + return nullptr; + } + + client_data->keyexpr = z_keyexpr_new(client_data->entity->topic_info()->topic_keyexpr_.c_str()); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [client_data]() { + z_keyexpr_drop(z_move(client_data->keyexpr)); + }); + if (!z_keyexpr_check(&client_data->keyexpr)) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } + client_data->token = zc_liveliness_declare_token( z_loan(node->context->impl->session), - z_keyexpr(client_data->entity->keyexpr().c_str()), + z_keyexpr(client_data->entity->liveliness_keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -2849,11 +2854,29 @@ rmw_create_service( allocator->deallocate(type_hash_c_str, allocator->state); }); - service_data->keyexpr = ros_topic_name_to_zenoh_key( - node->context->actual_domain_id, - rmw_service->service_name, - service_type.c_str(), - type_hash_c_str); + service_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( + z_info_zid(z_loan(node->context->impl->session)), + std::to_string(node_data->id), + std::to_string( + context_impl->get_next_entity_id()), + rmw_zenoh_cpp::liveliness::EntityType::Service, + rmw_zenoh_cpp::liveliness::NodeInfo{ + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + rmw_zenoh_cpp::liveliness::TopicInfo{ + node->context->actual_domain_id, + rmw_service->service_name, + std::move(service_type), + type_hash_c_str, + service_data->adapted_qos_profile} + ); + if (service_data->entity == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the service %s.", + rmw_service->service_name); + return nullptr; + } + service_data->keyexpr = z_keyexpr_new(service_data->entity->topic_info()->topic_keyexpr_.c_str()); auto free_ros_keyexpr = rcpputils::make_scope_exit( [service_data]() { if (service_data) { @@ -2886,29 +2909,9 @@ rmw_create_service( z_undeclare_queryable(z_move(service_data->qable)); }); - service_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), - std::to_string(node_data->id), - std::to_string( - context_impl->get_next_entity_id()), - rmw_zenoh_cpp::liveliness::EntityType::Service, - rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, - rmw_zenoh_cpp::liveliness::TopicInfo{ - rmw_service->service_name, - std::move(service_type), - type_hash_c_str, - service_data->adapted_qos_profile} - ); - if (service_data->entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the service."); - return nullptr; - } service_data->token = zc_liveliness_declare_token( z_loan(node->context->impl->session), - z_keyexpr(service_data->entity->keyexpr().c_str()), + z_keyexpr(service_data->entity->liveliness_keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_c_vendor/CMakeLists.txt index 7903e1e8..37817882 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_c_vendor/CMakeLists.txt @@ -23,11 +23,13 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=zenoh/shared # - https://github.com/eclipse-zenoh/zenoh/pull/1022 (fix empty messages received if payload >btach size) # - https://github.com/eclipse-zenoh/zenoh-c/pull/358 (fix debian packaging issue: https://github.com/jspricke/ros-deb-builder-action/issues/49) # - https://github.com/eclipse-zenoh/zenoh/pull/1150 (fix deadlock issue https://github.com/ros2/rmw_zenoh/issues/182) +# - https://github.com/eclipse-zenoh/zenoh-c/pull/620 (fix ze_querying_subscriber_get API to query newly discovered publishers) ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION 548ee8dde0f53a58c06e68a2949949b31140c36c + VCS_VERSION 134dbfa06ca212def5fb51dd8e816734dfd8dff6 CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" + "-DZENOHC_CUSTOM_TARGET=${ZENOHC_CUSTOM_TARGET}" ) ament_package()