Skip to content

Commit

Permalink
Merge remote-tracking branch 'zenoh/dev/1.0.0' into ahcorde/rolling/1…
Browse files Browse the repository at this point in the history
….0.0_windows
  • Loading branch information
ahcorde committed Dec 5, 2024
2 parents 4982ae2 + 84d1267 commit 924a3b1
Show file tree
Hide file tree
Showing 14 changed files with 85 additions and 85 deletions.
3 changes: 1 addition & 2 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ attachment_data_t::attachment_data_t(const z_loaned_bytes_t * attachment)
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
z_owned_string_t key;

ze_deserializer_deserialize_string(&deserializer, &key);

// Deserialize the sequence_number
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(
z_string_data(z_loan(key)),
z_string_len(z_loan(key))) != "sequence_number")
Expand Down
17 changes: 6 additions & 11 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,23 +236,19 @@ ClientData::ClientData(
bool ClientData::init(const z_loaned_session_t * session)
{
std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_;
auto free_ros_keyexpr = rcpputils::make_scope_exit(
[this]() {
z_drop(z_move(this->keyexpr_));
});
if (z_keyexpr_from_str(&this->keyexpr_, topic_keyexpr.c_str()) != Z_OK) {
RMW_SET_ERROR_MSG("unable to create zenoh keyexpr.");
return false;
}
auto free_ros_keyexpr = rcpputils::make_scope_exit(
[this]() {
z_drop(z_move(this->keyexpr_));
});

std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr();
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
auto free_token = rcpputils::make_scope_exit(
[this]() {
z_drop(z_move(this->token_));
});
if (zc_liveliness_declare_token(
if (z_liveliness_declare_token(
session,
&this->token_,
z_loan(liveliness_ke),
Expand All @@ -266,7 +262,6 @@ bool ClientData::init(const z_loaned_session_t * session)
}

free_ros_keyexpr.cancel();
free_token.cancel();

return true;
}
Expand Down Expand Up @@ -518,7 +513,7 @@ void ClientData::_shutdown()
}

// Unregister this node from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));
z_liveliness_undeclare_token(z_move(token_));

z_drop(z_move(keyexpr_));

Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// An owned keyexpression.
z_owned_keyexpr_t keyexpr_;
// Liveliness token for the service.
zc_owned_liveliness_token_t token_;
z_owned_liveliness_token_t token_;
// Type support fields.
const void * request_type_support_impl_;
const void * response_type_support_impl_;
Expand Down
24 changes: 10 additions & 14 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class rmw_context_impl_s::Data final

// Query router/liveliness participants to get graph information before the session was started.
// We create a blocking channel that is unbounded, ie. `bound` = 0, to receive
// replies for the zc_liveliness_get() call. This is necessary as if the `bound`
// replies for the z_liveliness_get() call. This is necessary as if the `bound`
// is too low, the channel may starve the zenoh executor of its threads which
// would lead to deadlocks when trying to receive replies and block the
// execution here.
Expand All @@ -143,7 +143,7 @@ class rmw_context_impl_s::Data final

z_view_keyexpr_t keyexpr;
z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str());
zc_liveliness_get(
z_liveliness_get(
z_loan(session_), z_loan(keyexpr),
z_move(closure), NULL);
z_owned_reply_t reply;
Expand All @@ -152,10 +152,10 @@ class rmw_context_impl_s::Data final
const z_loaned_sample_t * sample = z_reply_ok(z_loan(reply));
z_view_string_t keystr;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr);
std::string livelines_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr)));
std::string liveliness_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr)));
// Ignore tokens from the same session to avoid race conditions from this
// query and the liveliness subscription.
graph_cache_->parse_put(std::move(livelines_str), true);
graph_cache_->parse_put(std::move(liveliness_str), true);
} else {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n");
Expand Down Expand Up @@ -196,17 +196,13 @@ class rmw_context_impl_s::Data final

// Setup the liveliness subscriber to receives updates from the ROS graph
// and update the graph cache.
zc_liveliness_subscriber_options_t sub_options;
zc_liveliness_subscriber_options_default(&sub_options);
z_liveliness_subscriber_options_t sub_options;
z_liveliness_subscriber_options_default(&sub_options);
z_owned_closure_sample_t callback;
z_closure(&callback, graph_sub_data_handler, nullptr, this);
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str());
auto undeclare_z_sub = rcpputils::make_scope_exit(
[this]() {
z_undeclare_subscriber(z_move(this->graph_subscriber_));
});
if (zc_liveliness_declare_subscriber(
if (z_liveliness_declare_subscriber(
z_loan(session_),
&graph_subscriber_, z_loan(liveliness_ke),
z_move(callback), &sub_options) != Z_OK)
Expand All @@ -217,7 +213,6 @@ class rmw_context_impl_s::Data final

close_session.cancel();
// free_shm_provider.cancel();
undeclare_z_sub.cancel();
}

// Shutdown the Zenoh session.
Expand Down Expand Up @@ -245,6 +240,7 @@ class rmw_context_impl_s::Data final
RMW_SET_ERROR_MSG("Error while closing zenoh session");
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}

