Skip to content

Commit

Permalink
Implement EventsBase class
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Feb 8, 2024
1 parent 7e1b571 commit ef11bb2
Show file tree
Hide file tree
Showing 4 changed files with 396 additions and 229 deletions.
315 changes: 189 additions & 126 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,210 @@
#include "rmw_data_types.hpp"

///=============================================================================
saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16])
: payload(p), recv_timestamp(recv_ts)
void EventsBase::set_user_callback(
const void * user_data, rmw_event_callback_t callback)
{
memcpy(publisher_gid, pub_gid, 16);
std::lock_guard<std::mutex> lock_mutex(event_mutex_);

if (callback_) {
// Push events arrived before setting the the executor callback.
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
callback_ = callback;
} else {
user_data_ = nullptr;
callback_ = nullptr;
}
}

///=============================================================================
void rmw_publisher_data_t::event_set_callback(
void EventsBase::trigger_user_callback()
{
// Trigger the user provided event callback if available.
std::lock_guard<std::mutex> lock_event_mutex(event_mutex_);
if (callback_ != nullptr) {
callback_(user_data_, 1);
} else {
++unread_count_;
}
}

///=============================================================================
void EventsBase::event_set_callback(
rmw_zenoh_event_type_t event_id,
rmw_event_callback_t callback,
const void * user_data)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", event_id);
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(user_callback_data_.mutex);
std::lock_guard<std::mutex> lock(event_mutex_);

// Set the user callback data
user_callback_data_.event_callback[event_id] = callback;
user_callback_data_.event_data[event_id] = user_data;
event_callback_[event_id] = callback;
event_data_[event_id] = user_data;

if (callback && user_callback_data_.event_unread_count[event_id]) {
if (callback && event_unread_count_[event_id]) {
// Push events happened before having assigned a callback
callback(user_data, user_callback_data_.event_unread_count[event_id]);
user_callback_data_.event_unread_count[event_id] = 0;
callback(user_data, event_unread_count_[event_id]);
event_unread_count_[event_id] = 0;
}
return;
}

///=============================================================================
void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_mutex_);

if (event_callback_[event_id] != nullptr) {
event_callback_[event_id](event_data_[event_id], 1);
} else {
++event_unread_count_[event_id];
}
return;
}

///=============================================================================
bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
event_id);
return true;
}

std::lock_guard<std::mutex> lock(event_mutex_);

return event_queues_[event_id].empty();
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> EventsBase::pop_next_event(
rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
event_id);
return nullptr;
}

std::lock_guard<std::mutex> lock(event_mutex_);

if (event_queues_[event_id].empty()) {
// This tells rcl that the check for a new events was done, but no events have come in yet.
return nullptr;
}

std::unique_ptr<rmw_zenoh_event_status_t> event_status =
std::move(event_queues_[event_id].front());
event_queues_[event_id].pop_front();

return event_status;
}

///=============================================================================
void EventsBase::add_new_event(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_mutex_);

std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> & event_queue = event_queues_[event_id];
if (event_queue.size() >= event_queue_depth_) {
// Log warning if message is discarded due to hitting the queue depth
RCUTILS_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Event queue depth of %ld reached, discarding oldest message "
"for event type %d",
event_queue_depth_,
event_id);

event_queue.pop_front();
}

event_queue.emplace_back(std::move(event));

// Since we added new data, trigger the event guard condition if it is available.
notify_event(event_id);
}

///=============================================================================
void EventsBase::attach_event_condition(
rmw_zenoh_event_type_t event_id,
std::condition_variable * condition_variable)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
event_conditions_[event_id] = condition_variable;
}

///=============================================================================
void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
event_conditions_[event_id] = nullptr;
}

///=============================================================================
void EventsBase::notify_event(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
if (event_conditions_[event_id] != nullptr) {
event_conditions_[event_id]->notify_one();
}
}

///=============================================================================
saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16])
: payload(p), recv_timestamp(recv_ts)
{
memcpy(publisher_gid, pub_gid, 16);
}

