Skip to content

Commit

Permalink
Use weak pointers in rmw_context_impl_s.
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Dec 17, 2024
1 parent 271da8d commit 3f475d7
Showing 1 changed file with 22 additions and 37 deletions.
59 changes: 22 additions & 37 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,10 @@
// TODO(clalancette): Make this configurable, or get it from the configuration
#define SHM_BUFFER_SIZE_MB 10

// This global mapping of raw Data pointers to Data shared pointers allows graph_sub_data_handler()
// to lookup the pointer, and gain a reference to a shared_ptr if it exists.
// This guarantees that the Data object will not be destroyed while we are using it.
static std::mutex data_to_data_shared_ptr_map_mutex;
static std::unordered_map<rmw_context_impl_s::Data *,
std::shared_ptr<rmw_context_impl_s::Data>> data_to_data_shared_ptr_map;

// Bundle all class members into a data struct which can be passed as a
// weak ptr to various threads for thread-safe access without capturing
// "this" ptr by reference.
class rmw_context_impl_s::Data final
class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
{
public:
// Constructor.
Expand All @@ -64,7 +57,8 @@ class rmw_context_impl_s::Data final
enclave_(std::move(enclave)),
is_shutdown_(false),
next_entity_id_(0),
nodes_({})
nodes_({}),
liveliness_keyexpr_(rmw_zenoh_cpp::liveliness::subscription_token(domain_id))
{
// Initialize the zenoh configuration.
std::optional<zenoh::Config> config = rmw_zenoh_cpp::get_z_config(
Expand Down Expand Up @@ -127,8 +121,6 @@ class rmw_context_impl_s::Data final
graph_cache_ =
std::make_shared<rmw_zenoh_cpp::GraphCache>(this->session_->get_zid());
// Setup liveliness subscriptions for discovery.
std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id);

// Query router/liveliness participants to get graph information before the session was started.
// We create a blocking channel that is unbounded, ie. `bound` = 0, to receive
// replies for the z_liveliness_get() call. This is necessary as if the `bound`
Expand All @@ -147,14 +139,12 @@ class rmw_context_impl_s::Data final

// Intuitively use a FIFO channel rather than building it up with a closure and a
// handler to FIFO channel
zenoh::KeyExpr keyexpr(liveliness_str);

zenoh::Session::GetOptions options = zenoh::Session::GetOptions::create_default();
options.target = zenoh::QueryTarget::Z_QUERY_TARGET_ALL;
options.payload = "";

auto replies = session_->liveliness_get(
keyexpr,
liveliness_keyexpr_,
zenoh::channels::FifoChannel(SIZE_MAX - 1),
zenoh::Session::LivelinessGetOptions::create_default(),
&result);
Expand Down Expand Up @@ -193,29 +183,29 @@ class rmw_context_impl_s::Data final
graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
graph_guard_condition_->data = &guard_condition_data_;
}

void init()
{
// Setup the liveliness subscriber to receives updates from the ROS graph
// and update the graph cache.
zenoh::KeyExpr keyexpr_cpp(liveliness_str.c_str());
zenoh::Session::LivelinessSubscriberOptions sub_options =
zenoh::Session::LivelinessSubscriberOptions::create_default();
sub_options.history = true;
graph_subscriber_ = session_->liveliness_declare_subscriber(
keyexpr_cpp,
[&](const zenoh::Sample & sample) {
// Look up the data shared_ptr in the global map. If it is in there, use it.
// If not, it is being shutdown so we can just ignore this update.
std::shared_ptr<rmw_context_impl_s::Data> data_shared_ptr{nullptr};
{
std::lock_guard<std::mutex> lk(data_to_data_shared_ptr_map_mutex);
if (data_to_data_shared_ptr_map.count(this) == 0) {
return;
}
data_shared_ptr = data_to_data_shared_ptr_map[this];
}

// This can't be in the constructor since shared_from_this() doesn't work there.
std::weak_ptr<Data> context_impl_data_wp = shared_from_this();
zenoh::ZResult result;
graph_subscriber_ = session_->liveliness_declare_subscriber(
liveliness_keyexpr_,
[context_impl_data_wp](const zenoh::Sample & sample) {
// Update the graph cache.
data_shared_ptr->update_graph_cache(
std::shared_ptr<Data> context_impl_data = context_impl_data_wp.lock();
if (context_impl_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to obtain context_impl.");
return;
}
context_impl_data->update_graph_cache(
sample,
std::string(sample.get_keyexpr().as_string_view()));
},
Expand Down Expand Up @@ -435,6 +425,8 @@ class rmw_context_impl_s::Data final
std::size_t next_entity_id_;
// Nodes created from this context.
std::unordered_map<const rmw_node_t *, std::shared_ptr<rmw_zenoh_cpp::NodeData>> nodes_;

zenoh::KeyExpr liveliness_keyexpr_;
};

///=============================================================================
Expand All @@ -443,9 +435,7 @@ rmw_context_impl_s::rmw_context_impl_s(
const std::string & enclave)
{
data_ = std::make_shared<Data>(domain_id, std::move(enclave));

std::lock_guard<std::mutex> lk(data_to_data_shared_ptr_map_mutex);
data_to_data_shared_ptr_map.emplace(data_.get(), data_);
data_->init();
}

///=============================================================================
Expand Down Expand Up @@ -487,11 +477,6 @@ std::size_t rmw_context_impl_s::get_next_entity_id()
///=============================================================================
rmw_ret_t rmw_context_impl_s::shutdown()
{
{
std::lock_guard<std::mutex> lk(data_to_data_shared_ptr_map_mutex);
data_to_data_shared_ptr_map.erase(data_.get());
}

return data_->shutdown();
}

Expand Down

0 comments on commit 3f475d7

Please sign in to comment.