Expand Down Expand Up @@ -443,8 +439,8 @@ static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data)
}

// Update the graph cache.
std::string livelines_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr)));
data_shared_ptr->update_graph_cache(z_sample_kind(sample), std::move(livelines_str));
std::string liveliness_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr)));
data_shared_ptr->update_graph_cache(z_sample_kind(sample), std::move(liveliness_str));
}

///=============================================================================
Expand Down
13 changes: 4 additions & 9 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,13 @@ std::shared_ptr<NodeData> NodeData::make(
std::string liveliness_keyexpr = entity->liveliness_keyexpr();
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
zc_owned_liveliness_token_t token;
auto free_token = rcpputils::make_scope_exit(
[&token]() {
z_drop(z_move(token));
});
if (zc_liveliness_declare_token(session, &token, z_loan(liveliness_ke), NULL) != Z_OK) {
z_owned_liveliness_token_t token;
if (z_liveliness_declare_token(session, &token, z_loan(liveliness_ke), NULL) != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the node.");
return nullptr;
}
free_token.cancel();

return std::shared_ptr<NodeData>(
new NodeData{
Expand All @@ -87,7 +82,7 @@ NodeData::NodeData(
const rmw_node_t * const node,
std::size_t id,
std::shared_ptr<liveliness::Entity> entity,
zc_owned_liveliness_token_t token)
z_owned_liveliness_token_t token)
: node_(node),
id_(std::move(id)),
entity_(std::move(entity)),
Expand Down Expand Up @@ -402,7 +397,7 @@ rmw_ret_t NodeData::shutdown()
}

// Unregister this node from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));
z_liveliness_undeclare_token(z_move(token_));

is_shutdown_ = true;
return ret;
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class NodeData final
const rmw_node_t * const node,
std::size_t id,
std::shared_ptr<liveliness::Entity> entity,
zc_owned_liveliness_token_t token);
z_owned_liveliness_token_t token);
// Internal mutex.
mutable std::recursive_mutex mutex_;
// The rmw_node_t associated with this NodeData.
Expand All @@ -139,7 +139,7 @@ class NodeData final
// The Entity generated for the node.
std::shared_ptr<liveliness::Entity> entity_;
// Liveliness token for the node.
zc_owned_liveliness_token_t token_;
z_owned_liveliness_token_t token_;
// Shutdown flag.
bool is_shutdown_;
// Map of publishers.
Expand Down
33 changes: 14 additions & 19 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,6 @@ std::shared_ptr<PublisherData> PublisherData::make(

// Create a Publication Cache if durability is transient_local.
std::optional<ze_owned_publication_cache_t> pub_cache = std::nullopt;
auto undeclare_z_publisher_cache = rcpputils::make_scope_exit(
[&pub_cache]() {
if (pub_cache.has_value()) {
z_drop(z_move(pub_cache.value()));
}
});
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
ze_publication_cache_options_t pub_cache_opts;
ze_publication_cache_options_default(&pub_cache_opts);
Expand All @@ -142,6 +136,12 @@ std::shared_ptr<PublisherData> PublisherData::make(
}
pub_cache = pub_cache_;
}
auto undeclare_z_publisher_cache = rcpputils::make_scope_exit(
[&pub_cache]() {
if (pub_cache.has_value()) {
z_drop(z_move(pub_cache.value()));
}
});

