Skip to content

Commit

Permalink
Vastly simplify handling of in-flight ClientData.
Browse files Browse the repository at this point in the history
We basically use the pointer as a key to look up the
shared_ptr, and then always use the shared_ptr.  That
ensures that even if the pointer was dropped from the
node while the client callback is operating, it will
still be valid and won't be destroyed until that method
returns.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Nov 12, 2024
1 parent bb69941 commit 821f4e9
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 142 deletions.
167 changes: 36 additions & 131 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "attachment_helpers.hpp"
#include "cdr.hpp"
Expand All @@ -41,38 +40,17 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/impl/cpp/macros.hpp"

namespace rmw_zenoh_cpp
namespace
{
// rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no
// way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an
// rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the
// query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory.
// The next 3 global variables are used to avoid that situation. Any time a query is initiated via
// rmw_send_request(), num_in_flight_ is incremented. When Zenoh calls the callback for the
// query drop, num_in_flight_map->second is decremented.
// When ClientData is destroyed, it checks to see if there are things in flight. If there are,
// it leaves this ClientData pointer both in the num_in_flight_map and the deleted_clients map.
// When the client_data_handler() is called on these destroyed objects, it knows that it cannot
// dereference the data anymore, and it gets out early. When client_data_drop() is called, it
// decrements num_in_flight_map->second, and if that drops to zero, drops the pointer address
// completely from deleted_clients.
//
// There is one case which is not handled by this, which has to do with timeouts. The query
// timeout is currently set to essentially infinite. Thus, if a query is in-flight but never
// returns, the memory in this structure will never be freed. There isn't much we can do about
// that at this time, but we may want to consider changing the timeout so that the memory can
// eventually be freed up.
//
// TODO(Yadunund): Remove these variables once we switch to zenoh-cpp and can capture
// weak_ptr<ClientData> in zenoh callbacks.
static std::mutex num_in_flight_mutex;
static std::unordered_map<const ClientData *, std::size_t> num_in_flight_map = {};
static std::unordered_set<const ClientData *> deleted_clients = {};

// Maps the raw ClientData pointer that is handed to the Zenoh reply closure as its context
// back to a shared_ptr for the same object. client_data_handler() looks its context pointer up
// here while holding client_data_ptr_to_shared_ptr_map_mutex and returns early when no entry
// exists, so it never dereferences a pointer that is not registered.
// NOTE(review): each entry is an owning shared_ptr, and the only erase visible here is in
// ~ClientData(). A destructor cannot run while this map still holds a reference to the object,
// so confirm that entries are also removed somewhere else (e.g. on shutdown); otherwise
// ClientData objects would never be freed.
std::mutex client_data_ptr_to_shared_ptr_map_mutex;
std::unordered_map<const rmw_zenoh_cpp::ClientData *,
std::shared_ptr<rmw_zenoh_cpp::ClientData>> client_data_ptr_to_shared_ptr_map;

///=============================================================================
void client_data_handler(z_owned_reply_t * reply, void * data)
{
auto client_data = static_cast<ClientData *>(data);
auto client_data = static_cast<rmw_zenoh_cpp::ClientData *>(data);
if (client_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand All @@ -81,18 +59,16 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
return;
}

std::lock_guard<std::mutex> lock(num_in_flight_mutex);
if (deleted_clients.count(client_data) > 0) {
RMW_ZENOH_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"client_data_handler triggered for ClientData that has been deleted. Ignoring..."
);
return;
std::shared_ptr<rmw_zenoh_cpp::ClientData> client_data_shared_ptr{nullptr};
{
std::lock_guard<std::mutex> lk(client_data_ptr_to_shared_ptr_map_mutex);
if (client_data_ptr_to_shared_ptr_map.count(client_data) == 0) {
return;
}
client_data_shared_ptr = client_data_ptr_to_shared_ptr_map[client_data];
}

// See the comment about the "num_in_flight" class variable in the ClientData class for
// why we need to do this.
if (client_data->is_shutdown()) {
if (client_data_shared_ptr->is_shutdown()) {
return;
}

Expand All @@ -118,43 +94,17 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
std::chrono::nanoseconds::rep received_timestamp =
std::chrono::system_clock::now().time_since_epoch().count();

client_data->add_new_reply(std::make_unique<ZenohReply>(reply, received_timestamp));
client_data_shared_ptr->add_new_reply(
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, received_timestamp));

// Since we took ownership of the reply, null it out here
*reply = z_reply_null();
}

///=============================================================================
void client_data_drop(void * data)
{
auto client_data = static_cast<ClientData *>(data);
if (client_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t "
);
return;
}

// See the comment about the "num_in_flight" class variable in the ClientData class for
// why we need to do this.
std::lock_guard<std::mutex> lock(num_in_flight_mutex);
auto num_in_flight_it = num_in_flight_map.find(client_data);
if (num_in_flight_it == num_in_flight_map.end()) {
// This should never happen
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to find object in num_in_flight_map. Report this bug."
);
return;
}

--num_in_flight_it->second;
if (num_in_flight_it->second == 0 && deleted_clients.count(client_data) > 0) {
deleted_clients.erase(client_data);
}
}
} // namespace

