diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index b62cccc4..7c1d0192 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -26,6 +26,7 @@ find_package(zenoh_c_vendor REQUIRED) find_package(zenohc REQUIRED) add_library(rmw_zenoh_cpp SHARED + src/detail/event.cpp src/detail/identifier.cpp src/detail/graph_cache.cpp src/detail/guard_condition.cpp diff --git a/rmw_zenoh_cpp/src/detail/event.cpp b/rmw_zenoh_cpp/src/detail/event.cpp new file mode 100644 index 00000000..4b99e419 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/event.cpp @@ -0,0 +1,229 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "event.hpp" + +#include "rcutils/logging_macros.h" + +#include "rmw/error_handling.h" + + +///============================================================================= +void EventsBase::set_user_callback( + const void * user_data, rmw_event_callback_t callback) +{ + std::lock_guard lock_mutex(event_mutex_); + + if (callback) { + // Push events arrived before setting the the executor callback. + if (unread_count_) { + callback(user_data, unread_count_); + unread_count_ = 0; + } + user_data_ = user_data; + callback_ = callback; + } else { + user_data_ = nullptr; + callback_ = nullptr; + } +} + +///============================================================================= +void EventsBase::trigger_user_callback() +{ + // Trigger the user provided event callback if available. + std::lock_guard lock_mutex(event_mutex_); + if (callback_ != nullptr) { + callback_(user_data_, 1); + } else { + ++unread_count_; + } +} + +///============================================================================= +void EventsBase::event_set_callback( + rmw_zenoh_event_type_t event_id, + rmw_event_callback_t callback, + const void * user_data) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", + event_id); + return; + } + + std::lock_guard lock(event_mutex_); + + // Set the user callback data + event_callback_[event_id] = callback; + event_data_[event_id] = user_data; + + if (callback && event_unread_count_[event_id]) { + // Push events happened before having assigned a callback + callback(user_data, event_unread_count_[event_id]); + event_unread_count_[event_id] = 0; + } + return; +} + +///============================================================================= +void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", + event_id); + return; + } + + std::lock_guard lock(event_mutex_); + + if (event_callback_[event_id] != nullptr) { + event_callback_[event_id](event_data_[event_id], 1); + } else { + ++event_unread_count_[event_id]; + } + return; +} + +///============================================================================= +bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", + event_id); + return true; + } + + std::lock_guard lock(event_mutex_); + + return event_queues_[event_id].empty(); +} + +///============================================================================= +std::unique_ptr EventsBase::pop_next_event( + rmw_zenoh_event_type_t event_id) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", + event_id); + return nullptr; + } + + std::lock_guard lock(event_mutex_); + + if (event_queues_[event_id].empty()) { + // This tells rcl that the check for a new events was done, but no events have come in yet. + return nullptr; + } + + std::unique_ptr event_status = + std::move(event_queues_[event_id].front()); + event_queues_[event_id].pop_front(); + + return event_status; +} + +///============================================================================= +void EventsBase::add_new_event( + rmw_zenoh_event_type_t event_id, + std::unique_ptr event) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", + event_id); + return; + } + + std::lock_guard lock(event_mutex_); + + std::deque> & event_queue = event_queues_[event_id]; + if (event_queue.size() >= event_queue_depth_) { + // Log warning if message is discarded due to hitting the queue depth + RCUTILS_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "Event queue depth of %ld reached, discarding oldest message " + "for event type %d", + event_queue_depth_, + event_id); + + event_queue.pop_front(); + } + + event_queue.emplace_back(std::move(event)); + + // Since we added new data, trigger event callback and guard condition if they are available + trigger_event_callback(event_id); + notify_event(event_id); +} + +///============================================================================= +void EventsBase::attach_event_condition( + rmw_zenoh_event_type_t event_id, + std::condition_variable * condition_variable) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", + event_id); + return; + } + + std::lock_guard lock(event_condition_mutex_); + event_conditions_[event_id] = condition_variable; +} + +///============================================================================= +void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", + event_id); + return; + } + + std::lock_guard lock(event_condition_mutex_); + event_conditions_[event_id] = nullptr; +} + +///============================================================================= +void EventsBase::notify_event(rmw_zenoh_event_type_t event_id) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", + event_id); + return; + } + + std::lock_guard lock(event_condition_mutex_); + if (event_conditions_[event_id] != nullptr) { + event_conditions_[event_id]->notify_one(); + } +} diff --git a/rmw_zenoh_cpp/src/detail/event.hpp b/rmw_zenoh_cpp/src/detail/event.hpp index 73422269..727ff1fc 100644 --- a/rmw_zenoh_cpp/src/detail/event.hpp +++ b/rmw_zenoh_cpp/src/detail/event.hpp @@ -15,9 +15,16 @@ #ifndef DETAIL__EVENT_HPP_ #define DETAIL__EVENT_HPP_ +#include +#include +#include +#include +#include #include #include "rmw/event.h" +#include "rmw/event_callback_type.h" + ///============================================================================= // A struct that represents an event status in rmw_zenoh. @@ -53,4 +60,91 @@ static const std::unordered_map event_ // TODO(clalancette): Implement remaining events }; +///============================================================================= +/// A struct to store status changes which can be mapped to rmw event statuses. +/// The data field can be used to store serialized information for more complex statuses. +struct rmw_zenoh_event_status_t +{ + size_t total_count; + size_t total_count_change; + size_t current_count; + std::string data; +}; + +///============================================================================= +/// Base class to be inherited by entities that support events. +class EventsBase +{ +public: + /// @brief Set the user defined callback that should be called when + /// a new message/response/request is received. + /// @param user_data the data that should be passed to the callback. + /// @param callback the callback to be set. + void set_user_callback(const void * user_data, rmw_event_callback_t callback); + + /// Trigger the user callback. + void trigger_user_callback(); + + /// @brief Set the callback to be triggered when the relevant event is triggered. + /// @param event_id the id of the event + /// @param callback the callback to trigger for this event. + /// @param user_data the data to be passed to the callback. + void event_set_callback( + rmw_zenoh_event_type_t event_id, + rmw_event_callback_t callback, + const void * user_data); + + /// @brief Returns true if the event queue is empty. + /// @param event_id the event id whose event queue should be checked. + bool event_queue_is_empty(rmw_zenoh_event_type_t event_id) const; + + /// Pop the next event in the queue. + /// @param event_id the event id whose queue should be popped. + std::unique_ptr pop_next_event( + rmw_zenoh_event_type_t event_id); + + /// Add an event status for an event. + /// @param event_id the event id queue to which the status should be added. + void add_new_event( + rmw_zenoh_event_type_t event_id, + std::unique_ptr event); + + /// @brief Attach the condition variable provided by rmw_wait. + /// @param condition_variable to attach. + void attach_event_condition( + rmw_zenoh_event_type_t event_id, + std::condition_variable * condition_variable); + + /// @brief Detach the condition variable provided by rmw_wait. + void detach_event_condition(rmw_zenoh_event_type_t event_id); + +private: + /// @brief Trigger the callback for an event. + /// @param event_id the event id whose callback should be triggered. + void trigger_event_callback(rmw_zenoh_event_type_t event_id); + + /// Notify once event is added to an event queue. + void notify_event(rmw_zenoh_event_type_t event_id); + + /// Mutex to lock when read/writing members. + // The mutex is recursive as add_new_event() invokes `trigger_event_callback()`. + mutable std::recursive_mutex event_mutex_; + /// Mutex to lock for event_condition. + mutable std::mutex event_condition_mutex_; + /// Condition variable to attach for event notifications. + std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr}; + /// User callback that can be set via set_user_callback(). + rmw_event_callback_t callback_ {nullptr}; + /// User data that should be passed to the user callback. + const void * user_data_ {nullptr}; + /// Count for + size_t unread_count_ {0}; + rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; + const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; + size_t event_unread_count_[ZENOH_EVENT_ID_MAX + 1] {0}; + // A dequeue of events for each type of event this RMW supports. + std::deque> event_queues_[ZENOH_EVENT_ID_MAX + 1] {}; + const std::size_t event_queue_depth_ = 10; +}; + #endif // DETAIL__EVENT_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index dd959ac6..cebb27bb 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -26,212 +26,6 @@ #include "rmw_data_types.hpp" -///============================================================================= -void EventsBase::set_user_callback( - const void * user_data, rmw_event_callback_t callback) -{ - std::lock_guard lock_mutex(event_mutex_); - - if (callback) { - // Push events arrived before setting the the executor callback. - if (unread_count_) { - callback(user_data, unread_count_); - unread_count_ = 0; - } - user_data_ = user_data; - callback_ = callback; - } else { - user_data_ = nullptr; - callback_ = nullptr; - } -} - -///============================================================================= -void EventsBase::trigger_user_callback() -{ - // Trigger the user provided event callback if available. - std::lock_guard lock_mutex(event_mutex_); - if (callback_ != nullptr) { - callback_(user_data_, 1); - } else { - ++unread_count_; - } -} - -///============================================================================= -void EventsBase::event_set_callback( - rmw_zenoh_event_type_t event_id, - rmw_event_callback_t callback, - const void * user_data) -{ - if (event_id > ZENOH_EVENT_ID_MAX) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " - "Report this bug.", - event_id); - return; - } - - std::lock_guard lock(event_mutex_); - - // Set the user callback data - event_callback_[event_id] = callback; - event_data_[event_id] = user_data; - - if (callback && event_unread_count_[event_id]) { - // Push events happened before having assigned a callback - callback(user_data, event_unread_count_[event_id]); - event_unread_count_[event_id] = 0; - } - return; -} - -///============================================================================= -void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id) -{ - if (event_id > ZENOH_EVENT_ID_MAX) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " - "Report this bug.", - event_id); - return; - } - - std::lock_guard lock(event_mutex_); - - if (event_callback_[event_id] != nullptr) { - event_callback_[event_id](event_data_[event_id], 1); - } else { - ++event_unread_count_[event_id]; - } - return; -} - -///============================================================================= -bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const -{ - if (event_id > ZENOH_EVENT_ID_MAX) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " - "Report this bug.", - event_id); - return true; - } - - std::lock_guard lock(event_mutex_); - - return event_queues_[event_id].empty(); -} - -///============================================================================= -std::unique_ptr EventsBase::pop_next_event( - rmw_zenoh_event_type_t event_id) -{ - if (event_id > ZENOH_EVENT_ID_MAX) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " - "Report this bug.", - event_id); - return nullptr; - } - - std::lock_guard lock(event_mutex_); - - if (event_queues_[event_id].empty()) { - // This tells rcl that the check for a new events was done, but no events have come in yet. - return nullptr; - } - - std::unique_ptr event_status = - std::move(event_queues_[event_id].front()); - event_queues_[event_id].pop_front(); - - return event_status; -} - -///============================================================================= -void EventsBase::add_new_event( - rmw_zenoh_event_type_t event_id, - std::unique_ptr event) -{ - if (event_id > ZENOH_EVENT_ID_MAX) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " - "Report this bug.", - event_id); - return; - } - - std::lock_guard lock(event_mutex_); - - std::deque> & event_queue = event_queues_[event_id]; - if (event_queue.size() >= event_queue_depth_) { - // Log warning if message is discarded due to hitting the queue depth - RCUTILS_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "Event queue depth of %ld reached, discarding oldest message " - "for event type %d", - event_queue_depth_, - event_id); - - event_queue.pop_front(); - } - - event_queue.emplace_back(std::move(event)); - - // Since we added new data, trigger event callback and guard condition if they are available - trigger_event_callback(event_id); - notify_event(event_id); -} - -///============================================================================= -void EventsBase::attach_event_condition( - rmw_zenoh_event_type_t event_id, - std::condition_variable * condition_variable) -{ - if (event_id > ZENOH_EVENT_ID_MAX) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " - "Report this bug.", - event_id); - return; - } - - std::lock_guard lock(event_condition_mutex_); - event_conditions_[event_id] = condition_variable; -} - -///============================================================================= -void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id) -{ - if (event_id > ZENOH_EVENT_ID_MAX) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " - "Report this bug.", - event_id); - return; - } - - std::lock_guard lock(event_condition_mutex_); - event_conditions_[event_id] = nullptr; -} - -///============================================================================= -void EventsBase::notify_event(rmw_zenoh_event_type_t event_id) -{ - if (event_id > ZENOH_EVENT_ID_MAX) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " - "Report this bug.", - event_id); - return; - } - - std::lock_guard lock(event_condition_mutex_); - if (event_conditions_[event_id] != nullptr) { - event_conditions_[event_id]->notify_one(); - } -} ///============================================================================= saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 7467d4c6..d81902b9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -31,101 +31,15 @@ #include "rcutils/allocator.h" -#include "rmw/event_callback_type.h" #include "rmw/rmw.h" +#include "event.hpp" #include "graph_cache.hpp" #include "message_type_support.hpp" #include "service_type_support.hpp" /// Structs for various type erased data fields. -/// A struct to store status changes which can be mapped to rmw event statuses. -/// The data field can be used to store serialized information for more complex statuses. -struct rmw_zenoh_event_status_t -{ - size_t total_count; - size_t total_count_change; - size_t current_count; - std::string data; -}; - -///============================================================================= -/// Base class to be inherited by entities that support events. -class EventsBase -{ -public: - /// @brief Set the user defined callback that should be called when - /// a new message/response/request is received. - /// @param user_data the data that should be passed to the callback. - /// @param callback the callback to be set. - void set_user_callback(const void * user_data, rmw_event_callback_t callback); - - /// Trigger the user callback. - void trigger_user_callback(); - - /// @brief Set the callback to be triggered when the relevant event is triggered. - /// @param event_id the id of the event - /// @param callback the callback to trigger for this event. - /// @param user_data the data to be passed to the callback. - void event_set_callback( - rmw_zenoh_event_type_t event_id, - rmw_event_callback_t callback, - const void * user_data); - - /// @brief Returns true if the event queue is empty. - /// @param event_id the event id whose event queue should be checked. - bool event_queue_is_empty(rmw_zenoh_event_type_t event_id) const; - - /// Pop the next event in the queue. - /// @param event_id the event id whose queue should be popped. - std::unique_ptr pop_next_event( - rmw_zenoh_event_type_t event_id); - - /// Add an event status for an event. - /// @param event_id the event id queue to which the status should be added. - void add_new_event( - rmw_zenoh_event_type_t event_id, - std::unique_ptr event); - - /// @brief Attach the condition variable provided by rmw_wait. - /// @param condition_variable to attach. - void attach_event_condition( - rmw_zenoh_event_type_t event_id, - std::condition_variable * condition_variable); - - /// @brief Detach the condition variable provided by rmw_wait. - void detach_event_condition(rmw_zenoh_event_type_t event_id); - -private: - /// @brief Trigger the callback for an event. - /// @param event_id the event id whose callback should be triggered. - void trigger_event_callback(rmw_zenoh_event_type_t event_id); - - /// Notify once event is added to an event queue. - void notify_event(rmw_zenoh_event_type_t event_id); - - /// Mutex to lock when read/writing members. - // The mutex is recursive as add_new_event() invokes `trigger_event_callback()`. - mutable std::recursive_mutex event_mutex_; - /// Mutex to lock for event_condition. - mutable std::mutex event_condition_mutex_; - /// Condition variable to attach for event notifications. - std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr}; - /// User callback that can be set via set_user_callback(). - rmw_event_callback_t callback_ {nullptr}; - /// User data that should be passed to the user callback. - const void * user_data_ {nullptr}; - /// Count for - size_t unread_count_ {0}; - rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; - const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; - size_t event_unread_count_[ZENOH_EVENT_ID_MAX + 1] {0}; - // A dequeue of events for each type of event this RMW supports. - std::deque> event_queues_[ZENOH_EVENT_ID_MAX + 1] {}; - const std::size_t event_queue_depth_ = 10; -}; - ///============================================================================= struct rmw_context_impl_s { diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index ec137cd2..d102e060 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -20,6 +20,7 @@ #include "detail/identifier.hpp" #include "detail/rmw_data_types.hpp" + extern "C" { ///==============================================================================