Skip to content

Commit

Permalink
Revert "Fix race conditions in rmw_wait and map queries to clients (#153
Browse files Browse the repository at this point in the history
)" (#157)

This reverts commit c1c6f95.
  • Loading branch information
Yadunund authored Apr 18, 2024
1 parent c1c6f95 commit 655a961
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 160 deletions.
10 changes: 3 additions & 7 deletions rmw_zenoh_cpp/src/detail/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ void EventsManager::add_new_event(
///=============================================================================
void EventsManager::attach_event_condition(
rmw_zenoh_event_type_t event_id,
std::mutex * condition_mutex,
std::condition_variable * condition_variable)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
Expand All @@ -195,8 +194,7 @@ void EventsManager::attach_event_condition(
return;
}

std::lock_guard<std::mutex> lock(update_event_condition_mutex_);
event_condition_mutexes_[event_id] = condition_mutex;
std::lock_guard<std::mutex> lock(event_condition_mutex_);
event_conditions_[event_id] = condition_variable;
}

Expand All @@ -211,8 +209,7 @@ void EventsManager::detach_event_condition(rmw_zenoh_event_type_t event_id)
return;
}

std::lock_guard<std::mutex> lock(update_event_condition_mutex_);
event_condition_mutexes_[event_id] = nullptr;
std::lock_guard<std::mutex> lock(event_condition_mutex_);
event_conditions_[event_id] = nullptr;
}

Expand All @@ -227,9 +224,8 @@ void EventsManager::notify_event(rmw_zenoh_event_type_t event_id)
return;
}

std::lock_guard<std::mutex> lock(update_event_condition_mutex_);
std::lock_guard<std::mutex> lock(event_condition_mutex_);
if (event_conditions_[event_id] != nullptr) {
std::lock_guard<std::mutex> cvlk(*event_condition_mutexes_[event_id]);
event_conditions_[event_id]->notify_one();
}
}
4 changes: 1 addition & 3 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ class EventsManager
/// @param condition_variable to attach.
void attach_event_condition(
rmw_zenoh_event_type_t event_id,
std::mutex * condition_mutex,
std::condition_variable * condition_variable);

/// @brief Detach the condition variable provided by rmw_wait.
Expand All @@ -155,8 +154,7 @@ class EventsManager
/// Mutex to lock when read/writing members.
mutable std::mutex event_mutex_;
/// Mutex to lock for event_condition.
mutable std::mutex update_event_condition_mutex_;
std::mutex * event_condition_mutexes_[ZENOH_EVENT_ID_MAX + 1]{nullptr};
mutable std::mutex event_condition_mutex_;
/// Condition variable to attach for event notifications.
std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr};
/// User callback that can be set via data_callback_mgr.set_callback().
Expand Down
7 changes: 1 addition & 6 deletions rmw_zenoh_cpp/src/detail/guard_condition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,21 @@ void GuardCondition::trigger()
has_triggered_ = true;

if (condition_variable_ != nullptr) {
std::lock_guard<std::mutex> cvlk(*condition_mutex_);
condition_variable_->notify_one();
}
}

///==============================================================================
void GuardCondition::attach_condition(
std::mutex * condition_mutex,
std::condition_variable * condition_variable)
void GuardCondition::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(internal_mutex_);
condition_mutex_ = condition_mutex;
condition_variable_ = condition_variable;
}

///==============================================================================
void GuardCondition::detach_condition()
{
std::lock_guard<std::mutex> lock(internal_mutex_);
condition_mutex_ = nullptr;
condition_variable_ = nullptr;
}

Expand Down
5 changes: 2 additions & 3 deletions rmw_zenoh_cpp/src/detail/guard_condition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class GuardCondition final
// Sets has_triggered_ to true and calls notify_one() on condition_variable_ if set.
void trigger();

void attach_condition(std::mutex * condition_mutex, std::condition_variable * condition_variable);
void attach_condition(std::condition_variable * condition_variable);

void detach_condition();

Expand All @@ -38,8 +38,7 @@ class GuardCondition final
private:
mutable std::mutex internal_mutex_;
std::atomic_bool has_triggered_;
std::mutex * condition_mutex_{nullptr};
std::condition_variable * condition_variable_{nullptr};
std::condition_variable * condition_variable_;
};

