Skip to content

Commit

Permalink
Another fix for ClientData. (ros2#336)
Browse files Browse the repository at this point in the history
* Another fix for ClientData.

Signed-off-by: Chris Lalancette <[email protected]>

* Minor optimizations

Signed-off-by: Yadunund <[email protected]>

---------

Signed-off-by: Chris Lalancette <[email protected]>
Signed-off-by: Yadunund <[email protected]>
Co-authored-by: Yadunund <[email protected]>
  • Loading branch information
clalancette and Yadunund authored Dec 11, 2024
1 parent 8bca9d7 commit ca5058c
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 66 deletions.
73 changes: 24 additions & 49 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,30 @@

namespace
{
///=============================================================================
struct ClientDataWrapper
{
explicit ClientDataWrapper(std::shared_ptr<rmw_zenoh_cpp::ClientData> data)
: client_data(std::move(data))
{
}

std::shared_ptr<rmw_zenoh_cpp::ClientData> client_data;
};

///=============================================================================
void client_data_handler(z_loaned_reply_t * reply, void * data)
{
auto client_data = static_cast<rmw_zenoh_cpp::ClientData *>(data);
if (client_data == nullptr) {
auto wrapper = static_cast<ClientDataWrapper *>(data);
if (wrapper == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t from data in client_data_handler."
);
return;
}

if (client_data->is_shutdown()) {
if (wrapper->client_data->is_shutdown()) {
return;
}

Expand All @@ -69,7 +79,7 @@ void client_data_handler(z_loaned_reply_t * reply, void * data)
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_reply_is_ok returned False for keyexpr %s. Reason: %.*s",
client_data->topic_info().topic_keyexpr_.c_str(),
wrapper->client_data->topic_info().topic_keyexpr_.c_str(),
static_cast<int>(z_string_len(z_loan(err_str))),
z_string_data(z_loan(err_str)));
z_drop(z_move(err_str));
Expand All @@ -80,23 +90,23 @@ void client_data_handler(z_loaned_reply_t * reply, void * data)
std::chrono::nanoseconds::rep received_timestamp =
std::chrono::system_clock::now().time_since_epoch().count();

client_data->add_new_reply(
wrapper->client_data->add_new_reply(
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, received_timestamp));
}

///=============================================================================
void client_data_drop(void * data)
{
auto client_data = static_cast<rmw_zenoh_cpp::ClientData *>(data);
if (client_data == nullptr) {
auto wrapper = static_cast<ClientDataWrapper *>(data);
if (wrapper == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t from data in client_data_drop."
);
return;
}

client_data->decrement_in_flight_and_conditionally_remove();
delete wrapper;
}

} // namespace
Expand Down Expand Up @@ -228,8 +238,7 @@ ClientData::ClientData(
wait_set_data_(nullptr),
sequence_number_(1),
is_shutdown_(false),
initialized_(false),
num_in_flight_(0)
initialized_(false)
{
// Do nothing.
}
Expand Down Expand Up @@ -470,9 +479,9 @@ rmw_ret_t ClientData::send_request(

// TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures,
// capture shared_from_this() instead of this.
num_in_flight_++;
ClientDataWrapper * wrapper = new ClientDataWrapper(shared_from_this());
z_owned_closure_reply_t zn_closure_reply;
z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this);
z_closure(&zn_closure_reply, client_data_handler, client_data_drop, wrapper);
z_get(
sess_->loan(),
z_loan(keyexpr_), "",
Expand Down Expand Up @@ -527,10 +536,11 @@ bool ClientData::detach_condition_and_queue_is_empty()
}

///=============================================================================
void ClientData::_shutdown()
rmw_ret_t ClientData::shutdown()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (is_shutdown_) {
return;
return RMW_RET_OK;
}

// Unregister this node from the ROS graph.
Expand All @@ -541,45 +551,10 @@ void ClientData::_shutdown()

sess_.reset();
is_shutdown_ = true;
}

///=============================================================================
rmw_ret_t ClientData::shutdown()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
_shutdown();
return RMW_RET_OK;
}

///=============================================================================
bool ClientData::shutdown_and_query_in_flight()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
_shutdown();
return num_in_flight_ > 0;
}

///=============================================================================
void ClientData::decrement_in_flight_and_conditionally_remove()
{
std::unique_lock<std::recursive_mutex> lock(mutex_);
--num_in_flight_;

if (is_shutdown_ && num_in_flight_ == 0) {
rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(rmw_node_->data);
if (context_impl == nullptr) {
return;
}
std::shared_ptr<rmw_zenoh_cpp::NodeData> node_data = context_impl->get_node_data(rmw_node_);
if (node_data == nullptr) {
return;
}
// We have to unlock here since we are about to delete ourself, and thus the unlock would be UB.
lock.unlock();
node_data->delete_client_data(rmw_client_);
}
}

///=============================================================================
bool ClientData::is_shutdown() const
{
Expand Down
10 changes: 0 additions & 10 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// Shutdown this ClientData.
rmw_ret_t shutdown();

// Shutdown this ClientData, and return whether there are any requests currently in flight.
bool shutdown_and_query_in_flight();

// Decrement the in flight requests, and if that drops to 0 remove the client from the node.
void decrement_in_flight_and_conditionally_remove();

// Check if this ClientData is shutdown.
bool is_shutdown() const;

Expand All @@ -122,9 +116,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// Initialize the Zenoh objects for this entity.
bool init(std::shared_ptr<ZenohSession> session);

// Shutdown this client (the mutex is expected to be held by the caller).
void _shutdown();

// Internal mutex.
mutable std::recursive_mutex mutex_;
// The parent node.
Expand Down Expand Up @@ -155,7 +146,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
bool is_shutdown_;
// Whether the object has ever successfully been initialized.
bool initialized_;
size_t num_in_flight_;
};
using ClientDataPtr = std::shared_ptr<ClientData>;
using ClientDataConstPtr = std::shared_ptr<const ClientData>;
Expand Down
8 changes: 1 addition & 7 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,7 @@ ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client)
void NodeData::delete_client_data(const rmw_client_t * const client)
{
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
auto client_it = clients_.find(client);
if (client_it == clients_.end()) {
return;
}
if (!client_it->second->shutdown_and_query_in_flight()) {
clients_.erase(client);
}
clients_.erase(client);
}

///=============================================================================
Expand Down

0 comments on commit ca5058c

Please sign in to comment.