Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Events #103

Merged
merged 40 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
7e1b571
Set callbacks
Yadunund Jan 30, 2024
ef11bb2
Implement EventsBase class
Yadunund Jan 31, 2024
eef9b2e
Declare certain event types to be compatible although mechanisms are …
Yadunund Jan 31, 2024
9c79e5a
Call event callbacks once an event is queued
Yadunund Jan 31, 2024
ab592b8
Fix typo preventing setting of user callbacks
Yadunund Jan 31, 2024
9ecfd72
Resolve merge conflict with rolling
Yadunund Feb 8, 2024
d2398e6
Refactor graph cache to track qos (#107)
Yadunund Feb 9, 2024
586af20
Move eventbase definition into event.hpp
Yadunund Feb 9, 2024
cc195f8
Update rmw_zenoh_cpp/src/rmw_event.cpp
Yadunund Feb 19, 2024
ab2877d
Merge branch 'rolling' into yadu/events
Yadunund Feb 19, 2024
1f38007
Store current session id in graph cache
Yadunund Feb 20, 2024
70a6c68
Allow event callback functions to be registered with graph cache
Yadunund Feb 20, 2024
33a2160
Store liveliness::Entity in custom types
Yadunund Feb 20, 2024
f573053
Register qos event callbacks
Yadunund Feb 20, 2024
b4e645a
Remove all lambdas in graph cache
Yadunund Feb 20, 2024
395bd31
Implement matched events
Yadunund Feb 23, 2024
360cc8f
Make liveliness token keyexprs globally unique
Yadunund Feb 23, 2024
e182420
Add node id to all liveliness tokens
Yadunund Feb 26, 2024
5e3df35
Debug race condition
Yadunund Feb 26, 2024
07c27f3
Update qos compatibility check to reflect zenoh's behavior
Yadunund Feb 26, 2024
a5b5acf
Get rid of TopicStats
Yadunund Feb 26, 2024
29f3a79
Replace EventsBase inheritance with DataCallbackManager and EventsMan…
Yadunund Feb 26, 2024
534d9f1
Merge pull request #118 from ros2/yadu/qos_events
Yadunund Feb 26, 2024
e4c4ff7
Comment print statements for now. They will be deleted before merge
Yadunund Feb 26, 2024
6f647bc
Merge branch 'rolling' into yadu/events
Yadunund Feb 28, 2024
fdb1050
Merge branch 'rolling' into yadu/events
Yadunund Feb 28, 2024
0072abf
Log reason for bad reply
Yadunund Feb 28, 2024
e4e60b0
Merge branch 'rolling' into yadu/events
Yadunund Feb 29, 2024
bfd409c
Resolve merge conflicts after gid merge
Yadunund Mar 1, 2024
2054a58
Resolve merge conflicts with main
Yadunund Mar 11, 2024
d723fb6
Remove debug printf statements
Yadunund Mar 12, 2024
e51e475
event_mutex_ is a regular mutex
Yadunund Mar 12, 2024
f768e32
Update todo for TopicData
Yadunund Mar 12, 2024
00e8e1c
Construct TopicData from Entity
Yadunund Mar 12, 2024
79ec141
Restrict query and reply queue sizes
Yadunund Mar 14, 2024
61925b6
Merge branch 'rolling' into yadu/events
Yadunund Mar 21, 2024
47973be
Simplify codepaths for matched events
Yadunund Mar 21, 2024
1f507f7
Avoid copying liveliness::Entity
Yadunund Mar 21, 2024
40ad1fb
Merge branch 'rolling' into yadu/events
Yadunund Mar 21, 2024
d93ad02
Handle 0 history depth for publishers too
Yadunund Mar 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ find_package(zenoh_c_vendor REQUIRED)
find_package(zenohc REQUIRED)

add_library(rmw_zenoh_cpp SHARED
src/detail/event.cpp
src/detail/identifier.cpp
src/detail/graph_cache.cpp
src/detail/guard_condition.cpp
Expand Down
229 changes: 229 additions & 0 deletions rmw_zenoh_cpp/src/detail/event.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <utility>

#include "event.hpp"

#include "rcutils/logging_macros.h"

#include "rmw/error_handling.h"


///=============================================================================
void DataCallbackManager::set_callback(
const void * user_data, rmw_event_callback_t callback)
{
std::lock_guard<std::mutex> lock_mutex(event_mutex_);
Yadunund marked this conversation as resolved.
Show resolved Hide resolved

if (callback) {
// Push events arrived before setting the the executor callback.
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
callback_ = callback;
} else {
user_data_ = nullptr;
callback_ = nullptr;
}
}

///=============================================================================
void DataCallbackManager::trigger_callback()
{
// Trigger the user provided event callback if available.
std::lock_guard<std::mutex> lock_mutex(event_mutex_);
if (callback_ != nullptr) {
callback_(user_data_, 1);
} else {
++unread_count_;
}
}

///=============================================================================
void EventsManager::event_set_callback(
rmw_zenoh_event_type_t event_id,
rmw_event_callback_t callback,
const void * user_data)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);
Yadunund marked this conversation as resolved.
Show resolved Hide resolved

// Set the user callback data
event_callback_[event_id] = callback;
event_data_[event_id] = user_data;

if (callback && event_unread_count_[event_id]) {
// Push events happened before having assigned a callback
callback(user_data, event_unread_count_[event_id]);
event_unread_count_[event_id] = 0;
}
return;
}

///=============================================================================
void EventsManager::trigger_event_callback(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);

if (event_callback_[event_id] != nullptr) {
event_callback_[event_id](event_data_[event_id], 1);
} else {
++event_unread_count_[event_id];
}
return;
}

///=============================================================================
bool EventsManager::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return true;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);

return event_queues_[event_id].empty();
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> EventsManager::pop_next_event(
Yadunund marked this conversation as resolved.
Show resolved Hide resolved
rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return nullptr;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);

if (event_queues_[event_id].empty()) {
// This tells rcl that the check for a new events was done, but no events have come in yet.
return nullptr;
}

std::unique_ptr<rmw_zenoh_event_status_t> event_status =
std::move(event_queues_[event_id].front());
event_queues_[event_id].pop_front();

return event_status;
}

///=============================================================================
void EventsManager::add_new_event(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);

std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> & event_queue = event_queues_[event_id];
if (event_queue.size() >= event_queue_depth_) {
// Log warning if message is discarded due to hitting the queue depth
RCUTILS_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Event queue depth of %ld reached, discarding oldest message "
"for event type %d",
event_queue_depth_,
event_id);

event_queue.pop_front();
}

event_queue.emplace_back(std::move(event));

// Since we added new data, trigger event callback and guard condition if they are available
trigger_event_callback(event_id);
notify_event(event_id);
}

///=============================================================================
void EventsManager::attach_event_condition(
rmw_zenoh_event_type_t event_id,
std::condition_variable * condition_variable)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
event_conditions_[event_id] = condition_variable;
}

///=============================================================================
void EventsManager::detach_event_condition(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
event_conditions_[event_id] = nullptr;
}

///=============================================================================
void EventsManager::notify_event(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
if (event_conditions_[event_id] != nullptr) {
event_conditions_[event_id]->notify_one();
}
}
Loading
Loading