diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.cpp b/rmw_zenoh_cpp/src/detail/guard_condition.cpp index 91509570..b850095f 100644 --- a/rmw_zenoh_cpp/src/detail/guard_condition.cpp +++ b/rmw_zenoh_cpp/src/detail/guard_condition.cpp @@ -52,15 +52,14 @@ void GuardCondition::detach_condition() } ///============================================================================== -bool GuardCondition::has_triggered() const +bool GuardCondition::get_and_reset_trigger() { std::lock_guard lock(internal_mutex_); - return has_triggered_; -} + bool ret = has_triggered_; -///============================================================================== -void GuardCondition::reset_trigger() -{ - std::lock_guard lock(internal_mutex_); + // There is no data associated with the guard condition, so as soon as the callers asks about the + // state, we can immediately reset and get ready for the next trigger. has_triggered_ = false; + + return ret; } diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.hpp b/rmw_zenoh_cpp/src/detail/guard_condition.hpp index bbd81b7f..b556c5f7 100644 --- a/rmw_zenoh_cpp/src/detail/guard_condition.hpp +++ b/rmw_zenoh_cpp/src/detail/guard_condition.hpp @@ -33,10 +33,7 @@ class GuardCondition final void detach_condition(); - bool has_triggered() const; - - // Resets has_triggered_ to false. - void reset_trigger(); + bool get_and_reset_trigger(); private: mutable std::mutex internal_mutex_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 6e80a97c..0ee14199 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -31,6 +31,196 @@ saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uin memcpy(publisher_gid, pub_gid, 16); } +void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable) +{ + std::lock_guard lock(condition_mutex_); + condition_ = condition_variable; +} + +void rmw_subscription_data_t::notify() +{ + std::lock_guard lock(condition_mutex_); + if (condition_ != nullptr) { + condition_->notify_one(); + } +} + +void rmw_subscription_data_t::detach_condition() +{ + std::lock_guard lock(condition_mutex_); + condition_ = nullptr; +} + +bool rmw_subscription_data_t::message_queue_is_empty() const +{ + std::lock_guard lock(message_queue_mutex_); + return message_queue_.empty(); +} + +std::unique_ptr rmw_subscription_data_t::pop_next_message() +{ + std::lock_guard lock(message_queue_mutex_); + + if (message_queue_.empty()) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return nullptr; + } + + std::unique_ptr msg_data = std::move(message_queue_.front()); + message_queue_.pop_front(); + + return msg_data; +} + +void rmw_subscription_data_t::add_new_message( + std::unique_ptr msg, const std::string & topic_name) +{ + std::lock_guard lock(message_queue_mutex_); + + if (message_queue_.size() >= queue_depth) { + // Log warning if message is discarded due to hitting the queue depth + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Message queue depth of %ld reached, discarding oldest message " + "for subscription for %s", + queue_depth, + topic_name.c_str()); + + std::unique_ptr old = std::move(message_queue_.front()); + z_drop(z_move(old->payload)); + message_queue_.pop_front(); + } + + message_queue_.emplace_back(std::move(msg)); + + // Since we added new data, trigger the guard condition if it is available + notify(); +} + +bool rmw_service_data_t::query_queue_is_empty() const +{ + std::lock_guard lock(query_queue_mutex_); + return query_queue_.empty(); +} + +void rmw_service_data_t::attach_condition(std::condition_variable * condition_variable) +{ + std::lock_guard lock(condition_mutex_); + condition_ = condition_variable; +} + +void rmw_service_data_t::detach_condition() +{ + std::lock_guard lock(condition_mutex_); + condition_ = nullptr; +} + +std::unique_ptr rmw_service_data_t::pop_next_query() +{ + std::lock_guard lock(query_queue_mutex_); + if (query_queue_.empty()) { + return nullptr; + } + + std::unique_ptr query = std::move(query_queue_.front()); + query_queue_.pop_front(); + + return query; +} + +void rmw_service_data_t::notify() +{ + std::lock_guard lock(condition_mutex_); + if (condition_ != nullptr) { + condition_->notify_one(); + } +} + +void rmw_service_data_t::add_new_query(std::unique_ptr query) +{ + std::lock_guard lock(query_queue_mutex_); + query_queue_.emplace_back(std::move(query)); + + // Since we added new data, trigger the guard condition if it is available + notify(); +} + +bool rmw_service_data_t::add_to_query_map( + int64_t sequence_number, std::unique_ptr query) +{ + std::lock_guard lock(sequence_to_query_map_mutex_); + if (sequence_to_query_map_.find(sequence_number) != sequence_to_query_map_.end()) { + return false; + } + sequence_to_query_map_.emplace( + std::pair(sequence_number, std::move(query))); + + return true; +} + +std::unique_ptr rmw_service_data_t::take_from_query_map(int64_t sequence_number) +{ + std::lock_guard lock(sequence_to_query_map_mutex_); + auto query_it = sequence_to_query_map_.find(sequence_number); + if (query_it == sequence_to_query_map_.end()) { + return nullptr; + } + + std::unique_ptr query = std::move(query_it->second); + sequence_to_query_map_.erase(query_it); + + return query; +} + +void rmw_client_data_t::notify() +{ + std::lock_guard lock(condition_mutex_); + if (condition_ != nullptr) { + condition_->notify_one(); + } +} + +void rmw_client_data_t::add_new_reply(std::unique_ptr reply) +{ + std::lock_guard lock(reply_queue_mutex_); + reply_queue_.emplace_back(std::move(reply)); + + notify(); +} + +bool rmw_client_data_t::reply_queue_is_empty() const +{ + std::lock_guard lock(reply_queue_mutex_); + + return reply_queue_.empty(); +} + +void rmw_client_data_t::attach_condition(std::condition_variable * condition_variable) +{ + std::lock_guard lock(condition_mutex_); + condition_ = condition_variable; +} + +void rmw_client_data_t::detach_condition() +{ + std::lock_guard lock(condition_mutex_); + condition_ = nullptr; +} + +std::unique_ptr rmw_client_data_t::pop_next_reply() +{ + std::lock_guard lock(reply_queue_mutex_); + + if (reply_queue_.empty()) { + return nullptr; + } + + std::unique_ptr latest_reply = std::move(reply_queue_.front()); + reply_queue_.pop_front(); + + return latest_reply; +} + //============================================================================== void sub_data_handler( const z_sample_t * sample, @@ -53,34 +243,10 @@ void sub_data_handler( return; } - { - std::lock_guard lock(sub_data->message_queue_mutex); - - if (sub_data->message_queue.size() >= sub_data->queue_depth) { - // Log warning if message is discarded due to hitting the queue depth - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Message queue depth of %ld reached, discarding oldest message " - "for subscription for %s", - sub_data->queue_depth, - z_loan(keystr)); - - std::unique_ptr old = std::move(sub_data->message_queue.front()); - z_drop(&old->payload); - sub_data->message_queue.pop_front(); - } - - sub_data->message_queue.emplace_back( - std::make_unique( - zc_sample_payload_rcinc(sample), - sample->timestamp.time, sample->timestamp.id.id)); - - // Since we added new data, trigger the guard condition if it is available - std::lock_guard internal_lock(sub_data->internal_mutex); - if (sub_data->condition != nullptr) { - sub_data->condition->notify_one(); - } - } + sub_data->add_new_message( + std::make_unique( + zc_sample_payload_rcinc(sample), + sample->timestamp.time, sample->timestamp.id.id), z_loan(keystr)); } ZenohQuery::ZenohQuery(const z_query_t * query) @@ -118,18 +284,7 @@ void service_data_handler(const z_query_t * query, void * data) return; } - // Get the query parameters and payload - { - std::lock_guard lock(service_data->query_queue_mutex); - service_data->query_queue.emplace_back(std::make_unique(query)); - } - { - // Since we added new data, trigger the guard condition if it is available - std::lock_guard internal_lock(service_data->internal_mutex); - if (service_data->condition != nullptr) { - service_data->condition->notify_one(); - } - } + service_data->add_new_query(std::make_unique(query)); } ZenohReply::ZenohReply(const z_owned_reply_t * reply) @@ -182,16 +337,8 @@ void client_data_handler(z_owned_reply_t * reply, void * data) ); return; } - { - std::lock_guard msg_lock(client_data->replies_mutex); - // Take ownership of the reply. - client_data->replies.emplace_back(std::make_unique(reply)); - *reply = z_reply_null(); - } - { - std::lock_guard internal_lock(client_data->internal_mutex); - if (client_data->condition != nullptr) { - client_data->condition->notify_one(); - } - } + + client_data->add_new_reply(std::make_unique(reply)); + // Since we took ownership of the reply, null it out here + *reply = z_reply_null(); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index aa18cf06..133dd876 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -109,8 +109,9 @@ struct saved_msg_data }; ///============================================================================== -struct rmw_subscription_data_t +class rmw_subscription_data_t final { +public: z_owned_subscriber_t sub; // Liveliness token for the subscription. @@ -121,14 +122,27 @@ struct rmw_subscription_data_t MessageTypeSupport * type_support; rmw_context_t * context; - std::deque> message_queue; - std::mutex message_queue_mutex; - size_t queue_depth; bool reliable; - std::mutex internal_mutex; - std::condition_variable * condition{nullptr}; + void attach_condition(std::condition_variable * condition_variable); + + void detach_condition(); + + bool message_queue_is_empty() const; + + std::unique_ptr pop_next_message(); + + void add_new_message(std::unique_ptr msg, const std::string & topic_name); + +private: + std::deque> message_queue_; + mutable std::mutex message_queue_mutex_; + + void notify(); + + std::condition_variable * condition_{nullptr}; + std::mutex condition_mutex_; }; @@ -153,8 +167,9 @@ class ZenohQuery final z_owned_query_t query_; }; -struct rmw_service_data_t +class rmw_service_data_t final { +public: z_owned_keyexpr_t keyexpr; z_owned_queryable_t qable; @@ -169,16 +184,33 @@ struct rmw_service_data_t rmw_context_t * context; + bool query_queue_is_empty() const; + + void attach_condition(std::condition_variable * condition_variable); + + void detach_condition(); + + std::unique_ptr pop_next_query(); + + void add_new_query(std::unique_ptr query); + + bool add_to_query_map(int64_t sequence_number, std::unique_ptr query); + + std::unique_ptr take_from_query_map(int64_t sequence_number); + +private: + void notify(); + // Deque to store the queries in the order they arrive. - std::deque> query_queue; - std::mutex query_queue_mutex; + std::deque> query_queue_; + mutable std::mutex query_queue_mutex_; // Map to store the sequence_number -> query_id - std::unordered_map> sequence_to_query_map; - std::mutex sequence_to_query_map_mutex; + std::unordered_map> sequence_to_query_map_; + std::mutex sequence_to_query_map_mutex_; - std::mutex internal_mutex; - std::condition_variable * condition{nullptr}; + std::condition_variable * condition_{nullptr}; + std::mutex condition_mutex_; }; ///============================================================================== @@ -196,17 +228,15 @@ class ZenohReply final z_owned_reply_t reply_; }; -struct rmw_client_data_t +class rmw_client_data_t final { +public: z_owned_keyexpr_t keyexpr; z_owned_closure_reply_t zn_closure_reply; // Liveliness token for the client. zc_owned_liveliness_token_t token; - std::mutex replies_mutex; - std::deque> replies; - const void * request_type_support_impl; const void * response_type_support_impl; const char * typesupport_identifier; @@ -215,14 +245,31 @@ struct rmw_client_data_t rmw_context_t * context; - std::mutex internal_mutex; - std::condition_variable * condition{nullptr}; - uint8_t client_guid[RMW_GID_STORAGE_SIZE]; size_t get_next_sequence_number(); - std::mutex sequence_number_mutex; + + void add_new_reply(std::unique_ptr reply); + + bool reply_queue_is_empty() const; + + void attach_condition(std::condition_variable * condition_variable); + + void detach_condition(); + + std::unique_ptr pop_next_reply(); + +private: + void notify(); + size_t sequence_number{1}; + std::mutex sequence_number_mutex; + + std::condition_variable * condition_{nullptr}; + std::mutex condition_mutex_; + + std::deque> reply_queue_; + mutable std::mutex reply_queue_mutex_; }; #endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 0e1d53c4..c510db3e 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1501,17 +1501,10 @@ static rmw_ret_t __rmw_take( // RETRIEVE SERIALIZED MESSAGE =============================================== - std::unique_ptr msg_data; - { - std::lock_guard lock(sub_data->message_queue_mutex); - - if (sub_data->message_queue.empty()) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. - return RMW_RET_OK; - } - - msg_data = std::move(sub_data->message_queue.front()); - sub_data->message_queue.pop_front(); + std::unique_ptr msg_data = sub_data->pop_next_message(); + if (msg_data == nullptr) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; } // Object that manages the raw buffer @@ -1969,7 +1962,6 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) // CLEANUP =================================================================== z_drop(z_move(client_data->zn_closure_reply)); z_drop(z_move(client_data->keyexpr)); - client_data->replies.clear(); z_drop(z_move(client_data->token)); RMW_TRY_DESTRUCTOR( @@ -2241,16 +2233,12 @@ rmw_take_response( RMW_CHECK_FOR_NULL_WITH_MSG( client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); - std::unique_ptr latest_reply = nullptr; - { - std::lock_guard lock(client_data->replies_mutex); - if (client_data->replies.empty()) { - RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); - return RMW_RET_ERROR; - } - latest_reply = std::move(client_data->replies.front()); - client_data->replies.pop_front(); + std::unique_ptr latest_reply = client_data->pop_next_reply(); + if (latest_reply == nullptr) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_ERROR; } + std::optional sample = latest_reply->get_sample(); if (!sample) { RMW_SET_ERROR_MSG("invalid reply sample"); @@ -2625,8 +2613,6 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) // CLEANUP ================================================================ z_drop(z_move(service_data->keyexpr)); z_drop(z_move(service_data->qable)); - service_data->sequence_to_query_map.clear(); - service_data->query_queue.clear(); z_drop(z_move(service_data->token)); RMW_TRY_DESTRUCTOR( @@ -2675,14 +2661,10 @@ rmw_take_request( RMW_CHECK_FOR_NULL_WITH_MSG( service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); - std::unique_ptr query = nullptr; - { - std::lock_guard lock(service_data->query_queue_mutex); - if (service_data->query_queue.empty()) { - return RMW_RET_OK; - } - query = std::move(service_data->query_queue.front()); - service_data->query_queue.pop_front(); + std::unique_ptr query = service_data->pop_next_query(); + if (query == nullptr) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; } const z_query_t loaned_query = query->get_query(); @@ -2737,16 +2719,11 @@ rmw_take_request( request_header->received_timestamp = now_ns.count(); // Add this query to the map, so that rmw_send_response can quickly look it up later + if (!service_data->add_to_query_map( + request_header->request_id.sequence_number, std::move(query))) { - std::lock_guard lock(service_data->sequence_to_query_map_mutex); - if (service_data->sequence_to_query_map.find(request_header->request_id.sequence_number) != - service_data->sequence_to_query_map.end()) - { - RMW_SET_ERROR_MSG("duplicate sequence number in the map"); - return RMW_RET_ERROR; - } - service_data->sequence_to_query_map.emplace( - std::pair(request_header->request_id.sequence_number, std::move(query))); + RMW_SET_ERROR_MSG("duplicate sequence number in the map"); + return RMW_RET_ERROR; } *taken = true; @@ -2818,13 +2795,14 @@ rmw_send_response( size_t data_length = ser.getSerializedDataLength(); // Create the queryable payload - std::lock_guard lock(service_data->sequence_to_query_map_mutex); - auto query_it = service_data->sequence_to_query_map.find(request_header->sequence_number); - if (query_it == service_data->sequence_to_query_map.end()) { + std::unique_ptr query = + service_data->take_from_query_map(request_header->sequence_number); + if (query == nullptr) { RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug."); return RMW_RET_ERROR; } - const z_query_t loaned_query = query_it->second->get_query(); + + const z_query_t loaned_query = query->get_query(); z_query_reply_options_t options = z_query_reply_options_default(); z_owned_bytes_map_t map = create_map_and_set_sequence_num( @@ -2845,7 +2823,6 @@ rmw_send_response( &loaned_query, z_loan(service_data->keyexpr), reinterpret_cast( response_bytes), data_length, &options); - service_data->sequence_to_query_map.erase(query_it); return RMW_RET_OK; } @@ -3049,8 +3026,10 @@ static bool has_triggered_condition( if (guard_conditions) { for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { GuardCondition * gc = static_cast(guard_conditions->guard_conditions[i]); - if (gc != nullptr && gc->has_triggered()) { - return true; + if (gc != nullptr) { + if (gc->get_and_reset_trigger()) { + return true; + } } } } @@ -3061,8 +3040,7 @@ static bool has_triggered_condition( for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); if (sub_data != nullptr) { - std::lock_guard internal_lock(sub_data->internal_mutex); - if (!sub_data->message_queue.empty()) { + if (!sub_data->message_queue_is_empty()) { return true; } } @@ -3073,8 +3051,7 @@ static bool has_triggered_condition( for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { - std::lock_guard internal_lock(serv_data->internal_mutex); - if (!serv_data->query_queue.empty()) { + if (!serv_data->query_queue_is_empty()) { return true; } } @@ -3085,8 +3062,7 @@ static bool has_triggered_condition( for (size_t i = 0; i < clients->client_count; ++i) { rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { - std::lock_guard internal_lock(client_data->internal_mutex); - if (!client_data->replies.empty()) { + if (!client_data->reply_queue_is_empty()) { return true; } } @@ -3161,8 +3137,7 @@ rmw_wait( for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); if (sub_data != nullptr) { - std::lock_guard internal_lock(sub_data->internal_mutex); - sub_data->condition = &wait_set_data->condition_variable; + sub_data->attach_condition(&wait_set_data->condition_variable); } } } @@ -3173,8 +3148,7 @@ rmw_wait( for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { - std::lock_guard internal_lock(serv_data->internal_mutex); - serv_data->condition = &wait_set_data->condition_variable; + serv_data->attach_condition(&wait_set_data->condition_variable); } } } @@ -3185,8 +3159,7 @@ rmw_wait( for (size_t i = 0; i < clients->client_count; ++i) { rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { - std::lock_guard internal_lock(client_data->internal_mutex); - client_data->condition = &wait_set_data->condition_variable; + client_data->attach_condition(&wait_set_data->condition_variable); } } } @@ -3215,7 +3188,7 @@ rmw_wait( gc->detach_condition(); // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL - if (!gc->has_triggered()) { + if (!gc->get_and_reset_trigger()) { guard_conditions->guard_conditions[i] = nullptr; } } @@ -3227,11 +3200,10 @@ rmw_wait( for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); if (sub_data != nullptr) { - std::lock_guard internal_lock(sub_data->internal_mutex); - sub_data->condition = nullptr; + sub_data->detach_condition(); // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL - if (sub_data->message_queue.empty()) { + if (sub_data->message_queue_is_empty()) { // Setting to nullptr lets rcl know that this subscription is not ready subscriptions->subscribers[i] = nullptr; } @@ -3244,9 +3216,10 @@ rmw_wait( for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { - std::lock_guard internal_lock(serv_data->internal_mutex); - serv_data->condition = nullptr; - if (serv_data->query_queue.empty()) { + serv_data->detach_condition(); + // According to the documentation for rmw_wait in rmw.h, entries in the + // array that have *not* been triggered should be set to NULL + if (serv_data->query_queue_is_empty()) { // Setting to nullptr lets rcl know that this service is not ready services->services[i] = nullptr; } @@ -3259,11 +3232,10 @@ rmw_wait( for (size_t i = 0; i < clients->client_count; ++i) { rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { - std::lock_guard internal_lock(client_data->internal_mutex); - client_data->condition = nullptr; + client_data->detach_condition(); // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL - if (client_data->replies.empty()) { + if (client_data->reply_queue_is_empty()) { // Setting to nullptr lets rcl know that this client is not ready clients->clients[i] = nullptr; }