diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
index a4ef87a4..0ee14199 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
@@ -172,6 +172,55 @@ std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(int64_t sequ
   return query;
 }
 
+void rmw_client_data_t::notify()
+{
+  std::lock_guard<std::mutex> lock(condition_mutex_);
+  if (condition_ != nullptr) {
+    condition_->notify_one();
+  }
+}
+
+void rmw_client_data_t::add_new_reply(std::unique_ptr<ZenohReply> reply)
+{
+  std::lock_guard<std::mutex> lock(reply_queue_mutex_);
+  reply_queue_.emplace_back(std::move(reply));
+
+  notify();
+}
+
+bool rmw_client_data_t::reply_queue_is_empty() const
+{
+  std::lock_guard<std::mutex> lock(reply_queue_mutex_);
+
+  return reply_queue_.empty();
+}
+
+void rmw_client_data_t::attach_condition(std::condition_variable * condition_variable)
+{
+  std::lock_guard<std::mutex> lock(condition_mutex_);
+  condition_ = condition_variable;
+}
+
+void rmw_client_data_t::detach_condition()
+{
+  std::lock_guard<std::mutex> lock(condition_mutex_);
+  condition_ = nullptr;
+}
+
+std::unique_ptr<ZenohReply> rmw_client_data_t::pop_next_reply()
+{
+  std::lock_guard<std::mutex> lock(reply_queue_mutex_);
+
+  if (reply_queue_.empty()) {
+    return nullptr;
+  }
+
+  std::unique_ptr<ZenohReply> latest_reply = std::move(reply_queue_.front());
+  reply_queue_.pop_front();
+
+  return latest_reply;
+}
+
 //==============================================================================
 void sub_data_handler(
   const z_sample_t * sample,
@@ -288,16 +337,8 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
     );
     return;
   }
-  {
-    std::lock_guard<std::mutex> msg_lock(client_data->replies_mutex);
-    // Take ownership of the reply.
-    client_data->replies.emplace_back(std::make_unique<ZenohReply>(reply));
-    *reply = z_reply_null();
-  }
-  {
-    std::lock_guard<std::mutex> internal_lock(client_data->internal_mutex);
-    if (client_data->condition != nullptr) {
-      client_data->condition->notify_one();
-    }
-  }
+
+  client_data->add_new_reply(std::make_unique<ZenohReply>(reply));
+  // Since we took ownership of the reply, null it out here
+  *reply = z_reply_null();
 }
diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
index 965800e2..133dd876 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
@@ -228,17 +228,15 @@ class ZenohReply final
   z_owned_reply_t reply_;
 };
 
-struct rmw_client_data_t
+class rmw_client_data_t final
 {
+public:
   z_owned_keyexpr_t keyexpr;
   z_owned_closure_reply_t zn_closure_reply;
 
   // Liveliness token for the client.
   zc_owned_liveliness_token_t token;
 
-  std::mutex replies_mutex;
-  std::deque<std::unique_ptr<ZenohReply>> replies;
-
   const void * request_type_support_impl;
   const void * response_type_support_impl;
   const char * typesupport_identifier;
@@ -247,14 +245,31 @@ struct rmw_client_data_t
 
   rmw_context_t * context;
 
-  std::mutex internal_mutex;
-  std::condition_variable * condition{nullptr};
-
   uint8_t client_guid[RMW_GID_STORAGE_SIZE];
 
   size_t get_next_sequence_number();
-  std::mutex sequence_number_mutex;
+
+  void add_new_reply(std::unique_ptr<ZenohReply> reply);
+
+  bool reply_queue_is_empty() const;
+
+  void attach_condition(std::condition_variable * condition_variable);
+
+  void detach_condition();
+
+  std::unique_ptr<ZenohReply> pop_next_reply();
+
+private:
+  void notify();
+
   size_t sequence_number{1};
+  std::mutex sequence_number_mutex;
+
+  std::condition_variable * condition_{nullptr};
+  std::mutex condition_mutex_;
+
+  std::deque<std::unique_ptr<ZenohReply>> reply_queue_;
+  mutable std::mutex reply_queue_mutex_;
 };
 
 #endif  // DETAIL__RMW_DATA_TYPES_HPP_
diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp
index f79c0ced..304d1ae2 100644
--- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp
+++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp
@@ -1962,7 +1962,6 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
   // CLEANUP ===================================================================
   z_drop(z_move(client_data->zn_closure_reply));
   z_drop(z_move(client_data->keyexpr));
-  client_data->replies.clear();
   z_drop(z_move(client_data->token));
 
   RMW_TRY_DESTRUCTOR(
@@ -2234,16 +2233,12 @@ rmw_take_response(
   RMW_CHECK_FOR_NULL_WITH_MSG(
     client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT);
 
-  std::unique_ptr<ZenohReply> latest_reply = nullptr;
-  {
-    std::lock_guard<std::mutex> lock(client_data->replies_mutex);
-    if (client_data->replies.empty()) {
-      RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty");
-      return RMW_RET_ERROR;
-    }
-    latest_reply = std::move(client_data->replies.front());
-    client_data->replies.pop_front();
-  }
+  std::unique_ptr<ZenohReply> latest_reply = client_data->pop_next_reply();
+  if (latest_reply == nullptr) {
+    // This tells rcl that the check for a new message was done, but no messages have come in yet.
+    return RMW_RET_ERROR;
+  }
+
   std::optional<z_sample_t> sample = latest_reply->get_sample();
   if (!sample) {
     RMW_SET_ERROR_MSG("invalid reply sample");
@@ -3065,8 +3060,7 @@ static bool has_triggered_condition(
     for (size_t i = 0; i < clients->client_count; ++i) {
       rmw_client_data_t * client_data = static_cast<rmw_client_data_t *>(clients->clients[i]);
       if (client_data != nullptr) {
-        std::lock_guard<std::mutex> internal_lock(client_data->internal_mutex);
-        if (!client_data->replies.empty()) {
+        if (!client_data->reply_queue_is_empty()) {
           return true;
         }
       }
@@ -3163,8 +3157,7 @@ rmw_wait(
     for (size_t i = 0; i < clients->client_count; ++i) {
       rmw_client_data_t * client_data = static_cast<rmw_client_data_t *>(clients->clients[i]);
       if (client_data != nullptr) {
-        std::lock_guard<std::mutex> internal_lock(client_data->internal_mutex);
-        client_data->condition = &wait_set_data->condition_variable;
+        client_data->attach_condition(&wait_set_data->condition_variable);
       }
     }
   }
@@ -3237,11 +3230,10 @@ rmw_wait(
     for (size_t i = 0; i < clients->client_count; ++i) {
       rmw_client_data_t * client_data = static_cast<rmw_client_data_t *>(clients->clients[i]);
      if (client_data != nullptr) {
-        std::lock_guard<std::mutex> internal_lock(client_data->internal_mutex);
-        client_data->condition = nullptr;
+        client_data->detach_condition();
         // According to the documentation for rmw_wait in rmw.h, entries in the
         // array that have *not* been triggered should be set to NULL
-        if (client_data->replies.empty()) {
+        if (client_data->reply_queue_is_empty()) {
          // Setting to nullptr lets rcl know that this client is not ready
           clients->clients[i] = nullptr;
         }