Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Another fix for ClientData. #336

Merged
merged 2 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 24 additions & 49 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,30 @@

namespace
{
///=============================================================================
struct ClientDataWrapper
{
explicit ClientDataWrapper(std::shared_ptr<rmw_zenoh_cpp::ClientData> data)
: client_data(std::move(data))
{
}

std::shared_ptr<rmw_zenoh_cpp::ClientData> client_data;
};

///=============================================================================
void client_data_handler(z_loaned_reply_t * reply, void * data)
{
auto client_data = static_cast<rmw_zenoh_cpp::ClientData *>(data);
if (client_data == nullptr) {
auto wrapper = static_cast<ClientDataWrapper *>(data);
if (wrapper == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t from data in client_data_handler."
);
return;
}

if (client_data->is_shutdown()) {
if (wrapper->client_data->is_shutdown()) {
return;
}

Expand All @@ -69,7 +79,7 @@ void client_data_handler(z_loaned_reply_t * reply, void * data)
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_reply_is_ok returned False for keyexpr %s. Reason: %.*s",
client_data->topic_info().topic_keyexpr_.c_str(),
wrapper->client_data->topic_info().topic_keyexpr_.c_str(),
static_cast<int>(z_string_len(z_loan(err_str))),
z_string_data(z_loan(err_str)));
z_drop(z_move(err_str));
Expand All @@ -80,23 +90,23 @@ void client_data_handler(z_loaned_reply_t * reply, void * data)
std::chrono::nanoseconds::rep received_timestamp =
std::chrono::system_clock::now().time_since_epoch().count();

client_data->add_new_reply(
wrapper->client_data->add_new_reply(
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, received_timestamp));
}

///=============================================================================
void client_data_drop(void * data)
{
auto client_data = static_cast<rmw_zenoh_cpp::ClientData *>(data);
if (client_data == nullptr) {
auto wrapper = static_cast<ClientDataWrapper *>(data);
if (wrapper == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t from data in client_data_drop."
);
return;
}

client_data->decrement_in_flight_and_conditionally_remove();
delete wrapper;
}

} // namespace
Expand Down Expand Up @@ -228,8 +238,7 @@ ClientData::ClientData(
wait_set_data_(nullptr),
sequence_number_(1),
is_shutdown_(false),
initialized_(false),
num_in_flight_(0)
initialized_(false)
{
// Do nothing.
}
Expand Down Expand Up @@ -470,9 +479,9 @@ rmw_ret_t ClientData::send_request(

// TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures,
// capture shared_from_this() instead of this.
num_in_flight_++;
ClientDataWrapper * wrapper = new ClientDataWrapper(shared_from_this());
z_owned_closure_reply_t zn_closure_reply;
z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this);
z_closure(&zn_closure_reply, client_data_handler, client_data_drop, wrapper);
z_get(
sess_->loan(),
z_loan(keyexpr_), "",
Expand Down Expand Up @@ -527,10 +536,11 @@ bool ClientData::detach_condition_and_queue_is_empty()
}

///=============================================================================
void ClientData::_shutdown()
rmw_ret_t ClientData::shutdown()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (is_shutdown_) {
return;
return RMW_RET_OK;
}

// Unregister this node from the ROS graph.
Expand All @@ -541,45 +551,10 @@ void ClientData::_shutdown()

sess_.reset();
is_shutdown_ = true;
}

///=============================================================================
rmw_ret_t ClientData::shutdown()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
_shutdown();
return RMW_RET_OK;
}

///=============================================================================
bool ClientData::shutdown_and_query_in_flight()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
_shutdown();
return num_in_flight_ > 0;
}

///=============================================================================
void ClientData::decrement_in_flight_and_conditionally_remove()
{
std::unique_lock<std::recursive_mutex> lock(mutex_);
--num_in_flight_;

if (is_shutdown_ && num_in_flight_ == 0) {
rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(rmw_node_->data);
if (context_impl == nullptr) {
return;
}
std::shared_ptr<rmw_zenoh_cpp::NodeData> node_data = context_impl->get_node_data(rmw_node_);
if (node_data == nullptr) {
return;
}
// We have to unlock here since we are about to delete ourself, and thus the unlock would be UB.
lock.unlock();
node_data->delete_client_data(rmw_client_);
}
}

///=============================================================================
bool ClientData::is_shutdown() const
{
Expand Down
10 changes: 0 additions & 10 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// Shutdown this ClientData.
rmw_ret_t shutdown();

// Shutdown this ClientData, and return whether there are any requests currently in flight.
bool shutdown_and_query_in_flight();

// Decrement the in flight requests, and if that drops to 0 remove the client from the node.
void decrement_in_flight_and_conditionally_remove();

// Check if this ClientData is shutdown.
bool is_shutdown() const;

Expand All @@ -122,9 +116,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// Initialize the Zenoh objects for this entity.
bool init(std::shared_ptr<ZenohSession> session);

// Shutdown this client (the mutex is expected to be held by the caller).
void _shutdown();

// Internal mutex.
mutable std::recursive_mutex mutex_;
// The parent node.
Expand Down Expand Up @@ -155,7 +146,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
bool is_shutdown_;
// Whether the object has ever successfully been initialized.
bool initialized_;
size_t num_in_flight_;
};
using ClientDataPtr = std::shared_ptr<ClientData>;
using ClientDataConstPtr = std::shared_ptr<const ClientData>;
Expand Down
8 changes: 1 addition & 7 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,7 @@ ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client)
void NodeData::delete_client_data(const rmw_client_t * const client)
{
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
auto client_it = clients_.find(client);
if (client_it == clients_.end()) {
return;
}
if (!client_it->second->shutdown_and_query_in_flight()) {
clients_.erase(client);
}
clients_.erase(client);
}

///=============================================================================
Expand Down
Loading