Skip to content

Commit

Permalink
tokens
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro Hernández Cordero <[email protected]>
  • Loading branch information
ahcorde committed Nov 19, 2024
1 parent 4c102b0 commit 0635799
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 90 deletions.
50 changes: 20 additions & 30 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ std::shared_ptr<ClientData> ClientData::make(
response_type_support
});

if (!client_data->init(z_loan(session->_0))) {
if (!client_data->init(session)) {
// init() already set the error.
return nullptr;
}
Expand Down Expand Up @@ -226,41 +226,25 @@ ClientData::ClientData(
}

///=============================================================================
bool ClientData::init(const z_loaned_session_t * session)
bool ClientData::init(const std::shared_ptr<zenoh::Session> & session)
{
std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_;
auto free_ros_keyexpr = rcpputils::make_scope_exit(
[this]() {
z_drop(z_move(this->keyexpr_));
});
if (z_keyexpr_from_str(&this->keyexpr_, topic_keyexpr.c_str()) != Z_OK) {
RMW_SET_ERROR_MSG("unable to create zenoh keyexpr.");
return false;
}
keyexpr_ = zenoh::KeyExpr(topic_keyexpr);

std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr();
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
auto free_token = rcpputils::make_scope_exit(
[this]() {
z_drop(z_move(this->token_));
});
if(zc_liveliness_declare_token(
session,
&this->token_,
z_loan(liveliness_ke),
NULL
) != Z_OK)
zenoh::ZResult err;
this->token_ = session->liveliness_declare_token(
zenoh::KeyExpr(liveliness_keyexpr),
zenoh::Session::LivelinessDeclarationOptions::create_default(),
&err);
if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the client.");
return false;
}

free_ros_keyexpr.cancel();
free_token.cancel();

return true;
}

Expand Down Expand Up @@ -289,7 +273,7 @@ void ClientData::add_new_reply(std::unique_ptr<ZenohReply> reply)
{
// Log warning if message is discarded due to hitting the queue depth
z_view_string_t keystr;
z_keyexpr_as_view_string(z_loan(keyexpr_), &keystr);
z_keyexpr_as_view_string(z_loan(keyexpr_.value()._0), &keystr);
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Query queue depth of %ld reached, discarding oldest Query "
Expand Down Expand Up @@ -451,7 +435,7 @@ rmw_ret_t ClientData::send_request(
z_closure(&callback, client_data_handler, client_data_drop, this);
z_get(
context_impl->session(),
z_loan(keyexpr_), "",
z_loan(keyexpr_.value()._0), "",
z_move(callback),
&opts);

Expand Down Expand Up @@ -510,9 +494,15 @@ void ClientData::_shutdown()
}

// Unregister this node from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));

z_drop(z_move(keyexpr_));
zenoh::ZResult err;
std::move(token_).value().undeclare(&err);
if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare liveliness token");
return;
}

is_shutdown_ = true;
}
Expand Down
6 changes: 3 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
std::shared_ptr<ResponseTypeSupport> response_type_support);

// Initialize the Zenoh objects for this entity.
bool init(const z_loaned_session_t * session);
bool init(const std::shared_ptr<zenoh::Session> & session);

