From 515b829609f2fb452445eeed0fd9dc939f38ca4d Mon Sep 17 00:00:00 2001 From: Yadu Date: Fri, 27 Sep 2024 06:23:39 -0700 Subject: [PATCH] Fix how total_count and total_count_change are calculated for matched events (#287) * Fix how total_count and change are calculated Signed-off-by: Yadunund * Ensure key expressions match Signed-off-by: Yadunund --------- Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/event.hpp | 4 +- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 55 +++++++++++++++--------- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 5 ++- rmw_zenoh_cpp/src/rmw_event.cpp | 2 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 11 +++++ 5 files changed, 52 insertions(+), 25 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/event.hpp b/rmw_zenoh_cpp/src/detail/event.hpp index 9710711a..f6cdb1a2 100644 --- a/rmw_zenoh_cpp/src/detail/event.hpp +++ b/rmw_zenoh_cpp/src/detail/event.hpp @@ -60,7 +60,7 @@ struct rmw_zenoh_event_status_t size_t total_count; size_t total_count_change; size_t current_count; - size_t current_count_change; + int32_t current_count_change; // The data field can be used to store serialized information for more complex statuses. std::string data; @@ -97,7 +97,7 @@ class DataCallbackManager size_t unread_count_ {0}; }; -/// Base class to be inherited by entities that support events. +/// A class to manage QoS related events. class EventsManager { public: diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 04f8fd04..71b84ab3 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -213,17 +213,20 @@ void GraphCache::handle_matched_events_for_put( for (const auto & [_, topic_data_ptr] : topic_qos_map) { if (is_pub) { // Count the number of matching subs for each set of qos settings. - if (!topic_data_ptr->subs_.empty()) { - match_count_for_entity += topic_data_ptr->subs_.size(); - } + match_count_for_entity += topic_data_ptr->subs_.size(); // Also iterate through the subs to check if any are local and if update event counters. for (liveliness::ConstEntityPtr sub_entity : topic_data_ptr->subs_) { - update_event_counters( - topic_info.name_, - ZENOH_EVENT_SUBSCRIPTION_MATCHED, - static_cast(1)); - if (is_entity_local(*sub_entity)) { - local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED); + // Update counters only if key expressions match. + if (entity->topic_info()->topic_keyexpr_ == + sub_entity->topic_info().value().topic_keyexpr_) + { + update_event_counters( + topic_info.name_, + ZENOH_EVENT_SUBSCRIPTION_MATCHED, + static_cast(1)); + if (is_entity_local(*sub_entity)) { + local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED); + } } } // Update event counters for the new entity-> @@ -237,17 +240,20 @@ void GraphCache::handle_matched_events_for_put( } else { // Entity is a sub. // Count the number of matching pubs for each set of qos settings. - if (!topic_data_ptr->pubs_.empty()) { - match_count_for_entity += topic_data_ptr->pubs_.size(); - } + match_count_for_entity += topic_data_ptr->pubs_.size(); // Also iterate through the pubs to check if any are local and if update event counters. for (liveliness::ConstEntityPtr pub_entity : topic_data_ptr->pubs_) { - update_event_counters( - topic_info.name_, - ZENOH_EVENT_PUBLICATION_MATCHED, - static_cast(1)); - if (is_entity_local(*pub_entity)) { - local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED); + // Update counters only if key expressions match. + if (entity->topic_info()->topic_keyexpr_ == + pub_entity->topic_info().value().topic_keyexpr_) + { + update_event_counters( + topic_info.name_, + ZENOH_EVENT_PUBLICATION_MATCHED, + static_cast(1)); + if (is_entity_local(*pub_entity)) { + local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED); + } } } // Update event counters for the new entity-> @@ -307,7 +313,7 @@ void GraphCache::handle_matched_events_for_del( } ///============================================================================= -void GraphCache::take_entities_with_events(EntityEventMap & entities_with_events) +void GraphCache::take_entities_with_events(const EntityEventMap & entities_with_events) { for (const auto & [local_entity, event_set] : entities_with_events) { // Trigger callback set for this entity for the event type. @@ -1262,6 +1268,13 @@ void GraphCache::set_qos_event_callback( event_cb_it->second[event_type] = std::move(callback); } +///============================================================================= +void GraphCache::remove_qos_event_callbacks(liveliness::ConstEntityPtr entity) +{ + std::lock_guard lock(graph_mutex_); + event_callbacks_.erase(entity); +} + ///============================================================================= bool GraphCache::is_entity_local(const liveliness::Entity & entity) const { @@ -1302,8 +1315,8 @@ void GraphCache::update_event_counters( } rmw_zenoh_event_status_t & status_to_update = event_statuses_[topic_name][event_id]; - status_to_update.total_count += std::abs(change); - status_to_update.total_count_change += std::abs(change); + status_to_update.total_count += std::max(0, change); + status_to_update.total_count_change += std::max(0, change); status_to_update.current_count += change; status_to_update.current_count_change = change; } diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index bd380402..58ab6dd9 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -185,6 +185,9 @@ class GraphCache final const rmw_zenoh_event_type_t & event_type, GraphCacheEventCallback callback); + /// Remove all qos event callbacks for an entity. + void remove_qos_event_callbacks(liveliness::ConstEntityPtr entity); + /// Returns true if the entity is a publisher or client. False otherwise. static bool is_entity_pub(const liveliness::Entity & entity); @@ -246,7 +249,7 @@ class GraphCache final using EntityEventMap = std::unordered_map>; - void take_entities_with_events(EntityEventMap & entities_with_events); + void take_entities_with_events(const EntityEventMap & entities_with_events); std::string zid_str_; /* diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index deec2e4f..3cb2b14a 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -158,7 +158,7 @@ rmw_event_set_callback( return RMW_RET_ERROR; } - // Both rmw_subscription_data_t and rmw_publisher_data_t inherit EventsBase. + // Both rmw_subscription_data_t and rmw_publisher_data_t store an EventsManager object. rmw_zenoh_cpp::EventsManager * event_data = static_cast(rmw_event->data); RMW_CHECK_ARGUMENT_FOR_NULL(event_data, RMW_RET_INVALID_ARGUMENT); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 06355e34..2527b47f 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -689,6 +689,10 @@ rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) { RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( @@ -718,6 +722,10 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) RMW_SET_ERROR_MSG("failed to undeclare pub"); ret = RMW_RET_ERROR; } + + // Remove any event callbacks registered to this publisher. + context_impl->graph_cache->remove_qos_event_callbacks(publisher_data->entity); + RMW_TRY_DESTRUCTOR(publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t, ); allocator->deallocate(publisher_data, allocator->state); } @@ -1584,6 +1592,9 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) context_impl->graph_cache->remove_querying_subscriber_callback(sub_data); } + // Remove any event callbacks registered to this subscription. + context_impl->graph_cache->remove_qos_event_callbacks(sub_data->entity); + RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, ); allocator->deallocate(sub_data, allocator->state); }