///=============================================================================
void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable)
{
Expand Down Expand Up @@ -133,65 +305,11 @@ void rmw_subscription_data_t::add_new_message(

message_queue_.emplace_back(std::move(msg));

// Trigger the user provided event callback if available.
std::unique_lock<std::mutex> lock_event_mutex(user_callback_data_.mutex);
if (user_callback_data_.callback != nullptr) {
user_callback_data_.callback(user_callback_data_.user_data, 1);
} else {
++user_callback_data_.unread_count;
}
lock_event_mutex.unlock();

// Since we added new data, trigger the guard condition if it is available
// Since we added new data, trigger user callback and guard condition if they are available
trigger_user_callback();
notify();
}

///=============================================================================
void rmw_subscription_data_t::set_on_new_message_callback(
const void * user_data, rmw_event_callback_t callback)
{
std::lock_guard<std::mutex> lock_mutex(user_callback_data_.mutex);

if (callback) {
// Push events arrived before setting the the executor callback.
if (user_callback_data_.unread_count) {
callback(user_data, user_callback_data_.unread_count);
user_callback_data_.unread_count = 0;
}
user_callback_data_.user_data = user_data;
user_callback_data_.callback = callback;
} else {
user_callback_data_.user_data = nullptr;
user_callback_data_.callback = nullptr;
}
}

///=============================================================================
void rmw_subscription_data_t::event_set_callback(
rmw_zenoh_event_type_t event_id,
rmw_event_callback_t callback,
const void * user_data)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", event_id);
return;
}

std::lock_guard<std::mutex> lock(user_callback_data_.mutex);

// Set the user callback data
user_callback_data_.event_callback[event_id] = callback;
user_callback_data_.event_data[event_id] = user_data;

if (callback && user_callback_data_.event_unread_count[event_id]) {
// Push events happened before having assigned a callback
callback(user_data, user_callback_data_.event_unread_count[event_id]);
user_callback_data_.event_unread_count[event_id] = 0;
}
return;
}

///=============================================================================
bool rmw_service_data_t::query_queue_is_empty() const
{
Expand Down Expand Up @@ -242,16 +360,8 @@ void rmw_service_data_t::add_new_query(std::unique_ptr<ZenohQuery> query)
std::lock_guard<std::mutex> lock(query_queue_mutex_);
query_queue_.emplace_back(std::move(query));

// Trigger the user provided event callback if available.
std::unique_lock<std::mutex> lock_event_mutex(user_callback_data_.mutex);
if (user_callback_data_.callback != nullptr) {
user_callback_data_.callback(user_callback_data_.user_data, 1);
} else {
++user_callback_data_.unread_count;
}
lock_event_mutex.unlock();

// Since we added new data, trigger the guard condition if it is available
// Since we added new data, trigger user callback and guard condition if they are available
trigger_user_callback();
notify();
}

Expand Down Expand Up @@ -284,26 +394,6 @@ std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(int64_t sequ
return query;
}

///=============================================================================
void rmw_service_data_t::set_on_new_request_callback(
const void * user_data, rmw_event_callback_t callback)
{
std::lock_guard<std::mutex> lock_mutex(user_callback_data_.mutex);

if (callback) {
// Push events arrived before setting the the executor callback.
if (user_callback_data_.unread_count) {
callback(user_data, user_callback_data_.unread_count);
user_callback_data_.unread_count = 0;
}
user_callback_data_.user_data = user_data;
user_callback_data_.callback = callback;
} else {
user_callback_data_.user_data = nullptr;
user_callback_data_.callback = nullptr;
}
}

///=============================================================================
void rmw_client_data_t::notify()
{
Expand All @@ -319,15 +409,8 @@ void rmw_client_data_t::add_new_reply(std::unique_ptr<ZenohReply> reply)
std::lock_guard<std::mutex> lock(reply_queue_mutex_);
reply_queue_.emplace_back(std::move(reply));

// Trigger the user provided event callback if available.
std::unique_lock<std::mutex> lock_event_mutex(user_callback_data_.mutex);
if (user_callback_data_.callback != nullptr) {
user_callback_data_.callback(user_callback_data_.user_data, 1);
} else {
++user_callback_data_.unread_count;
}
lock_event_mutex.unlock();

// Since we added new data, trigger user callback and guard condition if they are available
trigger_user_callback();
notify();
}

Expand Down Expand Up @@ -368,26 +451,6 @@ std::unique_ptr<ZenohReply> rmw_client_data_t::pop_next_reply()
return latest_reply;
}

///=============================================================================
void rmw_client_data_t::set_on_new_response_callback(
const void * user_data, rmw_event_callback_t callback)
{
std::lock_guard<std::mutex> lock_mutex(user_callback_data_.mutex);

if (callback) {
// Push events arrived before setting the the executor callback.
if (user_callback_data_.unread_count) {
callback(user_data, user_callback_data_.unread_count);
user_callback_data_.unread_count = 0;
}
user_callback_data_.user_data = user_data;
user_callback_data_.callback = callback;
} else {
user_callback_data_.user_data = nullptr;
user_callback_data_.callback = nullptr;
}
}

//==============================================================================
void sub_data_handler(
const z_sample_t * sample,
Expand Down
Loading

0 comments on commit ef11bb2

Please sign in to comment.