// Set congestion_control to BLOCK if appropriate.
z_publisher_options_t opts;
Expand All @@ -157,26 +157,22 @@ std::shared_ptr<PublisherData> PublisherData::make(
}
z_owned_publisher_t pub;
// TODO(clalancette): What happens if the key name is a valid but empty string?
auto undeclare_z_publisher = rcpputils::make_scope_exit(
[&pub]() {
z_undeclare_publisher(z_move(pub));
});
if (z_declare_publisher(
session, &pub, z_loan(pub_ke), &opts) != Z_OK)
{
RMW_SET_ERROR_MSG("Unable to create Zenoh publisher.");
return nullptr;
}
auto undeclare_z_publisher = rcpputils::make_scope_exit(
[&pub]() {
z_undeclare_publisher(z_move(pub));
});

std::string liveliness_keyexpr = entity->liveliness_keyexpr();
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
zc_owned_liveliness_token_t token;
auto free_token = rcpputils::make_scope_exit(
[&token]() {
z_drop(z_move(token));
});
if (zc_liveliness_declare_token(
z_owned_liveliness_token_t token;
if (z_liveliness_declare_token(
session, &token, z_loan(liveliness_ke),
NULL) != Z_OK)
{
Expand All @@ -186,7 +182,6 @@ std::shared_ptr<PublisherData> PublisherData::make(
return nullptr;
}

free_token.cancel();
undeclare_z_publisher_cache.cancel();
undeclare_z_publisher.cancel();

Expand All @@ -208,7 +203,7 @@ PublisherData::PublisherData(
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
std::optional<ze_owned_publication_cache_t> pub_cache,
zc_owned_liveliness_token_t token,
z_owned_liveliness_token_t token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
: rmw_node_(rmw_node),
Expand Down Expand Up @@ -431,7 +426,7 @@ rmw_ret_t PublisherData::shutdown()
}

// Unregister this publisher from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));
z_liveliness_undeclare_token(z_move(token_));
if (pub_cache_.has_value()) {
z_drop(z_move(pub_cache_.value()));
}
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class PublisherData final
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
std::optional<ze_owned_publication_cache_t> pub_cache,
zc_owned_liveliness_token_t token,
z_owned_liveliness_token_t token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support);

Expand All @@ -104,7 +104,7 @@ class PublisherData final
// Optional publication cache when durability is transient_local.
std::optional<ze_owned_publication_cache_t> pub_cache_;
// Liveliness token for the publisher.
zc_owned_liveliness_token_t token_;
z_owned_liveliness_token_t token_;
// Type support fields
const void * type_support_impl_;
std::unique_ptr<MessageTypeSupport> type_support_;
Expand Down
12 changes: 6 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_service_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,17 @@ std::shared_ptr<ServiceData> ServiceData::make(
z_queryable_options_t qable_options;
z_queryable_options_default(&qable_options);
qable_options.complete = true;
auto undeclare_z_queryable = rcpputils::make_scope_exit(
[service_data]() {
z_undeclare_queryable(z_move(service_data->qable_));
});
if (z_declare_queryable(
session, &service_data->qable_, z_loan(service_ke),
z_move(callback), &qable_options) != Z_OK)
{
RMW_SET_ERROR_MSG("unable to create zenoh queryable");
return nullptr;
}
auto undeclare_z_queryable = rcpputils::make_scope_exit(
[service_data]() {
z_undeclare_queryable(z_move(service_data->qable_));
});

std::string liveliness_keyexpr = service_data->entity_->liveliness_keyexpr();
z_view_keyexpr_t liveliness_ke;
Expand All @@ -195,7 +195,7 @@ std::shared_ptr<ServiceData> ServiceData::make(
z_drop(z_move(service_data->token_));
}
});
if (zc_liveliness_declare_token(
if (z_liveliness_declare_token(
session, &service_data->token_, z_loan(liveliness_ke),
NULL) != Z_OK)
{
Expand Down Expand Up @@ -495,7 +495,7 @@ rmw_ret_t ServiceData::shutdown()
}

// Unregister this node from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));
z_liveliness_undeclare_token(z_move(token_));
z_undeclare_queryable(z_move(qable_));

is_shutdown_ = true;
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/rmw_service_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class ServiceData final
// An owned queryable.
z_owned_queryable_t qable_;
// Liveliness token for the service.
zc_owned_liveliness_token_t token_;
z_owned_liveliness_token_t token_;
// Type support fields.
const void * request_type_support_impl_;
const void * response_type_support_impl_;
Expand Down
Loading

0 comments on commit 924a3b1

Please sign in to comment.