Skip to content

Commit

Permalink
Make ServiceData thread-safe (#292)
Browse files Browse the repository at this point in the history
* Make ServiceData thread-safe

Signed-off-by: Yadunund <[email protected]>

* Feedback

Signed-off-by: Yadunund <[email protected]>

* check for shutdown

Signed-off-by: Yadunund <[email protected]>

* Remove redundant undeclare token

Signed-off-by: Yadunund <[email protected]>

* Fix docstring

Signed-off-by: Yadunund <[email protected]>

---------

Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund authored Oct 10, 2024
1 parent 439d6dc commit ca899ac
Show file tree
Hide file tree
Showing 14 changed files with 926 additions and 721 deletions.
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/rmw_data_types.cpp
src/detail/rmw_publisher_data.cpp
src/detail/rmw_node_data.cpp
src/detail/rmw_service_data.cpp
src/detail/rmw_subscription_data.cpp
src/detail/service_type_support.cpp
src/detail/type_support.cpp
Expand Down
197 changes: 0 additions & 197 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
#include <cstring>
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <utility>

#include "liveliness_utils.hpp"
Expand All @@ -35,137 +32,6 @@
///=============================================================================
namespace rmw_zenoh_cpp
{
///=============================================================================
bool rmw_service_data_t::queue_has_data_and_attach_condition_if_not(
rmw_wait_set_data_t * wait_set_data)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (!query_queue_.empty()) {
return true;
}
wait_set_data_ = wait_set_data;

return false;
}

///=============================================================================
bool rmw_service_data_t::detach_condition_and_queue_is_empty()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
wait_set_data_ = nullptr;

return query_queue_.empty();
}

///=============================================================================
std::unique_ptr<ZenohQuery> rmw_service_data_t::pop_next_query()
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
if (query_queue_.empty()) {
return nullptr;
}

std::unique_ptr<ZenohQuery> query = std::move(query_queue_.front());
query_queue_.pop_front();

return query;
}

///=============================================================================
void rmw_service_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (wait_set_data_ != nullptr) {
std::lock_guard<std::mutex> wait_set_lock(wait_set_data_->condition_mutex);
wait_set_data_->triggered = true;
wait_set_data_->condition_variable.notify_one();
}
}

///=============================================================================
void rmw_service_data_t::add_new_query(std::unique_ptr<ZenohQuery> query)
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL &&
query_queue_.size() >= adapted_qos_profile.depth)
{
// Log warning if message is discarded due to hitting the queue depth
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr));
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Query queue depth of %ld reached, discarding oldest Query "
"for service for %s",
adapted_qos_profile.depth,
z_loan(keystr));
z_drop(z_move(keystr));
query_queue_.pop_front();
}
query_queue_.emplace_back(std::move(query));

// Since we added new data, trigger user callback and guard condition if they are available
data_callback_mgr.trigger_callback();
notify();
}

///=============================================================================
bool rmw_service_data_t::add_to_query_map(
const rmw_request_id_t & request_id, std::unique_ptr<ZenohQuery> query)
{
const size_t hash = rmw_zenoh_cpp::hash_gid(request_id.writer_guid);

std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);

std::unordered_map<size_t, SequenceToQuery>::iterator it = sequence_to_query_map_.find(hash);

if (it == sequence_to_query_map_.end()) {
SequenceToQuery stq;

sequence_to_query_map_.insert(std::make_pair(hash, std::move(stq)));

it = sequence_to_query_map_.find(hash);
} else {
// Client already in the map

if (it->second.find(request_id.sequence_number) != it->second.end()) {
return false;
}
}

it->second.insert(std::make_pair(request_id.sequence_number, std::move(query)));

return true;
}

///=============================================================================
std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(
const rmw_request_id_t & request_id)
{
const size_t hash = rmw_zenoh_cpp::hash_gid(request_id.writer_guid);

std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);

std::unordered_map<size_t, SequenceToQuery>::iterator it = sequence_to_query_map_.find(hash);

if (it == sequence_to_query_map_.end()) {
return nullptr;
}

SequenceToQuery::iterator query_it = it->second.find(request_id.sequence_number);

if (query_it == it->second.end()) {
return nullptr;
}

std::unique_ptr<ZenohQuery> query = std::move(query_it->second);
it->second.erase(query_it);

if (sequence_to_query_map_[hash].size() == 0) {
sequence_to_query_map_.erase(hash);
}

return query;
}

///=============================================================================
void rmw_client_data_t::notify()
{
Expand Down Expand Up @@ -275,69 +141,6 @@ bool rmw_client_data_t::is_shutdown() const
return is_shutdown_;
}

