Skip to content

Commit

Permalink
Merge branch 'ros2:rolling' into rolling
Browse files Browse the repository at this point in the history
  • Loading branch information
imstevenpmwork authored Sep 29, 2024
2 parents da4a9e4 + 67ed661 commit 66997d6
Show file tree
Hide file tree
Showing 15 changed files with 680 additions and 450 deletions.
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/logging.cpp
src/detail/message_type_support.cpp
src/detail/qos.cpp
src/detail/rmw_context_impl_s.cpp
src/detail/rmw_data_types.cpp
src/detail/service_type_support.cpp
src/detail/type_support.cpp
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct rmw_zenoh_event_status_t
size_t total_count;
size_t total_count_change;
size_t current_count;
size_t current_count_change;
int32_t current_count_change;
// The data field can be used to store serialized information for more complex statuses.
std::string data;

Expand Down Expand Up @@ -97,7 +97,7 @@ class DataCallbackManager
size_t unread_count_ {0};
};

/// Base class to be inherited by entities that support events.
/// A class to manage QoS related events.
class EventsManager
{
public:
Expand Down
55 changes: 34 additions & 21 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,20 @@ void GraphCache::handle_matched_events_for_put(
for (const auto & [_, topic_data_ptr] : topic_qos_map) {
if (is_pub) {
// Count the number of matching subs for each set of qos settings.
if (!topic_data_ptr->subs_.empty()) {
match_count_for_entity += topic_data_ptr->subs_.size();
}
match_count_for_entity += topic_data_ptr->subs_.size();
// Also iterate through the subs to check if any are local and if update event counters.
for (liveliness::ConstEntityPtr sub_entity : topic_data_ptr->subs_) {
update_event_counters(
topic_info.name_,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
static_cast<int32_t>(1));
if (is_entity_local(*sub_entity)) {
local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
// Update counters only if key expressions match.
if (entity->topic_info()->topic_keyexpr_ ==
sub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(
topic_info.name_,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
static_cast<int32_t>(1));
if (is_entity_local(*sub_entity)) {
local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
}
}
}
// Update event counters for the new entity->
Expand All @@ -238,17 +241,20 @@ void GraphCache::handle_matched_events_for_put(
} else {
// Entity is a sub.
// Count the number of matching pubs for each set of qos settings.
if (!topic_data_ptr->pubs_.empty()) {
match_count_for_entity += topic_data_ptr->pubs_.size();
}
match_count_for_entity += topic_data_ptr->pubs_.size();
// Also iterate through the pubs to check if any are local and if update event counters.
for (liveliness::ConstEntityPtr pub_entity : topic_data_ptr->pubs_) {
update_event_counters(
topic_info.name_,
ZENOH_EVENT_PUBLICATION_MATCHED,
static_cast<int32_t>(1));
if (is_entity_local(*pub_entity)) {
local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
// Update counters only if key expressions match.
if (entity->topic_info()->topic_keyexpr_ ==
pub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(
topic_info.name_,
ZENOH_EVENT_PUBLICATION_MATCHED,
static_cast<int32_t>(1));
if (is_entity_local(*pub_entity)) {
local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
}
}
}
// Update event counters for the new entity->
Expand Down Expand Up @@ -308,7 +314,7 @@ void GraphCache::handle_matched_events_for_del(
}

///=============================================================================
void GraphCache::take_entities_with_events(EntityEventMap & entities_with_events)
void GraphCache::take_entities_with_events(const EntityEventMap & entities_with_events)
{
for (const auto & [local_entity, event_set] : entities_with_events) {
// Trigger callback set for this entity for the event type.
Expand Down Expand Up @@ -1260,6 +1266,13 @@ void GraphCache::set_qos_event_callback(
event_cb_it->second[event_type] = std::move(callback);
}

///=============================================================================
void GraphCache::remove_qos_event_callbacks(liveliness::ConstEntityPtr entity)
{
std::lock_guard<std::mutex> lock(graph_mutex_);
event_callbacks_.erase(entity);
}

///=============================================================================
bool GraphCache::is_entity_local(const liveliness::Entity & entity) const
{
Expand Down Expand Up @@ -1300,8 +1313,8 @@ void GraphCache::update_event_counters(
}

rmw_zenoh_event_status_t & status_to_update = event_statuses_[topic_name][event_id];
status_to_update.total_count += std::abs(change);
status_to_update.total_count_change += std::abs(change);
status_to_update.total_count += std::max(0, change);
status_to_update.total_count_change += std::max(0, change);
status_to_update.current_count += change;
status_to_update.current_count_change = change;
}
Expand Down
5 changes: 4 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ class GraphCache final
const rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback);

/// Remove all qos event callbacks for an entity.
void remove_qos_event_callbacks(liveliness::ConstEntityPtr entity);

/// Returns true if the entity is a publisher or client. False otherwise.
static bool is_entity_pub(const liveliness::Entity & entity);

Expand Down Expand Up @@ -248,7 +251,7 @@ class GraphCache final

using EntityEventMap =
std::unordered_map<liveliness::ConstEntityPtr, std::unordered_set<rmw_zenoh_event_type_t>>;
void take_entities_with_events(EntityEventMap & entities_with_events);
void take_entities_with_events(const EntityEventMap & entities_with_events);

std::string zid_str_;
/*
Expand Down
Loading

0 comments on commit 66997d6

Please sign in to comment.