Skip to content

Commit

Permalink
Allow event statuses to be taken anytime
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Dec 28, 2024
1 parent e7493d3 commit 937d33b
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 185 deletions.
66 changes: 33 additions & 33 deletions rmw_zenoh_cpp/src/detail/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <deque>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <unordered_map>
#include <utility>

Expand Down Expand Up @@ -52,6 +54,18 @@ rmw_zenoh_event_type_t zenoh_event_from_rmw_event(rmw_event_type_t rmw_event_typ
return ZENOH_EVENT_INVALID;
}

///=============================================================================
rmw_zenoh_event_status_t::rmw_zenoh_event_status_t()
: total_count(0),
total_count_change(0),
current_count(0),
current_count_change(0),
data(std::string()),
changed(false)
{
// Do nothing.
}

///=============================================================================
void DataCallbackManager::set_callback(
const void * user_data, rmw_event_callback_t callback)
Expand Down Expand Up @@ -134,35 +148,30 @@ void EventsManager::trigger_event_callback(rmw_zenoh_event_type_t event_id)
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> EventsManager::pop_next_event(
rmw_zenoh_event_status_t EventsManager::take_event_status(
rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return nullptr;
throw std::runtime_error("Invalid event_type");
}

std::lock_guard<std::mutex> lock(event_mutex_);

if (event_queues_[event_id].empty()) {
// This tells rcl that the check for a new events was done, but no events have come in yet.
return nullptr;
}

std::unique_ptr<rmw_zenoh_event_status_t> event_status =
std::move(event_queues_[event_id].front());
event_queues_[event_id].pop_front();

return event_status;
// Create a copy to return before resetting counters.
auto ret = event_statuses_[event_id];
event_statuses_[event_id].current_count_change = 0;
event_statuses_[event_id].total_count_change = 0;
event_statuses_[event_id].changed = false;
return ret;
}

///=============================================================================
void EventsManager::add_new_event(
void EventsManager::update_event_status(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event)
int32_t current_count_change)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand All @@ -174,24 +183,15 @@ void EventsManager::add_new_event(

{
std::lock_guard<std::mutex> lock(event_mutex_);

std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> & event_queue = event_queues_[event_id];
if (event_queue.size() >= event_queue_depth_) {
// Log warning if message is discarded due to hitting the queue depth
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Event queue depth of %ld reached, discarding oldest message "
"for event type %d",
event_queue_depth_,
event_id);

event_queue.pop_front();
}

event_queue.emplace_back(std::move(event));
rmw_zenoh_event_status_t & status_to_update = event_statuses_[event_id];
status_to_update.total_count += std::max(0, current_count_change);
status_to_update.total_count_change += std::max(0, current_count_change);
status_to_update.current_count += current_count_change;
status_to_update.current_count_change = current_count_change;
status_to_update.changed = true;
}

// Since we added new data, trigger event callback and guard condition if they are available
// Since we updated data, trigger event callback and guard condition if they are available
trigger_event_callback(event_id);
notify_event(event_id);
}
Expand All @@ -211,7 +211,7 @@ bool EventsManager::queue_has_data_and_attach_condition_if_not(

std::lock_guard<std::mutex> lock(event_condition_mutex_);

if (!event_queues_[event_id].empty()) {
if (event_statuses_[event_id].changed) {
return true;
}

Expand All @@ -235,7 +235,7 @@ bool EventsManager::detach_condition_and_event_queue_is_empty(rmw_zenoh_event_ty

wait_set_data_[event_id] = nullptr;

return event_queues_[event_id].empty();
return !event_statuses_[event_id].changed;
}

///=============================================================================
Expand Down
31 changes: 14 additions & 17 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ struct rmw_zenoh_event_status_t
int32_t current_count_change;
// The data field can be used to store serialized information for more complex statuses.
std::string data;
// A boolean field to indicate if the status changed since the last take.
bool changed;

rmw_zenoh_event_status_t()
: total_count(0),
total_count_change(0),
current_count(0),
current_count_change(0)
{}
// Constructor.
rmw_zenoh_event_status_t();
};

///=============================================================================
Expand Down Expand Up @@ -110,16 +108,16 @@ class EventsManager
rmw_event_callback_t callback,
const void * user_data);

