Skip to content

Commit

Permalink
Move more of the implementation into rmw_service_data_t.
Browse files Browse the repository at this point in the history
This just allows us to hide more of the implementation and
remove it from rmw_zenoh.cpp.  It also fixes some of the
locking, which wasn't correct previously.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Jan 23, 2024
1 parent d1c9af3 commit 976cf80
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 50 deletions.
88 changes: 76 additions & 12 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,81 @@ void rmw_subscription_data_t::add_new_message(
notify();
}

bool rmw_service_data_t::query_queue_is_empty() const
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
return query_queue_.empty();
}

void rmw_service_data_t::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = condition_variable;
}

void rmw_service_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
}

std::unique_ptr<ZenohQuery> rmw_service_data_t::pop_next_query()
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
if (query_queue_.empty()) {
return nullptr;
}

std::unique_ptr<ZenohQuery> query = std::move(query_queue_.front());
query_queue_.pop_front();

return query;
}

void rmw_service_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
condition_->notify_one();
}
}

void rmw_service_data_t::add_new_query(std::unique_ptr<ZenohQuery> query)
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
query_queue_.emplace_back(std::move(query));

// Since we added new data, trigger the guard condition if it is available
notify();
}

bool rmw_service_data_t::add_to_query_map(
int64_t sequence_number, std::unique_ptr<ZenohQuery> query)
{
std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);
if (sequence_to_query_map_.find(sequence_number) != sequence_to_query_map_.end()) {
return false;
}
sequence_to_query_map_.emplace(
std::pair(sequence_number, std::move(query)));

return true;
}

std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(int64_t sequence_number)
{
std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);
auto query_it = sequence_to_query_map_.find(sequence_number);
if (query_it == sequence_to_query_map_.end()) {
return nullptr;
}

std::unique_ptr<ZenohQuery> query = std::move(query_it->second);
sequence_to_query_map_.erase(query_it);

return query;
}

//==============================================================================
void sub_data_handler(
const z_sample_t * sample,
Expand Down Expand Up @@ -160,18 +235,7 @@ void service_data_handler(const z_query_t * query, void * data)
return;
}

// Get the query parameters and payload
{
std::lock_guard<std::mutex> lock(service_data->query_queue_mutex);
service_data->query_queue.emplace_back(std::make_unique<ZenohQuery>(query));
}
{
// Since we added new data, trigger the guard condition if it is available
std::lock_guard<std::mutex> internal_lock(service_data->internal_mutex);
if (service_data->condition != nullptr) {
service_data->condition->notify_one();
}
}
service_data->add_new_query(std::make_unique<ZenohQuery>(query));
}

ZenohReply::ZenohReply(const z_owned_reply_t * reply)
Expand Down
32 changes: 25 additions & 7 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ class ZenohQuery final
z_owned_query_t query_;
};

struct rmw_service_data_t
class rmw_service_data_t final
{
public:
z_owned_keyexpr_t keyexpr;
z_owned_queryable_t qable;

Expand All @@ -183,16 +184,33 @@ struct rmw_service_data_t

rmw_context_t * context;

bool query_queue_is_empty() const;

void attach_condition(std::condition_variable * condition_variable);

void detach_condition();

std::unique_ptr<ZenohQuery> pop_next_query();

void add_new_query(std::unique_ptr<ZenohQuery> query);

bool add_to_query_map(int64_t sequence_number, std::unique_ptr<ZenohQuery> query);

std::unique_ptr<ZenohQuery> take_from_query_map(int64_t sequence_number);

private:
void notify();

// Deque to store the queries in the order they arrive.
std::deque<std::unique_ptr<ZenohQuery>> query_queue;
std::mutex query_queue_mutex;
std::deque<std::unique_ptr<ZenohQuery>> query_queue_;
mutable std::mutex query_queue_mutex_;

// Map to store the sequence_number -> query_id
std::unordered_map<int64_t, std::unique_ptr<ZenohQuery>> sequence_to_query_map;
std::mutex sequence_to_query_map_mutex;
std::unordered_map<int64_t, std::unique_ptr<ZenohQuery>> sequence_to_query_map_;
std::mutex sequence_to_query_map_mutex_;

std::mutex internal_mutex;
std::condition_variable * condition{nullptr};
std::condition_variable * condition_{nullptr};
std::mutex condition_mutex_;
};

///==============================================================================
Expand Down
50 changes: 19 additions & 31 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2618,8 +2618,6 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service)
// CLEANUP ================================================================
z_drop(z_move(service_data->keyexpr));
z_drop(z_move(service_data->qable));
service_data->sequence_to_query_map.clear();
service_data->query_queue.clear();
z_drop(z_move(service_data->token));

