Move more of the implementation into rmw_client_data_t.
This allows us to encapsulate more of the client data in
the right place, and also to fix the locking.

Signed-off-by: Chris Lalancette <[email protected]>
clalancette committed Jan 23, 2024
1 parent 976cf80 commit 2d1e333
Showing 3 changed files with 85 additions and 37 deletions.
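
As a reading aid before the per-file diffs: the "fix the locking" part of the message refers to splitting the client's synchronization state across two dedicated mutexes, one per piece of data, instead of exposing the mutexes and queue to callers. A condensed, non-standalone sketch of that layout (member names match the diff below; everything else is omitted):

#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>

class ZenohReply;  // declared in rmw_data_types.hpp

// Condensed from the rmw_data_types.hpp diff below -- synchronization members only.
class rmw_client_data_t final
{
private:
  std::condition_variable * condition_{nullptr};  // wait-set CV; guarded by condition_mutex_ only
  std::mutex condition_mutex_;

  std::deque<std::unique_ptr<ZenohReply>> reply_queue_;  // pending replies; guarded by reply_queue_mutex_ only
  mutable std::mutex reply_queue_mutex_;
};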
65 changes: 53 additions & 12 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
@@ -172,6 +172,55 @@ std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(int64_t sequ
return query;
}

void rmw_client_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
condition_->notify_one();
}
}

void rmw_client_data_t::add_new_reply(std::unique_ptr<ZenohReply> reply)
{
std::lock_guard<std::mutex> lock(reply_queue_mutex_);
reply_queue_.emplace_back(std::move(reply));

notify();
}

bool rmw_client_data_t::reply_queue_is_empty() const
{
std::lock_guard<std::mutex> lock(reply_queue_mutex_);

return reply_queue_.empty();
}

void rmw_client_data_t::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = condition_variable;
}

void rmw_client_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
}

std::unique_ptr<ZenohReply> rmw_client_data_t::pop_next_reply()
{
std::lock_guard<std::mutex> lock(reply_queue_mutex_);

if (reply_queue_.empty()) {
return nullptr;
}

std::unique_ptr<ZenohReply> latest_reply = std::move(reply_queue_.front());
reply_queue_.pop_front();

return latest_reply;
}

//==============================================================================
void sub_data_handler(
const z_sample_t * sample,
@@ -288,16 +337,8 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
);
return;
}
{
std::lock_guard<std::mutex> msg_lock(client_data->replies_mutex);
// Take ownership of the reply.
client_data->replies.emplace_back(std::make_unique<ZenohReply>(reply));
*reply = z_reply_null();
}
{
std::lock_guard<std::mutex> internal_lock(client_data->internal_mutex);
if (client_data->condition != nullptr) {
client_data->condition->notify_one();
}
}

client_data->add_new_reply(std::make_unique<ZenohReply>(reply));
// Since we took ownership of the reply, null it out here
*reply = z_reply_null();
}
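
The handler above relies on ZenohReply (declared in rmw_data_types.hpp, partially visible further down) taking over the z_owned_reply_t, which is why the caller's copy is nulled immediately afterwards: when zenoh drops its reply after the callback returns, the drop is a no-op on the nulled handle. The wrapper's definition is not part of this diff, so the sketch below is only an assumption about its shape, inferred from how the handler uses it.

#include <zenoh.h>  // zenoh-c

// Sketch only -- the real ZenohReply lives in rmw_data_types.hpp and is not
// shown in this commit; the constructor/destructor bodies here are assumptions.
class ZenohReplySketch final
{
public:
  explicit ZenohReplySketch(const z_owned_reply_t * reply)
  : reply_(*reply)  // copy the owned handle; the caller nulls its own copy
  {
  }

  ~ZenohReplySketch()
  {
    z_reply_drop(z_move(reply_));  // release the reply exactly once
  }

private:
  z_owned_reply_t reply_;
};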
31 changes: 23 additions & 8 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
@@ -228,17 +228,15 @@ class ZenohReply final
z_owned_reply_t reply_;
};

struct rmw_client_data_t
class rmw_client_data_t final
{
public:
z_owned_keyexpr_t keyexpr;
z_owned_closure_reply_t zn_closure_reply;

// Liveliness token for the client.
zc_owned_liveliness_token_t token;

std::mutex replies_mutex;
std::deque<std::unique_ptr<ZenohReply>> replies;

const void * request_type_support_impl;
const void * response_type_support_impl;
const char * typesupport_identifier;
@@ -247,14 +245,31 @@ struct rmw_client_data_t

rmw_context_t * context;

std::mutex internal_mutex;
std::condition_variable * condition{nullptr};

uint8_t client_guid[RMW_GID_STORAGE_SIZE];

size_t get_next_sequence_number();
std::mutex sequence_number_mutex;

void add_new_reply(std::unique_ptr<ZenohReply> reply);

bool reply_queue_is_empty() const;

void attach_condition(std::condition_variable * condition_variable);

void detach_condition();

std::unique_ptr<ZenohReply> pop_next_reply();

private:
void notify();

size_t sequence_number{1};
std::mutex sequence_number_mutex;

std::condition_variable * condition_{nullptr};
std::mutex condition_mutex_;

std::deque<std::unique_ptr<ZenohReply>> reply_queue_;
mutable std::mutex reply_queue_mutex_;
};

#endif // DETAIL__RMW_DATA_TYPES_HPP_
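
One more note on the header: get_next_sequence_number() stays public while its state (sequence_number and sequence_number_mutex) moves to private. Its implementation is not touched by this commit; given the members shown, it presumably reduces to something like the following sketch.

// Assumed shape of get_next_sequence_number(); not part of this diff.
size_t rmw_client_data_t::get_next_sequence_number()
{
  std::lock_guard<std::mutex> lock(sequence_number_mutex);
  return sequence_number++;
}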
26 changes: 9 additions & 17 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
@@ -1962,7 +1962,6 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
// CLEANUP ===================================================================
z_drop(z_move(client_data->zn_closure_reply));
z_drop(z_move(client_data->keyexpr));
client_data->replies.clear();
z_drop(z_move(client_data->token));

RMW_TRY_DESTRUCTOR(
@@ -2234,16 +2233,12 @@ rmw_take_response(
RMW_CHECK_FOR_NULL_WITH_MSG(
client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT);

std::unique_ptr<ZenohReply> latest_reply = nullptr;
{
std::lock_guard<std::mutex> lock(client_data->replies_mutex);
if (client_data->replies.empty()) {
RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty");
return RMW_RET_ERROR;
}
latest_reply = std::move(client_data->replies.front());
client_data->replies.pop_front();
std::unique_ptr<ZenohReply> latest_reply = client_data->pop_next_reply();
if (latest_reply == nullptr) {
// This tells rcl that the check for a new message was done, but no messages have come in yet.
return RMW_RET_ERROR;
}

std::optional<z_sample_t> sample = latest_reply->get_sample();
if (!sample) {
RMW_SET_ERROR_MSG("invalid reply sample");
@@ -3065,8 +3060,7 @@ static bool has_triggered_condition(
for (size_t i = 0; i < clients->client_count; ++i) {
rmw_client_data_t * client_data = static_cast<rmw_client_data_t *>(clients->clients[i]);
if (client_data != nullptr) {
std::lock_guard<std::mutex> internal_lock(client_data->internal_mutex);
if (!client_data->replies.empty()) {
if (!client_data->reply_queue_is_empty()) {
return true;
}
}
@@ -3163,8 +3157,7 @@ rmw_wait(
for (size_t i = 0; i < clients->client_count; ++i) {
rmw_client_data_t * client_data = static_cast<rmw_client_data_t *>(clients->clients[i]);
if (client_data != nullptr) {
std::lock_guard<std::mutex> internal_lock(client_data->internal_mutex);
client_data->condition = &wait_set_data->condition_variable;
client_data->attach_condition(&wait_set_data->condition_variable);
}
}
}
@@ -3237,11 +3230,10 @@ rmw_wait(
for (size_t i = 0; i < clients->client_count; ++i) {
rmw_client_data_t * client_data = static_cast<rmw_client_data_t *>(clients->clients[i]);
if (client_data != nullptr) {
std::lock_guard<std::mutex> internal_lock(client_data->internal_mutex);
client_data->condition = nullptr;
client_data->detach_condition();
// According to the documentation for rmw_wait in rmw.h, entries in the
// array that have *not* been triggered should be set to NULL
if (client_data->replies.empty()) {
if (client_data->reply_queue_is_empty()) {
// Setting to nullptr lets rcl know that this client is not ready
clients->clients[i] = nullptr;
}
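
Taken together, the waiting side now goes only through the public rmw_client_data_t API. The sketch below condenses the rmw_wait / rmw_take_response flow from the hunks above into the client-facing calls; the function name wait_for_reply_sketch is hypothetical, cv and cv_mutex stand in for the wait set's own condition variable and mutex, and timeouts, error handling, and the other entity types are omitted.

// Illustrative flow only -- condensed from the rmw_wait / rmw_take_response changes above.
void wait_for_reply_sketch(rmw_client_data_t * client_data)
{
  std::condition_variable cv;  // stand-in for wait_set_data's condition variable
  std::mutex cv_mutex;         // stand-in for the wait set's mutex

  client_data->attach_condition(&cv);
  {
    std::unique_lock<std::mutex> lock(cv_mutex);
    cv.wait(lock, [client_data]() {return !client_data->reply_queue_is_empty();});
  }
  client_data->detach_condition();

  std::unique_ptr<ZenohReply> latest_reply = client_data->pop_next_reply();
  if (latest_reply == nullptr) {
    // Nothing arrived; rmw_take_response reports this back to rcl.
    return;
  }
  // Deserialize latest_reply->get_sample() into the response message.
}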