// Shutdown this client (the mutex is expected to be held by the caller).
void _shutdown();
Expand All @@ -122,9 +122,9 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// The Entity generated for the service.
std::shared_ptr<liveliness::Entity> entity_;
// An owned keyexpression.
z_owned_keyexpr_t keyexpr_;
std::optional<zenoh::KeyExpr> keyexpr_;
// Liveliness token for the service.
zc_owned_liveliness_token_t token_;
std::optional<zenoh::LivelinessToken> token_;
// Type support fields.
const void * request_type_support_impl_;
const void * response_type_support_impl_;
Expand Down
28 changes: 17 additions & 11 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,18 @@ std::shared_ptr<NodeData> NodeData::make(

// Create the liveliness token.
std::string liveliness_keyexpr = entity->liveliness_keyexpr();
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
zc_owned_liveliness_token_t token;
auto free_token = rcpputils::make_scope_exit(
[&token]() {
z_drop(z_move(token));
});
if (zc_liveliness_declare_token(z_loan(session->_0), &token, z_loan(liveliness_ke), NULL) != Z_OK) {
zenoh::ZResult err;
auto token = session->liveliness_declare_token(
zenoh::KeyExpr(liveliness_keyexpr),
zenoh::Session::LivelinessDeclarationOptions::create_default(),
&err);
if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the node.");
return nullptr;
}
free_token.cancel();

return std::shared_ptr<NodeData>(
new NodeData{
Expand All @@ -87,7 +85,7 @@ NodeData::NodeData(
const rmw_node_t * const node,
std::size_t id,
std::shared_ptr<liveliness::Entity> entity,
zc_owned_liveliness_token_t token)
zenoh::LivelinessToken token)
: node_(node),
id_(std::move(id)),
entity_(std::move(entity)),
Expand Down Expand Up @@ -452,7 +450,15 @@ rmw_ret_t NodeData::shutdown()
}

// Unregister this node from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));
zenoh::ZResult err;
std::move(token_).value().undeclare(&err);
if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare liveliness token");
return RMW_RET_ERROR;
}

is_shutdown_ = true;
return ret;
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class NodeData final
const rmw_node_t * const node,
std::size_t id,
std::shared_ptr<liveliness::Entity> entity,
zc_owned_liveliness_token_t token);
zenoh::LivelinessToken token);
// Internal mutex.
mutable std::mutex mutex_;
// The rmw_node_t associated with this NodeData.
Expand All @@ -137,7 +137,7 @@ class NodeData final
// The Entity generated for the node.
std::shared_ptr<liveliness::Entity> entity_;
// Liveliness token for the node.
zc_owned_liveliness_token_t token_;
std::optional<zenoh::LivelinessToken> token_;
// Shutdown flag.
bool is_shutdown_;
// Map of publishers.
Expand Down
29 changes: 16 additions & 13 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,24 +166,19 @@ std::shared_ptr<PublisherData> PublisherData::make(
}

std::string liveliness_keyexpr = entity->liveliness_keyexpr();
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
zc_owned_liveliness_token_t token;
auto free_token = rcpputils::make_scope_exit(
[&token]() {
z_drop(z_move(token));
});
if (zc_liveliness_declare_token(
z_loan(session->_0), &token, z_loan(liveliness_ke),
NULL) != Z_OK)
zenoh::ZResult err;
auto token = session->liveliness_declare_token(
zenoh::KeyExpr(liveliness_keyexpr),
zenoh::Session::LivelinessDeclarationOptions::create_default(),
&err);
if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the publisher.");
return nullptr;
}

free_token.cancel();
undeclare_z_publisher_cache.cancel();
undeclare_z_publisher.cancel();

Expand All @@ -205,7 +200,7 @@ PublisherData::PublisherData(
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
std::optional<ze_owned_publication_cache_t> pub_cache,
zc_owned_liveliness_token_t token,
zenoh::LivelinessToken token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
: rmw_node_(rmw_node),
Expand Down Expand Up @@ -428,7 +423,15 @@ rmw_ret_t PublisherData::shutdown()
}

// Unregister this publisher from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));
zenoh::ZResult err;
std::move(token_).value().undeclare(&err);
if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare liveliness token");
return RMW_RET_ERROR;
}
if (pub_cache_.has_value()) {
z_drop(z_move(pub_cache_.value()));
}
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class PublisherData final
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
std::optional<ze_owned_publication_cache_t> pub_cache,
zc_owned_liveliness_token_t token,
zenoh::LivelinessToken token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support);