RMW_TRY_DESTRUCTOR(
Expand Down Expand Up @@ -2668,14 +2666,10 @@ rmw_take_request(
RMW_CHECK_FOR_NULL_WITH_MSG(
service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT);

std::unique_ptr<ZenohQuery> query = nullptr;
{
std::lock_guard<std::mutex> lock(service_data->query_queue_mutex);
if (service_data->query_queue.empty()) {
return RMW_RET_OK;
}
query = std::move(service_data->query_queue.front());
service_data->query_queue.pop_front();
std::unique_ptr<ZenohQuery> query = service_data->pop_next_query();
if (query == nullptr) {
// This tells rcl that the check for a new message was done, but no messages have come in yet.
return RMW_RET_OK;
}

const z_query_t loaned_query = query->get_query();
Expand Down Expand Up @@ -2730,16 +2724,11 @@ rmw_take_request(
request_header->received_timestamp = now_ns.count();

// Add this query to the map, so that rmw_send_response can quickly look it up later
if (!service_data->add_to_query_map(
request_header->request_id.sequence_number, std::move(query)))
{
std::lock_guard<std::mutex> lock(service_data->sequence_to_query_map_mutex);
if (service_data->sequence_to_query_map.find(request_header->request_id.sequence_number) !=
service_data->sequence_to_query_map.end())
{
RMW_SET_ERROR_MSG("duplicate sequence number in the map");
return RMW_RET_ERROR;
}
service_data->sequence_to_query_map.emplace(
std::pair(request_header->request_id.sequence_number, std::move(query)));
RMW_SET_ERROR_MSG("duplicate sequence number in the map");
return RMW_RET_ERROR;
}

*taken = true;
Expand Down Expand Up @@ -2811,13 +2800,14 @@ rmw_send_response(
size_t data_length = ser.getSerializedDataLength();

// Create the queryable payload
std::lock_guard<std::mutex> lock(service_data->sequence_to_query_map_mutex);
auto query_it = service_data->sequence_to_query_map.find(request_header->sequence_number);
if (query_it == service_data->sequence_to_query_map.end()) {
std::unique_ptr<ZenohQuery> query =
service_data->take_from_query_map(request_header->sequence_number);
if (query == nullptr) {
RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug.");
return RMW_RET_ERROR;
}
const z_query_t loaned_query = query_it->second->get_query();

const z_query_t loaned_query = query->get_query();
z_query_reply_options_t options = z_query_reply_options_default();

z_owned_bytes_map_t map = create_map_and_set_sequence_num(
Expand All @@ -2838,7 +2828,6 @@ rmw_send_response(
&loaned_query, z_loan(service_data->keyexpr), reinterpret_cast<const uint8_t *>(
response_bytes), data_length, &options);

service_data->sequence_to_query_map.erase(query_it);
return RMW_RET_OK;
}

Expand Down Expand Up @@ -3065,8 +3054,7 @@ static bool has_triggered_condition(
for (size_t i = 0; i < services->service_count; ++i) {
auto serv_data = static_cast<rmw_service_data_t *>(services->services[i]);
if (serv_data != nullptr) {
std::lock_guard<std::mutex> internal_lock(serv_data->internal_mutex);
if (!serv_data->query_queue.empty()) {
if (!serv_data->query_queue_is_empty()) {
return true;
}
}
Expand Down Expand Up @@ -3164,8 +3152,7 @@ rmw_wait(
for (size_t i = 0; i < services->service_count; ++i) {
auto serv_data = static_cast<rmw_service_data_t *>(services->services[i]);
if (serv_data != nullptr) {
std::lock_guard<std::mutex> internal_lock(serv_data->internal_mutex);
serv_data->condition = &wait_set_data->condition_variable;
serv_data->attach_condition(&wait_set_data->condition_variable);
}
}
}
Expand Down Expand Up @@ -3234,9 +3221,10 @@ rmw_wait(
for (size_t i = 0; i < services->service_count; ++i) {
auto serv_data = static_cast<rmw_service_data_t *>(services->services[i]);
if (serv_data != nullptr) {
std::lock_guard<std::mutex> internal_lock(serv_data->internal_mutex);
serv_data->condition = nullptr;
if (serv_data->query_queue.empty()) {
serv_data->detach_condition();
// According to the documentation for rmw_wait in rmw.h, entries in the
// array that have *not* been triggered should be set to NULL
if (serv_data->query_queue_is_empty()) {
// Setting to nullptr lets rcl know that this service is not ready
services->services[i] = nullptr;
}
Expand Down

0 comments on commit 976cf80

Please sign in to comment.