/// Pop the next event in the queue.
/// @param event_id the event id whose queue should be popped.
std::unique_ptr<rmw_zenoh_event_status_t> pop_next_event(
rmw_zenoh_event_type_t event_id);
/// @brief Get the status for an event.
/// @param event_id the id for the event whose status should be retrieved.
rmw_zenoh_event_status_t take_event_status(rmw_zenoh_event_type_t event_id);

/// Add an event status for an event.
/// @param event_id the event id queue to which the status should be added.
void add_new_event(
/// @brief Update the status for an event.
/// @param event_id the id for the event whose status should be changed.
/// @param current_count_change the change in the current count.
void update_event_status(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event);
int32_t current_count_change);

/// @brief Attach the condition variable provided by rmw_wait.
/// @param condition_variable to attach.
Expand Down Expand Up @@ -148,9 +146,8 @@ class EventsManager
rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
size_t event_unread_count_[ZENOH_EVENT_ID_MAX + 1] {0};
// A dequeue of events for each type of event this RMW supports.
std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> event_queues_[ZENOH_EVENT_ID_MAX + 1] {};
const std::size_t event_queue_depth_ = 10;
// Statuses for events supported.
rmw_zenoh_event_status_t event_statuses_[ZENOH_EVENT_ID_MAX + 1];
};
} // namespace rmw_zenoh_cpp

Expand Down
121 changes: 31 additions & 90 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <array>
#include <functional>
#include <limits>
Expand Down Expand Up @@ -207,8 +206,6 @@ void GraphCache::handle_matched_events_for_put(
}
const liveliness::TopicInfo topic_info = entity->topic_info().value();
const bool is_pub = is_entity_pub(*entity);
// Initialize a map that will be populated with any QoS events that may be detected.
EntityEventMap local_entities_with_events = {};
// The entity added may be local with callbacks registered but there
// may be other local entities in the graph that are matched.
int32_t match_count_for_entity = 0;
Expand All @@ -229,18 +226,16 @@ void GraphCache::handle_matched_events_for_put(
if (entity->topic_info()->topic_keyexpr_ ==
sub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(topic_info.name_, ZENOH_EVENT_SUBSCRIPTION_MATCHED, 1);
if (is_entity_local(*sub_entity)) {
local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
update_event_counters(entity, ZENOH_EVENT_SUBSCRIPTION_MATCHED, 1);
}
}
}
// Update event counters for the new entity->
update_event_counters(topic_info.name_,
ZENOH_EVENT_PUBLICATION_MATCHED,
match_count_for_entity);
// Update event counters for the new entity.
if (is_entity_local(*entity) && match_count_for_entity > 0) {
local_entities_with_events[entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
update_event_counters(entity,
ZENOH_EVENT_PUBLICATION_MATCHED,
match_count_for_entity);
}
} else {
// Entity is a sub.
Expand All @@ -259,23 +254,20 @@ void GraphCache::handle_matched_events_for_put(
if (entity->topic_info()->topic_keyexpr_ ==
pub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(topic_info.name_, ZENOH_EVENT_PUBLICATION_MATCHED, 1);
if (is_entity_local(*pub_entity)) {
local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
update_event_counters(pub_entity, ZENOH_EVENT_PUBLICATION_MATCHED, 1);
}
}
}
// Update event counters for the new entity->
update_event_counters(
topic_info.name_,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
match_count_for_entity);
// Update event counters for the new entity.
if (is_entity_local(*entity) && match_count_for_entity > 0) {
local_entities_with_events[entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
update_event_counters(
entity,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
match_count_for_entity);
}
}
}
take_entities_with_events(local_entities_with_events);
}

///=============================================================================
Expand All @@ -290,52 +282,27 @@ void GraphCache::handle_matched_events_for_del(
return;
}
const liveliness::TopicInfo topic_info = entity->topic_info().value();
EntityEventMap local_entities_with_events;
if (is_entity_pub(*entity)) {
// Notify any local subs of a matched event with change -1.
for (const auto & [_, topic_data_ptr] : topic_qos_map) {
for (liveliness::ConstEntityPtr sub_entity : topic_data_ptr->subs_) {
update_event_counters(
topic_info.name_,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
static_cast<int32_t>(-1));
if (is_entity_local(*sub_entity)) {
local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
update_event_counters(
sub_entity,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
static_cast<int32_t>(-1));
}
}
}
} else {
// Notify any local pubs of a matched event with change -1.
for (const auto & [_, topic_data_ptr] : topic_qos_map) {
for (liveliness::ConstEntityPtr pub_entity : topic_data_ptr->pubs_) {
update_event_counters(
topic_info.name_,
ZENOH_EVENT_PUBLICATION_MATCHED,
static_cast<int32_t>(-1));
if (is_entity_local(*pub_entity)) {
local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
}
}
}
}
take_entities_with_events(local_entities_with_events);
}