#endif // DETAIL__GUARD_CONDITION_HPP_
107 changes: 21 additions & 86 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <utility>

Expand Down Expand Up @@ -63,32 +62,25 @@ size_t rmw_publisher_data_t::get_next_sequence_number()
}

///=============================================================================
void rmw_subscription_data_t::attach_condition(
std::mutex * condition_mutex,
std::condition_variable * condition_variable)
void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(update_condition_mutex_);
condition_mutex_ = condition_mutex;
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = condition_variable;
}

///=============================================================================
void rmw_subscription_data_t::notify()
{
std::lock_guard<std::mutex> lock(update_condition_mutex_);
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
// We also need to take the mutex for the condition_variable; see the comment
// in rmw_wait for more information
std::lock_guard<std::mutex> cvlk(*condition_mutex_);
condition_->notify_one();
}
}

///=============================================================================
void rmw_subscription_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(update_condition_mutex_);
condition_mutex_ = nullptr;
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
}

Expand Down Expand Up @@ -157,20 +149,16 @@ bool rmw_service_data_t::query_queue_is_empty() const
}

///=============================================================================
void rmw_service_data_t::attach_condition(
std::mutex * condition_mutex,
std::condition_variable * condition_variable)
void rmw_service_data_t::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(update_condition_mutex_);
condition_mutex_ = condition_mutex;
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = condition_variable;
}

///=============================================================================
void rmw_service_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(update_condition_mutex_);
condition_mutex_ = nullptr;
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
}

Expand All @@ -191,11 +179,8 @@ std::unique_ptr<ZenohQuery> rmw_service_data_t::pop_next_query()
///=============================================================================
void rmw_service_data_t::notify()
{
std::lock_guard<std::mutex> lock(update_condition_mutex_);
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
// We also need to take the mutex for the condition_variable; see the comment
// in rmw_wait for more information
std::lock_guard<std::mutex> cvlk(*condition_mutex_);
condition_->notify_one();
}
}
Expand Down Expand Up @@ -223,86 +208,40 @@ void rmw_service_data_t::add_new_query(std::unique_ptr<ZenohQuery> query)
notify();
}

static size_t hash_gid(const rmw_request_id_t & request_id)
{
std::stringstream hash_str;
hash_str << std::hex;
size_t i = 0;
for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) {
hash_str << static_cast<int>(request_id.writer_guid[i]);
}
return std::hash<std::string>{}(hash_str.str());
}

///=============================================================================
bool rmw_service_data_t::add_to_query_map(
const rmw_request_id_t & request_id, std::unique_ptr<ZenohQuery> query)
int64_t sequence_number, std::unique_ptr<ZenohQuery> query)
{
size_t hash = hash_gid(request_id);

std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);

std::unordered_map<size_t, SequenceToQuery>::iterator it =
sequence_to_query_map_.find(hash);

if (it == sequence_to_query_map_.end()) {
SequenceToQuery stq;

sequence_to_query_map_.insert(std::make_pair(hash, std::move(stq)));

it = sequence_to_query_map_.find(hash);
} else {
// Client already in the map

if (it->second.find(request_id.sequence_number) != it->second.end()) {
return false;
}
if (sequence_to_query_map_.find(sequence_number) != sequence_to_query_map_.end()) {
return false;
}

it->second.insert(
std::make_pair(request_id.sequence_number, std::move(query)));
sequence_to_query_map_.emplace(
std::pair(sequence_number, std::move(query)));

return true;
}

///=============================================================================
std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(
const rmw_request_id_t & request_id)
std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(int64_t sequence_number)
{
size_t hash = hash_gid(request_id);

std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);

std::unordered_map<size_t, SequenceToQuery>::iterator it = sequence_to_query_map_.find(hash);

if (it == sequence_to_query_map_.end()) {
return nullptr;
}

SequenceToQuery::iterator query_it = it->second.find(request_id.sequence_number);

if (query_it == it->second.end()) {
auto query_it = sequence_to_query_map_.find(sequence_number);
if (query_it == sequence_to_query_map_.end()) {
return nullptr;
}

std::unique_ptr<ZenohQuery> query = std::move(query_it->second);
it->second.erase(query_it);

if (sequence_to_query_map_[hash].size() == 0) {
sequence_to_query_map_.erase(hash);
}
sequence_to_query_map_.erase(query_it);

return query;
}

