Skip to content

Commit

Permalink
refactor: wrap the zenoh session with a shared pointer (#333)
Browse files Browse the repository at this point in the history
* refactor: wrap the zenoh session with a shared pointer

* fixup! refactor: wrap the zenoh session with a shared pointer

* chore: remove the automatically added headers

* fix: apply the suggested change

* fix: hold the session at the right place
  • Loading branch information
YuanYuYuan authored Dec 10, 2024
1 parent 7fcd074 commit 8bca9d7
Show file tree
Hide file tree
Showing 14 changed files with 120 additions and 55 deletions.
15 changes: 10 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ namespace rmw_zenoh_cpp
{
///=============================================================================
std::shared_ptr<ClientData> ClientData::make(
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
Expand Down Expand Up @@ -167,7 +167,7 @@ std::shared_ptr<ClientData> ClientData::make(

std::size_t domain_id = node_info.domain_id_;
auto entity = liveliness::Entity::make(
z_info_zid(session),
z_info_zid(session->loan()),
std::to_string(node_id),
std::to_string(service_id),
liveliness::EntityType::Client,
Expand All @@ -192,6 +192,7 @@ std::shared_ptr<ClientData> ClientData::make(
node,
client,
entity,
session,
request_members,
response_members,
request_type_support,
Expand All @@ -211,13 +212,15 @@ ClientData::ClientData(
const rmw_node_t * rmw_node,
const rmw_client_t * rmw_client,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<ZenohSession> sess,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::shared_ptr<RequestTypeSupport> request_type_support,
std::shared_ptr<ResponseTypeSupport> response_type_support)
: rmw_node_(rmw_node),
rmw_client_(rmw_client),
entity_(std::move(entity)),
sess_(std::move(sess)),
request_type_support_impl_(request_type_support_impl),
response_type_support_impl_(response_type_support_impl),
request_type_support_(request_type_support),
Expand All @@ -232,7 +235,7 @@ ClientData::ClientData(
}

///=============================================================================
bool ClientData::init(const z_loaned_session_t * session)
bool ClientData::init(std::shared_ptr<ZenohSession> session)
{
if (z_keyexpr_from_str(
&this->keyexpr_,
Expand All @@ -250,7 +253,7 @@ bool ClientData::init(const z_loaned_session_t * session)
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
if (z_liveliness_declare_token(
session,
session->loan(),
&this->token_,
z_loan(liveliness_ke),
NULL
Expand All @@ -266,6 +269,7 @@ bool ClientData::init(const z_loaned_session_t * session)
z_drop(z_move(this->token_));
});

sess_ = session;
initialized_ = true;

free_ros_keyexpr.cancel();
Expand Down Expand Up @@ -470,7 +474,7 @@ rmw_ret_t ClientData::send_request(
z_owned_closure_reply_t zn_closure_reply;
z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this);
z_get(
context_impl->session(),
sess_->loan(),
z_loan(keyexpr_), "",
z_move(zn_closure_reply),
&opts);
Expand Down Expand Up @@ -535,6 +539,7 @@ void ClientData::_shutdown()
z_drop(z_move(keyexpr_));
}

sess_.reset();
is_shutdown_ = true;
}

Expand Down
7 changes: 5 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
public:
// Make a shared_ptr of ClientData.
static std::shared_ptr<ClientData> make(
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
Expand Down Expand Up @@ -113,13 +113,14 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
const rmw_node_t * rmw_node,
const rmw_client_t * client,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<ZenohSession> sess,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::shared_ptr<RequestTypeSupport> request_type_support,
std::shared_ptr<ResponseTypeSupport> response_type_support);

// Initialize the Zenoh objects for this entity.
bool init(const z_loaned_session_t * session);
bool init(std::shared_ptr<ZenohSession> session);

// Shutdown this client (the mutex is expected to be held by the caller).
void _shutdown();
Expand All @@ -131,6 +132,8 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
const rmw_client_t * rmw_client_;
// The Entity generated for the service.
std::shared_ptr<liveliness::Entity> entity_;
// A shared session.
std::shared_ptr<ZenohSession> sess_;
// An owned keyexpression.
z_owned_keyexpr_t keyexpr_;
// Liveliness token for the service.
Expand Down
43 changes: 23 additions & 20 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include "rcpputils/scope_exit.hpp"
#include "rmw/error_handling.h"
#include "zenoh_utils.hpp"

// Megabytes of SHM to reserve.
// TODO(clalancette): Make this configurable, or get it from the configuration
Expand Down Expand Up @@ -86,13 +87,18 @@ class rmw_context_impl_s::Data final
});

// Initialize the zenoh session.
if (z_open(&session_, z_move(config), NULL) != Z_OK) {
z_owned_session_t raw_session;
if (z_open(&raw_session, z_move(config), NULL) != Z_OK) {
RMW_SET_ERROR_MSG("Error setting up zenoh session.");
throw std::runtime_error("Error setting up zenoh session.");
}
if (session_ != nullptr) {
session_.reset();
}
session_ = std::make_shared<rmw_zenoh_cpp::ZenohSession>(raw_session);
auto close_session = rcpputils::make_scope_exit(
[this]() {
z_close(z_loan_mut(session_), NULL);
[&raw_session]() {
z_close(z_loan_mut(raw_session), NULL);
});

// Verify if the zenoh router is running if configured.
Expand All @@ -102,7 +108,7 @@ class rmw_context_impl_s::Data final
uint64_t connection_attempts = 0;
constexpr std::chrono::milliseconds sleep_time(1000);
constexpr int64_t ticks_between_print(std::chrono::milliseconds(1000) / sleep_time);
while ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session_))) != RMW_RET_OK) {
while ((ret = rmw_zenoh_cpp::zenoh_router_check(session_->loan())) != RMW_RET_OK) {
if ((connection_attempts % ticks_between_print) == 0) {
RMW_ZENOH_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
Expand All @@ -117,7 +123,7 @@ class rmw_context_impl_s::Data final
}

// Initialize the graph cache.
const z_id_t zid = z_info_zid(z_loan(session_));
const z_id_t zid = z_info_zid(session_->loan());
graph_cache_ = std::make_shared<rmw_zenoh_cpp::GraphCache>(zid);
// Setup liveliness subscriptions for discovery.
std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id);
Expand All @@ -144,7 +150,7 @@ class rmw_context_impl_s::Data final
z_view_keyexpr_t keyexpr;
z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str());
z_liveliness_get(
z_loan(session_), z_loan(keyexpr),
session_->loan(), z_loan(keyexpr),
z_move(closure), NULL);
z_owned_reply_t reply;
while (z_recv(z_loan(handler), &reply) == Z_OK) {
Expand Down Expand Up @@ -203,7 +209,7 @@ class rmw_context_impl_s::Data final
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str());
if (z_liveliness_declare_subscriber(
z_loan(session_),
session_->loan(),
&graph_subscriber_, z_loan(liveliness_ke),
z_move(callback), &sub_options) != Z_OK)
{
Expand Down Expand Up @@ -240,11 +246,8 @@ class rmw_context_impl_s::Data final
// to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler().
}

// Close the zenoh session
if (z_close(z_loan_mut(session_), NULL) != Z_OK) {
RMW_SET_ERROR_MSG("Error while closing zenoh session");
return RMW_RET_ERROR;
}
// Drop the shared session.
session_.reset();

return RMW_RET_OK;
}
Expand All @@ -255,10 +258,10 @@ class rmw_context_impl_s::Data final
return enclave_;
}

const z_loaned_session_t * session() const
std::shared_ptr<rmw_zenoh_cpp::ZenohSession> session() const
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return z_loan(session_);
return session_;
}