///=============================================================================
ZenohQuery::ZenohQuery(const z_query_t * query)
{
query_ = z_query_clone(query);
}

///=============================================================================
ZenohQuery::~ZenohQuery()
{
z_drop(z_move(query_));
}

///=============================================================================
const z_query_t ZenohQuery::get_query() const
{
return z_query_loan(&query_);
}

//==============================================================================
void service_data_handler(const z_query_t * query, void * data)
{
z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query));
auto drop_keystr = rcpputils::make_scope_exit(
[&keystr]() {
z_drop(z_move(keystr));
});

rmw_service_data_t * service_data =
static_cast<rmw_service_data_t *>(data);
if (service_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain rmw_service_data_t from data for "
"service for %s",
z_loan(keystr)
);
return;
}

service_data->add_new_query(std::make_unique<ZenohQuery>(query));
}

///=============================================================================
ZenohReply::ZenohReply(const z_owned_reply_t * reply)
{
reply_ = *reply;
}

///=============================================================================
ZenohReply::~ZenohReply()
{
z_reply_drop(z_move(reply_));
}

///=============================================================================
std::optional<z_sample_t> ZenohReply::get_sample() const
{
if (z_reply_is_ok(&reply_)) {
return z_reply_ok(&reply_);
}

return std::nullopt;
}

///=============================================================================
size_t rmw_client_data_t::get_next_sequence_number()
Expand Down
93 changes: 1 addition & 92 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "rcutils/allocator.h"

Expand All @@ -34,106 +31,18 @@
#include "rosidl_runtime_c/type_hash.h"

#include "event.hpp"
#include "graph_cache.hpp"
#include "message_type_support.hpp"
#include "rmw_wait_set_data.hpp"
#include "service_type_support.hpp"
#include "zenoh_utils.hpp"

/// Structs for various type erased data fields.

namespace rmw_zenoh_cpp
{
///=============================================================================
void service_data_handler(const z_query_t * query, void * service_data);

///=============================================================================
void client_data_handler(z_owned_reply_t * reply, void * client_data);
void client_data_drop(void * data);

///=============================================================================
class ZenohQuery final
{
public:
ZenohQuery(const z_query_t * query);

~ZenohQuery();

const z_query_t get_query() const;

private:
z_owned_query_t query_;
};

///=============================================================================
class rmw_service_data_t final
{
public:
// The Entity generated for the service.
std::shared_ptr<liveliness::Entity> entity;

z_owned_keyexpr_t keyexpr;
z_owned_queryable_t qable;

// Store the actual QoS profile used to configure this service.
// The QoS is reused for getting requests and sending responses.
rmw_qos_profile_t adapted_qos_profile;

// Liveliness token for the service.
zc_owned_liveliness_token_t token;

const void * request_type_support_impl;
const void * response_type_support_impl;
const char * typesupport_identifier;
const rosidl_type_hash_t * type_hash;
RequestTypeSupport * request_type_support;
ResponseTypeSupport * response_type_support;

rmw_context_t * context;

bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data);

bool detach_condition_and_queue_is_empty();

std::unique_ptr<ZenohQuery> pop_next_query();

void add_new_query(std::unique_ptr<ZenohQuery> query);

bool add_to_query_map(const rmw_request_id_t & request_id, std::unique_ptr<ZenohQuery> query);

std::unique_ptr<ZenohQuery> take_from_query_map(const rmw_request_id_t & request_id);

DataCallbackManager data_callback_mgr;

private:
void notify();

// Deque to store the queries in the order they arrive.
std::deque<std::unique_ptr<ZenohQuery>> query_queue_;
mutable std::mutex query_queue_mutex_;

// Map to store the sequence_number (as given by the client) -> ZenohQuery
using SequenceToQuery = std::unordered_map<int64_t, std::unique_ptr<ZenohQuery>>;
std::unordered_map<size_t, SequenceToQuery> sequence_to_query_map_;
std::mutex sequence_to_query_map_mutex_;

rmw_wait_set_data_t * wait_set_data_{nullptr};
std::mutex condition_mutex_;
};

///=============================================================================
class ZenohReply final
{
public:
ZenohReply(const z_owned_reply_t * reply);

~ZenohReply();

std::optional<z_sample_t> get_sample() const;

private:
z_owned_reply_t reply_;
};

///=============================================================================
class rmw_client_data_t final
{
Expand Down
Loading

0 comments on commit ca899ac

Please sign in to comment.