Expand All @@ -100,7 +100,7 @@ class PublisherData final
// Optional publication cache when durability is transient_local.
std::optional<ze_owned_publication_cache_t> pub_cache_;
// Liveliness token for the publisher.
zc_owned_liveliness_token_t token_;
std::optional<zenoh::LivelinessToken> token_;
// Type support fields
const void * type_support_impl_;
std::unique_ptr<MessageTypeSupport> type_support_;
Expand Down
31 changes: 15 additions & 16 deletions rmw_zenoh_cpp/src/detail/rmw_service_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,20 +178,12 @@ std::shared_ptr<ServiceData> ServiceData::make(
}

std::string liveliness_keyexpr = service_data->entity_->liveliness_keyexpr();
z_view_keyexpr_t liveliness_ke;
if (z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()) != Z_OK) {
RMW_SET_ERROR_MSG("unable to create zenoh keyexpr.");
return nullptr;
}
auto free_token = rcpputils::make_scope_exit(
[service_data]() {
if (service_data != nullptr) {
z_drop(z_move(service_data->token_));
}
});
if (zc_liveliness_declare_token(
z_loan(session->_0), &service_data->token_, z_loan(liveliness_ke),
NULL) != Z_OK)
zenoh::ZResult err;
service_data->token_ = session->liveliness_declare_token(
zenoh::KeyExpr(liveliness_keyexpr),
zenoh::Session::LivelinessDeclarationOptions::create_default(),
&err);
if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand All @@ -200,7 +192,6 @@ std::shared_ptr<ServiceData> ServiceData::make(
}

undeclare_z_queryable.cancel();
free_token.cancel();

return service_data;
}
Expand Down Expand Up @@ -492,7 +483,15 @@ rmw_ret_t ServiceData::shutdown()
}

// Unregister this node from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));
zenoh::ZResult err;
std::move(token_).value().undeclare(&err);
if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare liveliness token");
return RMW_RET_ERROR;
}
z_undeclare_queryable(z_move(qable_));

is_shutdown_ = true;
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/rmw_service_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class ServiceData final
// An owned queryable.
z_owned_queryable_t qable_;
// Liveliness token for the service.
zc_owned_liveliness_token_t token_;
std::optional<zenoh::LivelinessToken> token_;
// Type support fields.
const void * request_type_support_impl_;
const void * response_type_support_impl_;
Expand Down
26 changes: 15 additions & 11 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,15 +320,12 @@ bool SubscriptionData::init()

// Publish to the graph that a new subscription is in town.
std::string liveliness_keyexpr = entity_->liveliness_keyexpr();
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());

auto free_token = rcpputils::make_scope_exit(
[this]() {
z_drop(z_move(token_));
});
if (zc_liveliness_declare_token(
context_impl->session(), &token_, z_loan(liveliness_ke), NULL) != Z_OK)
zenoh::ZResult err;
token_ = context_impl->session_cpp()->liveliness_declare_token(
zenoh::KeyExpr(liveliness_keyexpr),
zenoh::Session::LivelinessDeclarationOptions::create_default(),
&err);
if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand All @@ -337,7 +334,6 @@ bool SubscriptionData::init()
}

undeclare_z_sub.cancel();
free_token.cancel();

initialized_ = true;

Expand Down Expand Up @@ -396,7 +392,15 @@ rmw_ret_t SubscriptionData::shutdown()
graph_cache_->remove_qos_event_callbacks(entity_->keyexpr_hash());

// Unregister this subscription from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));
zenoh::ZResult err;
std::move(token_).value().undeclare(&err);
if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare liveliness token");
return RMW_RET_ERROR;
}

z_owned_subscriber_t * sub = std::get_if<z_owned_subscriber_t>(&sub_);
if (sub != nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
// An owned subscriber or querying_subscriber depending on the QoS settings.
std::variant<z_owned_subscriber_t, ze_owned_querying_subscriber_t> sub_;
// Liveliness token for the subscription.
zc_owned_liveliness_token_t token_;
std::optional<zenoh::LivelinessToken> token_;
// Type support fields
const void * type_support_impl_;
std::unique_ptr<MessageTypeSupport> type_support_;
Expand Down

0 comments on commit 0635799

Please sign in to comment.