From d1c9af34896846d2ea6571af18d9553797d32559 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Mon, 22 Jan 2024 14:58:01 -0500 Subject: [PATCH 1/4] Move more implementation into rmw_subscription_data_t. This moves more of the implementation out of rmw_zenoh.cpp and into the class handling the subscription data. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 98 +++++++++++++++------ rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 26 ++++-- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 26 ++---- 3 files changed, 98 insertions(+), 52 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 6e80a97c..5404c78b 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -31,6 +31,72 @@ saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uin memcpy(publisher_gid, pub_gid, 16); } +void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable) +{ + std::lock_guard lock(condition_mutex_); + condition_ = condition_variable; +} + +void rmw_subscription_data_t::notify() +{ + std::lock_guard lock(condition_mutex_); + if (condition_ != nullptr) { + condition_->notify_one(); + } +} + +void rmw_subscription_data_t::detach_condition() +{ + std::lock_guard lock(condition_mutex_); + condition_ = nullptr; +} + +bool rmw_subscription_data_t::message_queue_is_empty() const +{ + std::lock_guard lock(message_queue_mutex_); + return message_queue_.empty(); +} + +std::unique_ptr rmw_subscription_data_t::pop_next_message() +{ + std::lock_guard lock(message_queue_mutex_); + + if (message_queue_.empty()) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return nullptr; + } + + std::unique_ptr msg_data = std::move(message_queue_.front()); + message_queue_.pop_front(); + + return msg_data; +} + +void rmw_subscription_data_t::add_new_message( + std::unique_ptr msg, const std::string & topic_name) +{ + std::lock_guard lock(message_queue_mutex_); + + if (message_queue_.size() >= queue_depth) { + // Log warning if message is discarded due to hitting the queue depth + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Message queue depth of %ld reached, discarding oldest message " + "for subscription for %s", + queue_depth, + topic_name.c_str()); + + std::unique_ptr old = std::move(message_queue_.front()); + z_drop(z_move(old->payload)); + message_queue_.pop_front(); + } + + message_queue_.emplace_back(std::move(msg)); + + // Since we added new data, trigger the guard condition if it is available + notify(); +} + //============================================================================== void sub_data_handler( const z_sample_t * sample, @@ -53,34 +119,10 @@ void sub_data_handler( return; } - { - std::lock_guard lock(sub_data->message_queue_mutex); - - if (sub_data->message_queue.size() >= sub_data->queue_depth) { - // Log warning if message is discarded due to hitting the queue depth - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Message queue depth of %ld reached, discarding oldest message " - "for subscription for %s", - sub_data->queue_depth, - z_loan(keystr)); - - std::unique_ptr old = std::move(sub_data->message_queue.front()); - z_drop(&old->payload); - sub_data->message_queue.pop_front(); - } - - sub_data->message_queue.emplace_back( - std::make_unique( - zc_sample_payload_rcinc(sample), - sample->timestamp.time, sample->timestamp.id.id)); - - // Since we added new data, trigger the guard condition if it is available - std::lock_guard internal_lock(sub_data->internal_mutex); - if (sub_data->condition != nullptr) { - sub_data->condition->notify_one(); - } - } + sub_data->add_new_message( + std::make_unique( + zc_sample_payload_rcinc(sample), + sample->timestamp.time, sample->timestamp.id.id), z_loan(keystr)); } ZenohQuery::ZenohQuery(const z_query_t * query) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index aa18cf06..2439f864 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -109,8 +109,9 @@ struct saved_msg_data }; ///============================================================================== -struct rmw_subscription_data_t +class rmw_subscription_data_t final { +public: z_owned_subscriber_t sub; // Liveliness token for the subscription. @@ -121,14 +122,27 @@ struct rmw_subscription_data_t MessageTypeSupport * type_support; rmw_context_t * context; - std::deque> message_queue; - std::mutex message_queue_mutex; - size_t queue_depth; bool reliable; - std::mutex internal_mutex; - std::condition_variable * condition{nullptr}; + void attach_condition(std::condition_variable * condition_variable); + + void detach_condition(); + + bool message_queue_is_empty() const; + + std::unique_ptr pop_next_message(); + + void add_new_message(std::unique_ptr msg, const std::string & topic_name); + +private: + std::deque> message_queue_; + mutable std::mutex message_queue_mutex_; + + void notify(); + + std::condition_variable * condition_{nullptr}; + std::mutex condition_mutex_; }; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 0e1d53c4..aedf3e21 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1501,17 +1501,10 @@ static rmw_ret_t __rmw_take( // RETRIEVE SERIALIZED MESSAGE =============================================== - std::unique_ptr msg_data; - { - std::lock_guard lock(sub_data->message_queue_mutex); - - if (sub_data->message_queue.empty()) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. - return RMW_RET_OK; - } - - msg_data = std::move(sub_data->message_queue.front()); - sub_data->message_queue.pop_front(); + std::unique_ptr msg_data = sub_data->pop_next_message(); + if (msg_data == nullptr) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; } // Object that manages the raw buffer @@ -3061,8 +3054,7 @@ static bool has_triggered_condition( for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); if (sub_data != nullptr) { - std::lock_guard internal_lock(sub_data->internal_mutex); - if (!sub_data->message_queue.empty()) { + if (!sub_data->message_queue_is_empty()) { return true; } } @@ -3161,8 +3153,7 @@ rmw_wait( for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); if (sub_data != nullptr) { - std::lock_guard internal_lock(sub_data->internal_mutex); - sub_data->condition = &wait_set_data->condition_variable; + sub_data->attach_condition(&wait_set_data->condition_variable); } } } @@ -3227,11 +3218,10 @@ rmw_wait( for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); if (sub_data != nullptr) { - std::lock_guard internal_lock(sub_data->internal_mutex); - sub_data->condition = nullptr; + sub_data->detach_condition(); // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL - if (sub_data->message_queue.empty()) { + if (sub_data->message_queue_is_empty()) { // Setting to nullptr lets rcl know that this subscription is not ready subscriptions->subscribers[i] = nullptr; } From 976cf804c30a72740ba8f20e4dd1eadf6ab854e1 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 23 Jan 2024 08:59:26 -0500 Subject: [PATCH 2/4] Move more of the implementation into rmw_service_data_t. This just allows us to hide more of the implementation and remove it from rmw_zenoh.cpp. It also fixes some of the locking, which wasn't correct previously. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 88 ++++++++++++++++++--- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 32 ++++++-- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 50 +++++------- 3 files changed, 120 insertions(+), 50 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 5404c78b..a4ef87a4 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -97,6 +97,81 @@ void rmw_subscription_data_t::add_new_message( notify(); } +bool rmw_service_data_t::query_queue_is_empty() const +{ + std::lock_guard lock(query_queue_mutex_); + return query_queue_.empty(); +} + +void rmw_service_data_t::attach_condition(std::condition_variable * condition_variable) +{ + std::lock_guard lock(condition_mutex_); + condition_ = condition_variable; +} + +void rmw_service_data_t::detach_condition() +{ + std::lock_guard lock(condition_mutex_); + condition_ = nullptr; +} + +std::unique_ptr rmw_service_data_t::pop_next_query() +{ + std::lock_guard lock(query_queue_mutex_); + if (query_queue_.empty()) { + return nullptr; + } + + std::unique_ptr query = std::move(query_queue_.front()); + query_queue_.pop_front(); + + return query; +} + +void rmw_service_data_t::notify() +{ + std::lock_guard lock(condition_mutex_); + if (condition_ != nullptr) { + condition_->notify_one(); + } +} + +void rmw_service_data_t::add_new_query(std::unique_ptr query) +{ + std::lock_guard lock(query_queue_mutex_); + query_queue_.emplace_back(std::move(query)); + + // Since we added new data, trigger the guard condition if it is available + notify(); +} + +bool rmw_service_data_t::add_to_query_map( + int64_t sequence_number, std::unique_ptr query) +{ + std::lock_guard lock(sequence_to_query_map_mutex_); + if (sequence_to_query_map_.find(sequence_number) != sequence_to_query_map_.end()) { + return false; + } + sequence_to_query_map_.emplace( + std::pair(sequence_number, std::move(query))); + + return true; +} + +std::unique_ptr rmw_service_data_t::take_from_query_map(int64_t sequence_number) +{ + std::lock_guard lock(sequence_to_query_map_mutex_); + auto query_it = sequence_to_query_map_.find(sequence_number); + if (query_it == sequence_to_query_map_.end()) { + return nullptr; + } + + std::unique_ptr query = std::move(query_it->second); + sequence_to_query_map_.erase(query_it); + + return query; +} + //============================================================================== void sub_data_handler( const z_sample_t * sample, @@ -160,18 +235,7 @@ void service_data_handler(const z_query_t * query, void * data) return; } - // Get the query parameters and payload - { - std::lock_guard lock(service_data->query_queue_mutex); - service_data->query_queue.emplace_back(std::make_unique(query)); - } - { - // Since we added new data, trigger the guard condition if it is available - std::lock_guard internal_lock(service_data->internal_mutex); - if (service_data->condition != nullptr) { - service_data->condition->notify_one(); - } - } + service_data->add_new_query(std::make_unique(query)); } ZenohReply::ZenohReply(const z_owned_reply_t * reply) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 2439f864..965800e2 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -167,8 +167,9 @@ class ZenohQuery final z_owned_query_t query_; }; -struct rmw_service_data_t +class rmw_service_data_t final { +public: z_owned_keyexpr_t keyexpr; z_owned_queryable_t qable; @@ -183,16 +184,33 @@ struct rmw_service_data_t rmw_context_t * context; + bool query_queue_is_empty() const; + + void attach_condition(std::condition_variable * condition_variable); + + void detach_condition(); + + std::unique_ptr pop_next_query(); + + void add_new_query(std::unique_ptr query); + + bool add_to_query_map(int64_t sequence_number, std::unique_ptr query); + + std::unique_ptr take_from_query_map(int64_t sequence_number); + +private: + void notify(); + // Deque to store the queries in the order they arrive. - std::deque> query_queue; - std::mutex query_queue_mutex; + std::deque> query_queue_; + mutable std::mutex query_queue_mutex_; // Map to store the sequence_number -> query_id - std::unordered_map> sequence_to_query_map; - std::mutex sequence_to_query_map_mutex; + std::unordered_map> sequence_to_query_map_; + std::mutex sequence_to_query_map_mutex_; - std::mutex internal_mutex; - std::condition_variable * condition{nullptr}; + std::condition_variable * condition_{nullptr}; + std::mutex condition_mutex_; }; ///============================================================================== diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index aedf3e21..f79c0ced 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2618,8 +2618,6 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) // CLEANUP ================================================================ z_drop(z_move(service_data->keyexpr)); z_drop(z_move(service_data->qable)); - service_data->sequence_to_query_map.clear(); - service_data->query_queue.clear(); z_drop(z_move(service_data->token)); RMW_TRY_DESTRUCTOR( @@ -2668,14 +2666,10 @@ rmw_take_request( RMW_CHECK_FOR_NULL_WITH_MSG( service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); - std::unique_ptr query = nullptr; - { - std::lock_guard lock(service_data->query_queue_mutex); - if (service_data->query_queue.empty()) { - return RMW_RET_OK; - } - query = std::move(service_data->query_queue.front()); - service_data->query_queue.pop_front(); + std::unique_ptr query = service_data->pop_next_query(); + if (query == nullptr) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; } const z_query_t loaned_query = query->get_query(); @@ -2730,16 +2724,11 @@ rmw_take_request( request_header->received_timestamp = now_ns.count(); // Add this query to the map, so that rmw_send_response can quickly look it up later + if (!service_data->add_to_query_map( + request_header->request_id.sequence_number, std::move(query))) { - std::lock_guard lock(service_data->sequence_to_query_map_mutex); - if (service_data->sequence_to_query_map.find(request_header->request_id.sequence_number) != - service_data->sequence_to_query_map.end()) - { - RMW_SET_ERROR_MSG("duplicate sequence number in the map"); - return RMW_RET_ERROR; - } - service_data->sequence_to_query_map.emplace( - std::pair(request_header->request_id.sequence_number, std::move(query))); + RMW_SET_ERROR_MSG("duplicate sequence number in the map"); + return RMW_RET_ERROR; } *taken = true; @@ -2811,13 +2800,14 @@ rmw_send_response( size_t data_length = ser.getSerializedDataLength(); // Create the queryable payload - std::lock_guard lock(service_data->sequence_to_query_map_mutex); - auto query_it = service_data->sequence_to_query_map.find(request_header->sequence_number); - if (query_it == service_data->sequence_to_query_map.end()) { + std::unique_ptr query = + service_data->take_from_query_map(request_header->sequence_number); + if (query == nullptr) { RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug."); return RMW_RET_ERROR; } - const z_query_t loaned_query = query_it->second->get_query(); + + const z_query_t loaned_query = query->get_query(); z_query_reply_options_t options = z_query_reply_options_default(); z_owned_bytes_map_t map = create_map_and_set_sequence_num( @@ -2838,7 +2828,6 @@ rmw_send_response( &loaned_query, z_loan(service_data->keyexpr), reinterpret_cast( response_bytes), data_length, &options); - service_data->sequence_to_query_map.erase(query_it); return RMW_RET_OK; } @@ -3065,8 +3054,7 @@ static bool has_triggered_condition( for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { - std::lock_guard internal_lock(serv_data->internal_mutex); - if (!serv_data->query_queue.empty()) { + if (!serv_data->query_queue_is_empty()) { return true; } } @@ -3164,8 +3152,7 @@ rmw_wait( for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { - std::lock_guard internal_lock(serv_data->internal_mutex); - serv_data->condition = &wait_set_data->condition_variable; + serv_data->attach_condition(&wait_set_data->condition_variable); } } } @@ -3234,9 +3221,10 @@ rmw_wait( for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { - std::lock_guard internal_lock(serv_data->internal_mutex); - serv_data->condition = nullptr; - if (serv_data->query_queue.empty()) { + serv_data->detach_condition(); + // According to the documentation for rmw_wait in rmw.h, entries in the + // array that have *not* been triggered should be set to NULL + if (serv_data->query_queue_is_empty()) { // Setting to nullptr lets rcl know that this service is not ready services->services[i] = nullptr; } From 2d1e333dc7acce0a028bfb6c026df484a5ce4dad Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 23 Jan 2024 09:16:10 -0500 Subject: [PATCH 3/4] Move more of the implementation into rmw_client_data_t. This allows us to encapsulate more of the client data in the right place, and also allows us to fix the locking. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 65 +++++++++++++++++---- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 31 +++++++--- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 26 +++------ 3 files changed, 85 insertions(+), 37 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index a4ef87a4..0ee14199 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -172,6 +172,55 @@ std::unique_ptr rmw_service_data_t::take_from_query_map(int64_t sequ return query; } +void rmw_client_data_t::notify() +{ + std::lock_guard lock(condition_mutex_); + if (condition_ != nullptr) { + condition_->notify_one(); + } +} + +void rmw_client_data_t::add_new_reply(std::unique_ptr reply) +{ + std::lock_guard lock(reply_queue_mutex_); + reply_queue_.emplace_back(std::move(reply)); + + notify(); +} + +bool rmw_client_data_t::reply_queue_is_empty() const +{ + std::lock_guard lock(reply_queue_mutex_); + + return reply_queue_.empty(); +} + +void rmw_client_data_t::attach_condition(std::condition_variable * condition_variable) +{ + std::lock_guard lock(condition_mutex_); + condition_ = condition_variable; +} + +void rmw_client_data_t::detach_condition() +{ + std::lock_guard lock(condition_mutex_); + condition_ = nullptr; +} + +std::unique_ptr rmw_client_data_t::pop_next_reply() +{ + std::lock_guard lock(reply_queue_mutex_); + + if (reply_queue_.empty()) { + return nullptr; + } + + std::unique_ptr latest_reply = std::move(reply_queue_.front()); + reply_queue_.pop_front(); + + return latest_reply; +} + //============================================================================== void sub_data_handler( const z_sample_t * sample, @@ -288,16 +337,8 @@ void client_data_handler(z_owned_reply_t * reply, void * data) ); return; } - { - std::lock_guard msg_lock(client_data->replies_mutex); - // Take ownership of the reply. - client_data->replies.emplace_back(std::make_unique(reply)); - *reply = z_reply_null(); - } - { - std::lock_guard internal_lock(client_data->internal_mutex); - if (client_data->condition != nullptr) { - client_data->condition->notify_one(); - } - } + + client_data->add_new_reply(std::make_unique(reply)); + // Since we took ownership of the reply, null it out here + *reply = z_reply_null(); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 965800e2..133dd876 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -228,17 +228,15 @@ class ZenohReply final z_owned_reply_t reply_; }; -struct rmw_client_data_t +class rmw_client_data_t final { +public: z_owned_keyexpr_t keyexpr; z_owned_closure_reply_t zn_closure_reply; // Liveliness token for the client. zc_owned_liveliness_token_t token; - std::mutex replies_mutex; - std::deque> replies; - const void * request_type_support_impl; const void * response_type_support_impl; const char * typesupport_identifier; @@ -247,14 +245,31 @@ struct rmw_client_data_t rmw_context_t * context; - std::mutex internal_mutex; - std::condition_variable * condition{nullptr}; - uint8_t client_guid[RMW_GID_STORAGE_SIZE]; size_t get_next_sequence_number(); - std::mutex sequence_number_mutex; + + void add_new_reply(std::unique_ptr reply); + + bool reply_queue_is_empty() const; + + void attach_condition(std::condition_variable * condition_variable); + + void detach_condition(); + + std::unique_ptr pop_next_reply(); + +private: + void notify(); + size_t sequence_number{1}; + std::mutex sequence_number_mutex; + + std::condition_variable * condition_{nullptr}; + std::mutex condition_mutex_; + + std::deque> reply_queue_; + mutable std::mutex reply_queue_mutex_; }; #endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index f79c0ced..304d1ae2 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1962,7 +1962,6 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) // CLEANUP =================================================================== z_drop(z_move(client_data->zn_closure_reply)); z_drop(z_move(client_data->keyexpr)); - client_data->replies.clear(); z_drop(z_move(client_data->token)); RMW_TRY_DESTRUCTOR( @@ -2234,16 +2233,12 @@ rmw_take_response( RMW_CHECK_FOR_NULL_WITH_MSG( client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); - std::unique_ptr latest_reply = nullptr; - { - std::lock_guard lock(client_data->replies_mutex); - if (client_data->replies.empty()) { - RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); - return RMW_RET_ERROR; - } - latest_reply = std::move(client_data->replies.front()); - client_data->replies.pop_front(); + std::unique_ptr latest_reply = client_data->pop_next_reply(); + if (latest_reply == nullptr) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_ERROR; } + std::optional sample = latest_reply->get_sample(); if (!sample) { RMW_SET_ERROR_MSG("invalid reply sample"); @@ -3065,8 +3060,7 @@ static bool has_triggered_condition( for (size_t i = 0; i < clients->client_count; ++i) { rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { - std::lock_guard internal_lock(client_data->internal_mutex); - if (!client_data->replies.empty()) { + if (!client_data->reply_queue_is_empty()) { return true; } } @@ -3163,8 +3157,7 @@ rmw_wait( for (size_t i = 0; i < clients->client_count; ++i) { rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { - std::lock_guard internal_lock(client_data->internal_mutex); - client_data->condition = &wait_set_data->condition_variable; + client_data->attach_condition(&wait_set_data->condition_variable); } } } @@ -3237,11 +3230,10 @@ rmw_wait( for (size_t i = 0; i < clients->client_count; ++i) { rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { - std::lock_guard internal_lock(client_data->internal_mutex); - client_data->condition = nullptr; + client_data->detach_condition(); // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL - if (client_data->replies.empty()) { + if (client_data->reply_queue_is_empty()) { // Setting to nullptr lets rcl know that this client is not ready clients->clients[i] = nullptr; } From 9e1cd9a543804d7a9cf9bc37dae3b8e3f5f48b15 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 23 Jan 2024 09:36:40 -0500 Subject: [PATCH 4/4] Make sure to reset the trigger. That is, once we have discovered what the guard condition status is, we should immediately reset the trigger condition and wait for the next one. Otherwise we risk triggering over and over again. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/guard_condition.cpp | 13 ++++++------- rmw_zenoh_cpp/src/detail/guard_condition.hpp | 5 +---- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 8 +++++--- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.cpp b/rmw_zenoh_cpp/src/detail/guard_condition.cpp index 91509570..b850095f 100644 --- a/rmw_zenoh_cpp/src/detail/guard_condition.cpp +++ b/rmw_zenoh_cpp/src/detail/guard_condition.cpp @@ -52,15 +52,14 @@ void GuardCondition::detach_condition() } ///============================================================================== -bool GuardCondition::has_triggered() const +bool GuardCondition::get_and_reset_trigger() { std::lock_guard lock(internal_mutex_); - return has_triggered_; -} + bool ret = has_triggered_; -///============================================================================== -void GuardCondition::reset_trigger() -{ - std::lock_guard lock(internal_mutex_); + // There is no data associated with the guard condition, so as soon as the callers asks about the + // state, we can immediately reset and get ready for the next trigger. has_triggered_ = false; + + return ret; } diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.hpp b/rmw_zenoh_cpp/src/detail/guard_condition.hpp index bbd81b7f..b556c5f7 100644 --- a/rmw_zenoh_cpp/src/detail/guard_condition.hpp +++ b/rmw_zenoh_cpp/src/detail/guard_condition.hpp @@ -33,10 +33,7 @@ class GuardCondition final void detach_condition(); - bool has_triggered() const; - - // Resets has_triggered_ to false. - void reset_trigger(); + bool get_and_reset_trigger(); private: mutable std::mutex internal_mutex_; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 304d1ae2..c510db3e 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -3026,8 +3026,10 @@ static bool has_triggered_condition( if (guard_conditions) { for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { GuardCondition * gc = static_cast(guard_conditions->guard_conditions[i]); - if (gc != nullptr && gc->has_triggered()) { - return true; + if (gc != nullptr) { + if (gc->get_and_reset_trigger()) { + return true; + } } } } @@ -3186,7 +3188,7 @@ rmw_wait( gc->detach_condition(); // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL - if (!gc->has_triggered()) { + if (!gc->get_and_reset_trigger()) { guard_conditions->guard_conditions[i] = nullptr; } }