std::optional<z_owned_shm_provider_t> & shm_provider()
Expand Down Expand Up @@ -288,7 +291,7 @@ class rmw_context_impl_s::Data final
bool session_is_valid() const
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return !z_session_is_closed(z_loan(session_));
return !z_session_is_closed(session_->loan());
}

std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache()
Expand All @@ -309,7 +312,7 @@ class rmw_context_impl_s::Data final
}

// Check that the Zenoh session is still valid.
if (z_session_is_closed(z_loan(session_))) {
if (z_session_is_closed(session_->loan())) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create NodeData as Zenoh session is invalid.");
Expand All @@ -319,7 +322,7 @@ class rmw_context_impl_s::Data final
auto node_data = rmw_zenoh_cpp::NodeData::make(
node,
this->get_next_entity_id(),
z_loan(session_),
session_->loan(),
domain_id_,
ns,
node_name,
Expand Down Expand Up @@ -395,8 +398,8 @@ class rmw_context_impl_s::Data final
std::size_t domain_id_;
// Enclave, name used to find security artifacts in a sros2 keystore.
std::string enclave_;
// An owned session.
z_owned_session_t session_;
// A shared session.
std::shared_ptr<rmw_zenoh_cpp::ZenohSession> session_{nullptr};
// An optional SHM manager that is initialized of SHM is enabled in the
// zenoh session config.
std::optional<z_owned_shm_provider_t> shm_provider_;
Expand Down Expand Up @@ -472,7 +475,7 @@ std::string rmw_context_impl_s::enclave() const
}

///=============================================================================
const z_loaned_session_t * rmw_context_impl_s::session() const
std::shared_ptr<rmw_zenoh_cpp::ZenohSession> rmw_context_impl_s::session() const
{
return data_->session();
}
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class rmw_context_impl_s final
// Get a copy of the enclave.
std::string enclave() const;

// Loan the Zenoh session.
const z_loaned_session_t * session() const;
// Share the Zenoh session.
std::shared_ptr<rmw_zenoh_cpp::ZenohSession> session() const;

// Get a reference to the shm_provider.
// Note: This is not thread-safe.
Expand Down
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ std::size_t NodeData::id() const
///=============================================================================
bool NodeData::create_pub_data(
const rmw_publisher_t * const publisher,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
Expand Down Expand Up @@ -187,7 +187,7 @@ void NodeData::delete_pub_data(const rmw_publisher_t * const publisher)
///=============================================================================
bool NodeData::create_sub_data(
const rmw_subscription_t * const subscription,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
std::shared_ptr<GraphCache> graph_cache,
std::size_t id,
const std::string & topic_name,
Expand Down Expand Up @@ -255,7 +255,7 @@ void NodeData::delete_sub_data(const rmw_subscription_t * const subscription)
///=============================================================================
bool NodeData::create_service_data(
const rmw_service_t * const service,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_supports,
Expand Down Expand Up @@ -322,7 +322,7 @@ void NodeData::delete_service_data(const rmw_service_t * const service)
///=============================================================================
bool NodeData::create_client_data(
const rmw_client_t * const client,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_supports,
Expand Down
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class NodeData final
// Create a new PublisherData for a given rmw_publisher_t.
bool create_pub_data(
const rmw_publisher_t * const publisher,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> sess,
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
Expand All @@ -70,7 +70,7 @@ class NodeData final
// Create a new SubscriptionData for a given rmw_subscription_t.
bool create_sub_data(
const rmw_subscription_t * const subscription,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> sess,
std::shared_ptr<GraphCache> graph_cache,
std::size_t id,
const std::string & topic_name,
Expand All @@ -86,7 +86,7 @@ class NodeData final
// Create a new ServiceData for a given rmw_service_t.
bool create_service_data(
const rmw_service_t * const service,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_support,
Expand All @@ -101,7 +101,7 @@ class NodeData final
// Create a new ClientData for a given rmw_client_t.
bool create_client_data(
const rmw_client_t * const client,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_support,
Expand Down
Loading

0 comments on commit 8bca9d7

Please sign in to comment.