diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 8f604a02..bef61de1 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -27,38 +27,210 @@ #include "rmw_data_types.hpp" ///============================================================================= -saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]) -: payload(p), recv_timestamp(recv_ts) +void EventsBase::set_user_callback( + const void * user_data, rmw_event_callback_t callback) { - memcpy(publisher_gid, pub_gid, 16); + std::lock_guard lock_mutex(event_mutex_); + + if (callback_) { + // Push events arrived before setting the the executor callback. + if (unread_count_) { + callback(user_data, unread_count_); + unread_count_ = 0; + } + user_data_ = user_data; + callback_ = callback; + } else { + user_data_ = nullptr; + callback_ = nullptr; + } } ///============================================================================= -void rmw_publisher_data_t::event_set_callback( +void EventsBase::trigger_user_callback() +{ + // Trigger the user provided event callback if available. + std::lock_guard lock_event_mutex(event_mutex_); + if (callback_ != nullptr) { + callback_(user_data_, 1); + } else { + ++unread_count_; + } +} + +///============================================================================= +void EventsBase::event_set_callback( rmw_zenoh_event_type_t event_id, rmw_event_callback_t callback, const void * user_data) { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", event_id); + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + event_id); return; } - std::lock_guard lock(user_callback_data_.mutex); + std::lock_guard lock(event_mutex_); // Set the user callback data - user_callback_data_.event_callback[event_id] = callback; - user_callback_data_.event_data[event_id] = user_data; + event_callback_[event_id] = callback; + event_data_[event_id] = user_data; - if (callback && user_callback_data_.event_unread_count[event_id]) { + if (callback && event_unread_count_[event_id]) { // Push events happened before having assigned a callback - callback(user_data, user_callback_data_.event_unread_count[event_id]); - user_callback_data_.event_unread_count[event_id] = 0; + callback(user_data, event_unread_count_[event_id]); + event_unread_count_[event_id] = 0; + } + return; +} + +///============================================================================= +void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + event_id); + return; + } + + std::lock_guard lock(event_mutex_); + + if (event_callback_[event_id] != nullptr) { + event_callback_[event_id](event_data_[event_id], 1); + } else { + ++event_unread_count_[event_id]; } return; } +///============================================================================= +bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + event_id); + return true; + } + + std::lock_guard lock(event_mutex_); + + return event_queues_[event_id].empty(); +} + +///============================================================================= +std::unique_ptr EventsBase::pop_next_event( + rmw_zenoh_event_type_t event_id) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + event_id); + return nullptr; + } + + std::lock_guard lock(event_mutex_); + + if (event_queues_[event_id].empty()) { + // This tells rcl that the check for a new events was done, but no events have come in yet. + return nullptr; + } + + std::unique_ptr event_status = + std::move(event_queues_[event_id].front()); + event_queues_[event_id].pop_front(); + + return event_status; +} + +///============================================================================= +void EventsBase::add_new_event( + rmw_zenoh_event_type_t event_id, + std::unique_ptr event) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + event_id); + return; + } + + std::lock_guard lock(event_mutex_); + + std::deque> & event_queue = event_queues_[event_id]; + if (event_queue.size() >= event_queue_depth_) { + // Log warning if message is discarded due to hitting the queue depth + RCUTILS_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "Event queue depth of %ld reached, discarding oldest message " + "for event type %d", + event_queue_depth_, + event_id); + + event_queue.pop_front(); + } + + event_queue.emplace_back(std::move(event)); + + // Since we added new data, trigger the event guard condition if it is available. + notify_event(event_id); +} + +///============================================================================= +void EventsBase::attach_event_condition( + rmw_zenoh_event_type_t event_id, + std::condition_variable * condition_variable) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + event_id); + return; + } + + std::lock_guard lock(event_condition_mutex_); + event_conditions_[event_id] = condition_variable; +} + +///============================================================================= +void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + event_id); + return; + } + + std::lock_guard lock(event_condition_mutex_); + event_conditions_[event_id] = nullptr; +} + +///============================================================================= +void EventsBase::notify_event(rmw_zenoh_event_type_t event_id) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + event_id); + return; + } + + std::lock_guard lock(event_condition_mutex_); + if (event_conditions_[event_id] != nullptr) { + event_conditions_[event_id]->notify_one(); + } +} + +///============================================================================= +saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]) +: payload(p), recv_timestamp(recv_ts) +{ + memcpy(publisher_gid, pub_gid, 16); +} + ///============================================================================= void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable) { @@ -133,65 +305,11 @@ void rmw_subscription_data_t::add_new_message( message_queue_.emplace_back(std::move(msg)); - // Trigger the user provided event callback if available. - std::unique_lock lock_event_mutex(user_callback_data_.mutex); - if (user_callback_data_.callback != nullptr) { - user_callback_data_.callback(user_callback_data_.user_data, 1); - } else { - ++user_callback_data_.unread_count; - } - lock_event_mutex.unlock(); - - // Since we added new data, trigger the guard condition if it is available + // Since we added new data, trigger user callback and guard condition if they are available + trigger_user_callback(); notify(); } -///============================================================================= -void rmw_subscription_data_t::set_on_new_message_callback( - const void * user_data, rmw_event_callback_t callback) -{ - std::lock_guard lock_mutex(user_callback_data_.mutex); - - if (callback) { - // Push events arrived before setting the the executor callback. - if (user_callback_data_.unread_count) { - callback(user_data, user_callback_data_.unread_count); - user_callback_data_.unread_count = 0; - } - user_callback_data_.user_data = user_data; - user_callback_data_.callback = callback; - } else { - user_callback_data_.user_data = nullptr; - user_callback_data_.callback = nullptr; - } -} - -///============================================================================= -void rmw_subscription_data_t::event_set_callback( - rmw_zenoh_event_type_t event_id, - rmw_event_callback_t callback, - const void * user_data) -{ - if (event_id > ZENOH_EVENT_ID_MAX) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", event_id); - return; - } - - std::lock_guard lock(user_callback_data_.mutex); - - // Set the user callback data - user_callback_data_.event_callback[event_id] = callback; - user_callback_data_.event_data[event_id] = user_data; - - if (callback && user_callback_data_.event_unread_count[event_id]) { - // Push events happened before having assigned a callback - callback(user_data, user_callback_data_.event_unread_count[event_id]); - user_callback_data_.event_unread_count[event_id] = 0; - } - return; -} - ///============================================================================= bool rmw_service_data_t::query_queue_is_empty() const { @@ -242,16 +360,8 @@ void rmw_service_data_t::add_new_query(std::unique_ptr query) std::lock_guard lock(query_queue_mutex_); query_queue_.emplace_back(std::move(query)); - // Trigger the user provided event callback if available. - std::unique_lock lock_event_mutex(user_callback_data_.mutex); - if (user_callback_data_.callback != nullptr) { - user_callback_data_.callback(user_callback_data_.user_data, 1); - } else { - ++user_callback_data_.unread_count; - } - lock_event_mutex.unlock(); - - // Since we added new data, trigger the guard condition if it is available + // Since we added new data, trigger user callback and guard condition if they are available + trigger_user_callback(); notify(); } @@ -284,26 +394,6 @@ std::unique_ptr rmw_service_data_t::take_from_query_map(int64_t sequ return query; } -///============================================================================= -void rmw_service_data_t::set_on_new_request_callback( - const void * user_data, rmw_event_callback_t callback) -{ - std::lock_guard lock_mutex(user_callback_data_.mutex); - - if (callback) { - // Push events arrived before setting the the executor callback. - if (user_callback_data_.unread_count) { - callback(user_data, user_callback_data_.unread_count); - user_callback_data_.unread_count = 0; - } - user_callback_data_.user_data = user_data; - user_callback_data_.callback = callback; - } else { - user_callback_data_.user_data = nullptr; - user_callback_data_.callback = nullptr; - } -} - ///============================================================================= void rmw_client_data_t::notify() { @@ -319,15 +409,8 @@ void rmw_client_data_t::add_new_reply(std::unique_ptr reply) std::lock_guard lock(reply_queue_mutex_); reply_queue_.emplace_back(std::move(reply)); - // Trigger the user provided event callback if available. - std::unique_lock lock_event_mutex(user_callback_data_.mutex); - if (user_callback_data_.callback != nullptr) { - user_callback_data_.callback(user_callback_data_.user_data, 1); - } else { - ++user_callback_data_.unread_count; - } - lock_event_mutex.unlock(); - + // Since we added new data, trigger user callback and guard condition if they are available + trigger_user_callback(); notify(); } @@ -368,26 +451,6 @@ std::unique_ptr rmw_client_data_t::pop_next_reply() return latest_reply; } -///============================================================================= -void rmw_client_data_t::set_on_new_response_callback( - const void * user_data, rmw_event_callback_t callback) -{ - std::lock_guard lock_mutex(user_callback_data_.mutex); - - if (callback) { - // Push events arrived before setting the the executor callback. - if (user_callback_data_.unread_count) { - callback(user_data, user_callback_data_.unread_count); - user_callback_data_.unread_count = 0; - } - user_callback_data_.user_data = user_data; - user_callback_data_.callback = callback; - } else { - user_callback_data_.user_data = nullptr; - user_callback_data_.callback = nullptr; - } -} - //============================================================================== void sub_data_handler( const z_sample_t * sample, diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index b6e56daf..dff46d17 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -40,16 +40,89 @@ /// Structs for various type erased data fields. +/// A struct to store status changes which can be mapped to rmw event statuses. +/// The data field can be used to store serialized information for more complex statuses. +struct rmw_zenoh_event_status_t +{ + size_t total_count; + size_t total_count_change; + size_t current_count; + std::string data; +}; + ///============================================================================= -struct user_callback_data_t +/// Base class to be inherited by entities that support events. +class EventsBase { - std::mutex mutex; - rmw_event_callback_t callback {nullptr}; - const void * user_data {nullptr}; - size_t unread_count {0}; - rmw_event_callback_t event_callback[ZENOH_EVENT_ID_MAX + 1] {nullptr}; - const void * event_data[ZENOH_EVENT_ID_MAX + 1] {nullptr}; - size_t event_unread_count[ZENOH_EVENT_ID_MAX + 1] {0}; +public: + /// @brief Set the user defined callback that should be called when + /// a new message/response/request is received. + /// @param user_data the data that should be passed to the callback. + /// @param callback the callback to be set. + void set_user_callback(const void * user_data, rmw_event_callback_t callback); + + /// Trigger the user callback. + void trigger_user_callback(); + + /// @brief Set the callback to be triggered when the relevant event is triggered. + /// @param event_id the id of the event + /// @param callback the callback to trigger for this event. + /// @param user_data the data to be passed to the callback. + void event_set_callback( + rmw_zenoh_event_type_t event_id, + rmw_event_callback_t callback, + const void * user_data); + + /// @brief Trigger the callback for an event. + /// @param event_id the event id whose callback should be triggered. + void trigger_event_callback(rmw_zenoh_event_type_t event_id); + + /// @brief Returns true if the event queue is empty. + /// @param event_id the event id whose event queue should be checked. + bool event_queue_is_empty(rmw_zenoh_event_type_t event_id) const; + + /// Pop the next event in the queue. + /// @param event_id the event id whose queue should be popped. + std::unique_ptr pop_next_event( + rmw_zenoh_event_type_t event_id); + + /// Add an event status for an event. + /// @param event_id the event id queue to which the status should be added. + void add_new_event( + rmw_zenoh_event_type_t event_id, + std::unique_ptr event); + + /// @brief Attach the condition variable provided by rmw_wait. + /// @param condition_variable to attach. + void attach_event_condition( + rmw_zenoh_event_type_t event_id, + std::condition_variable * condition_variable); + + /// @brief Detach the condition variable provided by rmw_wait. + void detach_event_condition(rmw_zenoh_event_type_t event_id); + +private: + /// Notify once event is added to an event queue. + void notify_event(rmw_zenoh_event_type_t event_id); + + /// Mutex to lock when read/writing members. + mutable std::mutex event_mutex_; + /// Mutex to lock for event_condition. + mutable std::mutex event_condition_mutex_; + /// Condition variable to attach for event notifications. + std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr}; + /// User callback that can be set via set_user_callback(). + rmw_event_callback_t callback_ {nullptr}; + /// User data that should be passed to the user callback. + const void * user_data_ {nullptr}; + /// Count for + size_t unread_count_ {0}; + rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; + const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; + size_t event_unread_count_[ZENOH_EVENT_ID_MAX + 1] {0}; + // A dequeue of events for each type of event this RMW supports. + std::deque> event_queues_[ZENOH_EVENT_ID_MAX + 1] {}; + const std::size_t event_queue_depth_ = 10; }; ///============================================================================= @@ -84,7 +157,7 @@ struct rmw_node_data_t }; ///============================================================================= -class rmw_publisher_data_t +class rmw_publisher_data_t : public EventsBase { public: // An owned publisher. @@ -107,13 +180,6 @@ class rmw_publisher_data_t // Context for memory allocation for messages. rmw_context_t * context; - void event_set_callback( - rmw_zenoh_event_type_t event_id, - rmw_event_callback_t callback, - const void * user_data); - -private: - user_callback_data_t user_callback_data_; }; ///============================================================================= @@ -139,7 +205,7 @@ struct saved_msg_data }; ///============================================================================= -class rmw_subscription_data_t final +class rmw_subscription_data_t : public EventsBase { public: // An owned subscriber or querying_subscriber depending on the QoS settings. @@ -166,13 +232,6 @@ class rmw_subscription_data_t final void add_new_message(std::unique_ptr msg, const std::string & topic_name); - void set_on_new_message_callback(const void * user_data, rmw_event_callback_t callback); - - void event_set_callback( - rmw_zenoh_event_type_t event_id, - rmw_event_callback_t callback, - const void * user_data); - private: std::deque> message_queue_; mutable std::mutex message_queue_mutex_; @@ -181,8 +240,6 @@ class rmw_subscription_data_t final std::condition_variable * condition_{nullptr}; std::mutex condition_mutex_; - - user_callback_data_t user_callback_data_; }; @@ -207,7 +264,7 @@ class ZenohQuery final }; ///============================================================================= -class rmw_service_data_t final +class rmw_service_data_t : public EventsBase { public: z_owned_keyexpr_t keyexpr; @@ -242,8 +299,6 @@ class rmw_service_data_t final std::unique_ptr take_from_query_map(int64_t sequence_number); - void set_on_new_request_callback(const void * user_data, rmw_event_callback_t callback); - private: void notify(); @@ -257,8 +312,6 @@ class rmw_service_data_t final std::condition_variable * condition_{nullptr}; std::mutex condition_mutex_; - - user_callback_data_t user_callback_data_; }; ///============================================================================= @@ -276,7 +329,7 @@ class ZenohReply final }; ///============================================================================= -class rmw_client_data_t final +class rmw_client_data_t : public EventsBase { public: z_owned_keyexpr_t keyexpr; @@ -311,8 +364,6 @@ class rmw_client_data_t final std::unique_ptr pop_next_reply(); - void set_on_new_response_callback(const void * user_data, rmw_event_callback_t callback); - private: void notify(); @@ -324,8 +375,6 @@ class rmw_client_data_t final std::deque> reply_queue_; mutable std::mutex reply_queue_mutex_; - - user_callback_data_t user_callback_data_; }; #endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index bbfcc253..167bb8e2 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -102,33 +102,43 @@ rmw_event_set_callback( auto zenoh_event_it = event_map.find(rmw_event->event_type); if (zenoh_event_it == event_map.end()) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("RMW Zenoh does not support event [%d]", rmw_event->event_type); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh does not support event [%d]", + rmw_event->event_type); return RMW_RET_ERROR; } - switch (zenoh_event_it->second) { - case ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE: { - rmw_subscription_data_t * sub_data = static_cast(rmw_event->data); - RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - sub_data->event_set_callback( - zenoh_event_it->second, - callback, - user_data); - break; - } - case ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE: { - rmw_publisher_data_t * pub_data = static_cast(rmw_event->data); - RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); - pub_data->event_set_callback( - zenoh_event_it->second, - callback, - user_data); - break; - } - default: { - return RMW_RET_INVALID_ARGUMENT; - } - } + // Both rmw_subscription_data_t and rmw_publisher_data_t inherit EventsBase. + EventsBase * event_data = static_cast(rmw_event->data); + RMW_CHECK_ARGUMENT_FOR_NULL(event_data, RMW_RET_INVALID_ARGUMENT); + event_data->event_set_callback( + zenoh_event_it->second, + callback, + user_data); + + // switch (zenoh_event_it->second) { + // case ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE: { + // rmw_subscription_data_t * sub_data = static_cast(rmw_event->data); + // RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + // sub_data->event_set_callback( + // zenoh_event_it->second, + // callback, + // user_data); + // break; + // } + // case ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE: { + // rmw_publisher_data_t * pub_data = static_cast(rmw_event->data); + // RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); + // pub_data->event_set_callback( + // zenoh_event_it->second, + // callback, + // user_data); + // break; + // } + // default: { + // return RMW_RET_INVALID_ARGUMENT; + // } + // } return RMW_RET_OK; } @@ -154,31 +164,45 @@ rmw_take_event( auto zenoh_event_it = event_map.find(event_handle->event_type); if (zenoh_event_it == event_map.end()) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("RMW Zenoh does not support event [%d]", event_handle->event_type); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh does not support event [%d]", + event_handle->event_type); + return RMW_RET_ERROR; + } + + // Both rmw_subscription_data_t and rmw_publisher_data_t inherit EventsBase. + EventsBase * event_data = static_cast(event_handle->data); + RMW_CHECK_ARGUMENT_FOR_NULL(event_data, RMW_RET_INVALID_ARGUMENT); + std::unique_ptr st = event_data->pop_next_event( + zenoh_event_it->second); + if (st == nullptr) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "rmw_take_event called when event queue for event type [%d] is empty", + event_handle->event_type); return RMW_RET_ERROR; } + // Now depending on the event, populate the rwm event status. switch (zenoh_event_it->second) { case ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE: { - rmw_subscription_data_t * sub_data = static_cast(event_handle->data); - RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - auto ei = static_cast(event_info); - ei->total_count = 0; - ei->total_count_change = 0; - *taken = true; - return RMW_RET_OK; - } + auto ei = static_cast(event_info); + RMW_CHECK_ARGUMENT_FOR_NULL(ei, RMW_RET_INVALID_ARGUMENT); + ei->total_count = st->total_count; + ei->total_count_change = st->total_count_change; + *taken = true; + return RMW_RET_OK; + } case ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE: { - rmw_publisher_data_t * pub_data = static_cast(event_handle->data); - auto ei = static_cast(event_info); - ei->total_count = 0; - ei->total_count_change = 0; - *taken = true; - return RMW_RET_OK; - } + auto ei = static_cast(event_info); + RMW_CHECK_ARGUMENT_FOR_NULL(ei, RMW_RET_INVALID_ARGUMENT); + ei->total_count = st->total_count; + ei->total_count_change = st->total_count_change; + *taken = true; + return RMW_RET_OK; + } default: { - return RMW_RET_INVALID_ARGUMENT; - } + return RMW_RET_INVALID_ARGUMENT; + } } return RMW_RET_ERROR; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 044e3b40..9dedd9bd 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -3184,18 +3184,25 @@ static bool has_triggered_condition( } } - // TODO(clalancette): Deal with events - // if (events) { - // for (size_t i = 0; i < events->event_count; ++i) { - // auto event = static_cast(events->events[i]); - // auto custom_event_info = static_cast(event->data); - // if (custom_event_info->get_listener()->get_statuscondition().get_trigger_value() || - // custom_event_info->get_listener()->get_event_guard(event->event_type).get_trigger_value()) - // { - // return true; - // } - // } - // } + if (events) { + for (size_t i = 0; i < events->event_count; ++i) { + auto event = static_cast(events->events[i]); + const rmw_event_type_t & event_type = event->event_type; + // Check if the event queue for this event type is empty. + auto zenoh_event_it = event_map.find(event_type); + if (zenoh_event_it != event_map.end()) { + auto event_data = static_cast(event->data); + if (event_data != nullptr) { + if (!event_data->event_queue_is_empty(zenoh_event_it->second)) { + printf("EVENTS QUEUE IS NOT EMPTY!!\n"); + return true; + } + } + } else { + printf("ERROR!!!!!!!!!!!!!!\n"); + } + } + } if (subscriptions) { for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { @@ -3325,16 +3332,20 @@ rmw_wait( } } - // if (events) { - // for (size_t i = 0; i < events->event_count; ++i) { - // auto event = static_cast(events->events[i]); - // auto custom_event_info = static_cast(event->data); - // attached_conditions.push_back( - // &custom_event_info->get_listener()->get_statuscondition()); - // attached_conditions.push_back( - // &custom_event_info->get_listener()->get_event_guard(event->event_type)); - // } - // } + if (events) { + for (size_t i = 0; i < events->event_count; ++i) { + auto event = static_cast(events->events[i]); + auto event_data = static_cast(event->data); + if (event_data != nullptr) { + auto zenoh_event_it = event_map.find(event->event_type); + if (zenoh_event_it != event_map.end()) { + event_data->attach_event_condition( + zenoh_event_it->second, + &wait_set_data->condition_variable); + } + } + } + } std::unique_lock lock(wait_set_data->condition_mutex); @@ -3367,6 +3378,26 @@ rmw_wait( } } + if (events) { + // Now detach the condition variable and mutex from each of the subscriptions + for (size_t i = 0; i < events->event_count; ++i) { + auto event = static_cast(events->events[i]); + auto event_data = static_cast(event->data); + if (event_data != nullptr) { + auto zenoh_event_it = event_map.find(event->event_type); + if (zenoh_event_it != event_map.end()) { + event_data->detach_event_condition(zenoh_event_it->second); + // According to the documentation for rmw_wait in rmw.h, entries in the + // array that have *not* been triggered should be set to NULL + if (event_data->event_queue_is_empty(zenoh_event_it->second)) { + // Setting to nullptr lets rcl know that this subscription is not ready + events->events[i] = nullptr; + } + } + } + } + } + if (subscriptions) { // Now detach the condition variable and mutex from each of the subscriptions for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { @@ -3676,7 +3707,7 @@ rmw_subscription_set_on_new_message_callback( rmw_subscription_data_t * sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - sub_data->set_on_new_message_callback( + sub_data->set_user_callback( user_data, callback); return RMW_RET_OK; } @@ -3693,7 +3724,7 @@ rmw_service_set_on_new_request_callback( rmw_service_data_t * service_data = static_cast(service->data); RMW_CHECK_ARGUMENT_FOR_NULL(service_data, RMW_RET_INVALID_ARGUMENT); - service_data->set_on_new_request_callback( + service_data->set_user_callback( user_data, callback); return RMW_RET_OK; } @@ -3710,7 +3741,7 @@ rmw_client_set_on_new_response_callback( rmw_client_data_t * client_data = static_cast(client->data); RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); - client_data->set_on_new_response_callback( + client_data->set_user_callback( user_data, callback); return RMW_RET_OK; }