///=============================================================================
void rmw_client_data_t::notify()
{
std::lock_guard<std::mutex> lock(update_condition_mutex_);
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
// We also need to take the mutex for the condition_variable; see the comment
// in rmw_wait for more information
std::lock_guard<std::mutex> cvlk(*condition_mutex_);
condition_->notify_one();
}
}
Expand Down Expand Up @@ -339,20 +278,16 @@ bool rmw_client_data_t::reply_queue_is_empty() const
}

///=============================================================================
void rmw_client_data_t::attach_condition(
std::mutex * condition_mutex,
std::condition_variable * condition_variable)
void rmw_client_data_t::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(update_condition_mutex_);
condition_mutex_ = condition_mutex;
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = condition_variable;
}

///=============================================================================
void rmw_client_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(update_condition_mutex_);
condition_mutex_ = nullptr;
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
}

Expand Down
25 changes: 11 additions & 14 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <variant>
#include <vector>
Expand Down Expand Up @@ -172,7 +173,7 @@ class rmw_subscription_data_t final
MessageTypeSupport * type_support;
rmw_context_t * context;

void attach_condition(std::mutex * condition_mutex, std::condition_variable * condition_variable);
void attach_condition(std::condition_variable * condition_variable);

void detach_condition();

Expand All @@ -191,9 +192,8 @@ class rmw_subscription_data_t final

void notify();

std::mutex * condition_mutex_{nullptr};
std::condition_variable * condition_{nullptr};
std::mutex update_condition_mutex_;
std::mutex condition_mutex_;
};


Expand Down Expand Up @@ -244,17 +244,17 @@ class rmw_service_data_t final

bool query_queue_is_empty() const;

void attach_condition(std::mutex * condition_mutex, std::condition_variable * condition_variable);
void attach_condition(std::condition_variable * condition_variable);

void detach_condition();

std::unique_ptr<ZenohQuery> pop_next_query();

void add_new_query(std::unique_ptr<ZenohQuery> query);

bool add_to_query_map(const rmw_request_id_t & request_id, std::unique_ptr<ZenohQuery> query);
bool add_to_query_map(int64_t sequence_number, std::unique_ptr<ZenohQuery> query);

std::unique_ptr<ZenohQuery> take_from_query_map(const rmw_request_id_t & request_id);
std::unique_ptr<ZenohQuery> take_from_query_map(int64_t sequence_number);

DataCallbackManager data_callback_mgr;

Expand All @@ -265,14 +265,12 @@ class rmw_service_data_t final
std::deque<std::unique_ptr<ZenohQuery>> query_queue_;
mutable std::mutex query_queue_mutex_;

// Map to store the sequence_number (as given by the client) -> ZenohQuery
using SequenceToQuery = std::unordered_map<int64_t, std::unique_ptr<ZenohQuery>>;
std::unordered_map<size_t, SequenceToQuery> sequence_to_query_map_;
// Map to store the sequence_number -> query_id
std::unordered_map<int64_t, std::unique_ptr<ZenohQuery>> sequence_to_query_map_;
std::mutex sequence_to_query_map_mutex_;

std::mutex * condition_mutex_{nullptr};
std::condition_variable * condition_{nullptr};
std::mutex update_condition_mutex_;
std::mutex condition_mutex_;
};

///=============================================================================
Expand Down Expand Up @@ -322,7 +320,7 @@ class rmw_client_data_t final

bool reply_queue_is_empty() const;

void attach_condition(std::mutex * condition_mutex, std::condition_variable * condition_variable);
void attach_condition(std::condition_variable * condition_variable);

void detach_condition();

Expand All @@ -336,9 +334,8 @@ class rmw_client_data_t final
size_t sequence_number_{1};
std::mutex sequence_number_mutex_;

std::mutex * condition_mutex_{nullptr};
std::condition_variable * condition_{nullptr};
std::mutex update_condition_mutex_;
std::mutex condition_mutex_;

std::deque<std::unique_ptr<ZenohReply>> reply_queue_;
mutable std::mutex reply_queue_mutex_;
Expand Down
Loading

0 comments on commit 655a961

Please sign in to comment.