Skip to content

Commit

Permalink
Move eventbase definition into event.hpp
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Feb 9, 2024
1 parent d2398e6 commit 586af20
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 293 deletions.
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ find_package(zenoh_c_vendor REQUIRED)
find_package(zenohc REQUIRED)

add_library(rmw_zenoh_cpp SHARED
src/detail/event.cpp
src/detail/identifier.cpp
src/detail/graph_cache.cpp
src/detail/guard_condition.cpp
Expand Down
229 changes: 229 additions & 0 deletions rmw_zenoh_cpp/src/detail/event.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <utility>

#include "event.hpp"

#include "rcutils/logging_macros.h"

#include "rmw/error_handling.h"


///=============================================================================
void EventsBase::set_user_callback(
const void * user_data, rmw_event_callback_t callback)
{
std::lock_guard<std::recursive_mutex> lock_mutex(event_mutex_);

if (callback) {
// Push events arrived before setting the the executor callback.
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
callback_ = callback;
} else {
user_data_ = nullptr;
callback_ = nullptr;
}
}

///=============================================================================
void EventsBase::trigger_user_callback()
{
// Trigger the user provided event callback if available.
std::lock_guard<std::recursive_mutex> lock_mutex(event_mutex_);
if (callback_ != nullptr) {
callback_(user_data_, 1);
} else {
++unread_count_;
}
}

///=============================================================================
void EventsBase::event_set_callback(
rmw_zenoh_event_type_t event_id,
rmw_event_callback_t callback,
const void * user_data)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);

// Set the user callback data
event_callback_[event_id] = callback;
event_data_[event_id] = user_data;

if (callback && event_unread_count_[event_id]) {
// Push events happened before having assigned a callback
callback(user_data, event_unread_count_[event_id]);
event_unread_count_[event_id] = 0;
}
return;
}

///=============================================================================
void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);

if (event_callback_[event_id] != nullptr) {
event_callback_[event_id](event_data_[event_id], 1);
} else {
++event_unread_count_[event_id];
}
return;
}

///=============================================================================
bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return true;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);

return event_queues_[event_id].empty();
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> EventsBase::pop_next_event(
rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return nullptr;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);

if (event_queues_[event_id].empty()) {
// This tells rcl that the check for a new events was done, but no events have come in yet.
return nullptr;
}

std::unique_ptr<rmw_zenoh_event_status_t> event_status =
std::move(event_queues_[event_id].front());
event_queues_[event_id].pop_front();

return event_status;
}

///=============================================================================
void EventsBase::add_new_event(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);

std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> & event_queue = event_queues_[event_id];
if (event_queue.size() >= event_queue_depth_) {
// Log warning if message is discarded due to hitting the queue depth
RCUTILS_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Event queue depth of %ld reached, discarding oldest message "
"for event type %d",
event_queue_depth_,
event_id);

event_queue.pop_front();
}

event_queue.emplace_back(std::move(event));

// Since we added new data, trigger event callback and guard condition if they are available
trigger_event_callback(event_id);
notify_event(event_id);
}

///=============================================================================
void EventsBase::attach_event_condition(
rmw_zenoh_event_type_t event_id,
std::condition_variable * condition_variable)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
event_conditions_[event_id] = condition_variable;
}

///=============================================================================
void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
event_conditions_[event_id] = nullptr;
}

///=============================================================================
void EventsBase::notify_event(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
if (event_conditions_[event_id] != nullptr) {
event_conditions_[event_id]->notify_one();
}
}
94 changes: 94 additions & 0 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,16 @@
#ifndef DETAIL__EVENT_HPP_
#define DETAIL__EVENT_HPP_

#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

#include "rmw/event.h"
#include "rmw/event_callback_type.h"


///=============================================================================
// A struct that represents an event status in rmw_zenoh.
Expand Down Expand Up @@ -53,4 +60,91 @@ static const std::unordered_map<rmw_event_type_t, rmw_zenoh_event_type_t> event_
// TODO(clalancette): Implement remaining events
};

///=============================================================================
/// A struct to store status changes which can be mapped to rmw event statuses.
/// The data field can be used to store serialized information for more complex statuses.
struct rmw_zenoh_event_status_t
{
size_t total_count;
size_t total_count_change;
size_t current_count;
std::string data;
};

///=============================================================================
/// Base class to be inherited by entities that support events.
class EventsBase
{
public:
/// @brief Set the user defined callback that should be called when
/// a new message/response/request is received.
/// @param user_data the data that should be passed to the callback.
/// @param callback the callback to be set.
void set_user_callback(const void * user_data, rmw_event_callback_t callback);

/// Trigger the user callback.
void trigger_user_callback();

/// @brief Set the callback to be triggered when the relevant event is triggered.
/// @param event_id the id of the event
/// @param callback the callback to trigger for this event.
/// @param user_data the data to be passed to the callback.
void event_set_callback(
rmw_zenoh_event_type_t event_id,
rmw_event_callback_t callback,
const void * user_data);

/// @brief Returns true if the event queue is empty.
/// @param event_id the event id whose event queue should be checked.
bool event_queue_is_empty(rmw_zenoh_event_type_t event_id) const;

/// Pop the next event in the queue.
/// @param event_id the event id whose queue should be popped.
std::unique_ptr<rmw_zenoh_event_status_t> pop_next_event(
rmw_zenoh_event_type_t event_id);

/// Add an event status for an event.
/// @param event_id the event id queue to which the status should be added.
void add_new_event(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event);

/// @brief Attach the condition variable provided by rmw_wait.
/// @param condition_variable to attach.
void attach_event_condition(
rmw_zenoh_event_type_t event_id,
std::condition_variable * condition_variable);

/// @brief Detach the condition variable provided by rmw_wait.
void detach_event_condition(rmw_zenoh_event_type_t event_id);

private:
/// @brief Trigger the callback for an event.
/// @param event_id the event id whose callback should be triggered.
void trigger_event_callback(rmw_zenoh_event_type_t event_id);

/// Notify once event is added to an event queue.
void notify_event(rmw_zenoh_event_type_t event_id);

/// Mutex to lock when read/writing members.
// The mutex is recursive as add_new_event() invokes `trigger_event_callback()`.
mutable std::recursive_mutex event_mutex_;
/// Mutex to lock for event_condition.
mutable std::mutex event_condition_mutex_;
/// Condition variable to attach for event notifications.
std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr};
/// User callback that can be set via set_user_callback().
rmw_event_callback_t callback_ {nullptr};
/// User data that should be passed to the user callback.
const void * user_data_ {nullptr};
/// Count for
size_t unread_count_ {0};
rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
size_t event_unread_count_[ZENOH_EVENT_ID_MAX + 1] {0};
// A dequeue of events for each type of event this RMW supports.
std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> event_queues_[ZENOH_EVENT_ID_MAX + 1] {};
const std::size_t event_queue_depth_ = 10;
};

#endif // DETAIL__EVENT_HPP_
Loading

0 comments on commit 586af20

Please sign in to comment.