namespace rmw_zenoh_cpp
{
///=============================================================================
std::shared_ptr<ClientData> ClientData::make(
z_session_t session,
Expand Down Expand Up @@ -238,28 +188,23 @@ std::shared_ptr<ClientData> ClientData::make(
return nullptr;
}

std::lock_guard<std::mutex> lock(num_in_flight_mutex);
std::vector<std::shared_ptr<ClientData>> duplicate_pointers;
std::shared_ptr<ClientData> client_data;
do {
client_data = std::shared_ptr<ClientData>(
new ClientData{
node,
entity,
request_members,
response_members,
request_type_support,
response_type_support
});
duplicate_pointers.push_back(client_data);
} while (deleted_clients.count(client_data.get()) > 0);
std::shared_ptr<ClientData> client_data = std::shared_ptr<ClientData>(
new ClientData{
node,
entity,
request_members,
response_members,
request_type_support,
response_type_support
});

if (!client_data->init(session)) {
// init() already set the error.
return nullptr;
}

num_in_flight_map[client_data.get()] = 0;
std::lock_guard<std::mutex> lk(client_data_ptr_to_shared_ptr_map_mutex);
client_data_ptr_to_shared_ptr_map.emplace(client_data.get(), client_data);

return client_data;
}
Expand Down Expand Up @@ -508,18 +453,6 @@ rmw_ret_t ClientData::send_request(
z_bytes_map_drop(z_move(map));
});

// See the comment about the "num_in_flight" class variable in the ClientData class for
// why we need to do this.
{
std::lock_guard<std::mutex> lock(num_in_flight_mutex);
auto num_in_flight_it = num_in_flight_map.find(this);
if (num_in_flight_it == num_in_flight_map.end()) {
// This should never happen
RMW_SET_ERROR_MSG("failed to find object in num_in_flight_map");
return RMW_RET_ERROR;
}
num_in_flight_it->second++;
}
opts.attachment = z_bytes_map_as_attachment(&map);
opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
// The default timeout for a z_get query is 10 seconds and if a response is not received within
Expand All @@ -535,7 +468,7 @@ rmw_ret_t ClientData::send_request(
// TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures,
// capture shared_from_this() instead of this.
z_owned_closure_reply_t zn_closure_reply =
z_closure(client_data_handler, client_data_drop, this);
z_closure(client_data_handler, nullptr, this);
z_get(
context_impl->session(),
z_loan(keyexpr_), "",
Expand All @@ -548,6 +481,11 @@ rmw_ret_t ClientData::send_request(
///=============================================================================
ClientData::~ClientData()
{
{
std::lock_guard<std::mutex> lk(client_data_ptr_to_shared_ptr_map_mutex);
client_data_ptr_to_shared_ptr_map.erase(this);
}

const rmw_ret_t ret = this->shutdown();
if (ret != RMW_RET_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
Expand All @@ -556,28 +494,6 @@ ClientData::~ClientData()
entity_->topic_info().value().name_.c_str()
);
}

std::lock_guard<std::mutex> lock(num_in_flight_mutex);
auto num_in_flight_it = num_in_flight_map.find(this);
if (num_in_flight_it != num_in_flight_map.end()) {
if (num_in_flight_it->second == 0) {
// If there is nothing in flight, we can remove this from the map
// with no further considerations.
num_in_flight_map.erase(this);
} else {
// Since there is still something in flight, we need to just add
// it to the deleted_clients; it will be deleted when the last
// outstanding query finishes.
deleted_clients.insert(this);
}
} else {
// This should never happen
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Error finding client /%s in num_in_flight_map. Report this bug.",
entity_->topic_info().value().name_.c_str()
);
}
}

//==============================================================================
Expand Down Expand Up @@ -633,17 +549,6 @@ rmw_ret_t ClientData::shutdown()
return RMW_RET_OK;
}

///=============================================================================
// Returns true when num_in_flight_map has an entry for this ClientData whose counter is
// greater than zero; returns false when the counter is zero or there is no entry at all.
// The lookup is done while holding num_in_flight_mutex.
bool ClientData::query_in_flight() const
{
  std::lock_guard<std::mutex> guard(num_in_flight_mutex);
  const auto entry = num_in_flight_map.find(this);
  return entry != num_in_flight_map.end() && entry->second > 0;
}

///=============================================================================
bool ClientData::is_shutdown() const
{
Expand Down
5 changes: 0 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// Shutdown this ClientData.
rmw_ret_t shutdown();

// TODO(Yadunund): Remove this API once we are able to capture weak_ptr<ClientData>
// in the client closures to avoid the issue with queries in flight as described
// below.
bool query_in_flight() const;

// Check if this ClientData is shutdown.
bool is_shutdown() const;

Expand Down
9 changes: 3 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,13 +387,10 @@ void NodeData::delete_client_data(const rmw_client_t * const client)
if (client_it == clients_.end()) {
return;
}
// We shutdown the client first and only if that is successful, we deallocate
// the ClientData. This is to keep the ClientData alive in cases where
// rmw_destroy_client is invoked while there are still queries in flight.
// Shutdown the client, then erase it. The code in rmw_client_data.cpp is careful about keeping
// it alive as long as necessary.
client_it->second->shutdown();
if (!client_it->second->query_in_flight()) {
clients_.erase(client);
}
clients_.erase(client);
}

///=============================================================================
Expand Down

0 comments on commit 821f4e9

Please sign in to comment.