///=============================================================================
void GraphCache::take_entities_with_events(const EntityEventMap & entities_with_events)
{
for (const auto & [local_entity, event_set] : entities_with_events) {
// Trigger callback set for this entity for the event type.
GraphEventCallbackMap::const_iterator event_callbacks_it =
event_callbacks_.find(local_entity->keyexpr_hash());
if (event_callbacks_it != event_callbacks_.end()) {
for (const rmw_zenoh_event_type_t & event_type : event_set) {
GraphEventCallbacks::const_iterator callback_it =
event_callbacks_it->second.find(event_type);
if (callback_it != event_callbacks_it->second.end()) {
std::unique_ptr<rmw_zenoh_event_status_t> taken_event =
take_event_status(local_entity->topic_info()->name_, event_type);
callback_it->second(std::move(taken_event));
update_event_counters(
pub_entity,
ZENOH_EVENT_PUBLICATION_MATCHED,
static_cast<int32_t>(-1));
}
}
}
Expand Down Expand Up @@ -1218,7 +1185,7 @@ void GraphCache::set_qos_event_callback(
const rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback)
{
std::lock_guard<std::mutex> lock(graph_mutex_);
std::lock_guard<std::mutex> lock(events_mutex_);

if (event_type > ZENOH_EVENT_ID_MAX) {
RMW_ZENOH_LOG_WARN_NAMED(
Expand All @@ -1239,7 +1206,7 @@ void GraphCache::set_qos_event_callback(
///=============================================================================
void GraphCache::remove_qos_event_callbacks(std::size_t entity_keyexpr_hash)
{
std::lock_guard<std::mutex> lock(graph_mutex_);
std::lock_guard<std::mutex> lock(events_mutex_);
event_callbacks_.erase(entity_keyexpr_hash);
}

Expand All @@ -1265,7 +1232,7 @@ bool GraphCache::is_entity_pub(const liveliness::Entity & entity)

///=============================================================================
void GraphCache::update_event_counters(
const std::string & topic_name,
liveliness::ConstEntityPtr entity,
const rmw_zenoh_event_type_t event_id,
int32_t change)
{
Expand All @@ -1275,42 +1242,16 @@ void GraphCache::update_event_counters(

std::lock_guard<std::mutex> lock(events_mutex_);

auto event_statuses_it = event_statuses_.find(topic_name);
if (event_statuses_it == event_statuses_.end()) {
// Initialize statuses.
std::array<rmw_zenoh_event_status_t, ZENOH_EVENT_ID_MAX + 1> status_array {};
event_statuses_[topic_name] = std::move(status_array);
}

rmw_zenoh_event_status_t & status_to_update = event_statuses_[topic_name][event_id];
status_to_update.total_count += std::max(0, change);
status_to_update.total_count_change += std::max(0, change);
status_to_update.current_count += change;
status_to_update.current_count_change = change;
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> GraphCache::take_event_status(
const std::string & topic_name,
const rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
return nullptr;
}

std::lock_guard<std::mutex> lock(events_mutex_);

auto event_statuses_it = event_statuses_.find(topic_name);
if (event_statuses_it == event_statuses_.end()) {
return nullptr;
// Trigger callback set for this entity for the event type.
GraphEventCallbackMap::const_iterator event_callbacks_it =
event_callbacks_.find(entity->keyexpr_hash());
if (event_callbacks_it != event_callbacks_.end()) {
GraphEventCallbacks::const_iterator callback_it =
event_callbacks_it->second.find(event_id);
if (callback_it != event_callbacks_it->second.end()) {
callback_it->second(change);
}
}

rmw_zenoh_event_status_t & status_to_take = event_statuses_[topic_name][event_id];
auto result = std::make_unique<rmw_zenoh_event_status_t>(status_to_take);
// Reset changes.
status_to_take.total_count_change = 0;
status_to_take.current_count_change = 0;
return result;
}

///=============================================================================
Expand Down
Loading

0 comments on commit 937d33b

Please sign in to comment.