From 8cd19bf4baf43a91056d997cdd1d33175faabc8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Hern=C3=A1ndez=20Cordero?= Date: Wed, 11 Dec 2024 12:08:38 +0100 Subject: [PATCH] Added feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Alejandro Hernández Cordero --- .../src/detail/attachment_helpers.cpp | 18 ++--- .../src/detail/attachment_helpers.hpp | 9 +-- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 4 +- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 3 +- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 34 +++++----- .../src/detail/rmw_context_impl_s.cpp | 36 ++++------ rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 12 ++-- .../src/detail/rmw_publisher_data.cpp | 40 +++++------ .../src/detail/rmw_publisher_data.hpp | 2 + rmw_zenoh_cpp/src/detail/rmw_service_data.cpp | 47 +++++++------ .../src/detail/rmw_subscription_data.cpp | 66 +++++++++---------- rmw_zenoh_cpp/src/detail/zenoh_config.cpp | 6 +- rmw_zenoh_cpp/src/detail/zenoh_utils.cpp | 4 +- rmw_zenoh_cpp/src/detail/zenoh_utils.hpp | 2 +- .../src/rmw_get_topic_endpoint_info.cpp | 1 - rmw_zenoh_cpp/src/rmw_zenoh.cpp | 2 +- 16 files changed, 136 insertions(+), 150 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp index 37c7f335..56f7b49d 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp @@ -31,14 +31,14 @@ namespace rmw_zenoh_cpp { AttachmentData::AttachmentData( - const int64_t _sequence_number, - const int64_t _source_timestamp, - const std::vector _source_gid) + const int64_t sequence_number, + const int64_t source_timestamp, + const std::vector source_gid) +: sequence_number_(sequence_number), + source_timestamp_(source_timestamp), + source_gid_(source_gid), + gid_hash_(hash_gid(source_gid)) { - sequence_number_ = _sequence_number; - source_timestamp_ = _source_timestamp; - source_gid_ = _source_gid; - gid_hash_ = hash_gid(source_gid_); } AttachmentData::AttachmentData(AttachmentData && data) @@ -85,9 +85,9 @@ zenoh::Bytes AttachmentData::serialize_to_zbytes() return std::move(serializer).finish(); } -AttachmentData::AttachmentData(const zenoh::Bytes & attachment) +AttachmentData::AttachmentData(const zenoh::Bytes & bytes) { - zenoh::ext::Deserializer deserializer(std::move(attachment)); + zenoh::ext::Deserializer deserializer(std::move(bytes)); const auto sequence_number_str = deserializer.deserialize(); if (sequence_number_str != "sequence_number") { throw std::runtime_error("sequence_number is not found in the attachment."); diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp index 7ae57709..7f9aa3ee 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp @@ -24,13 +24,14 @@ namespace rmw_zenoh_cpp { +///============================================================================= class AttachmentData final { public: - explicit AttachmentData( - const int64_t _sequence_number, - const int64_t _source_timestamp, - const std::vector _source_gid); + AttachmentData( + const int64_t sequence_number, + const int64_t source_timestamp, + const std::vector source_gid); explicit AttachmentData(const zenoh::Bytes & bytes); explicit AttachmentData(AttachmentData && data); diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index e0795ac8..99f99116 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -69,8 +69,8 @@ TopicData::TopicData(ConstEntityPtr entity) } ///============================================================================= -GraphCache::GraphCache(const std::string & zid) -: zid_str_(zid) +GraphCache::GraphCache(const zenoh::Id & zid) +: zid_str_(zid.to_string()) { // Do nothing. } diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index e9eecb5d..6c4f6e54 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -36,6 +36,7 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/names_and_types.h" +#include namespace rmw_zenoh_cpp { @@ -110,7 +111,7 @@ class GraphCache final /// @param id The id of the zenoh session that is building the graph cache. /// This is used to infer which entities originated from the current session /// so that appropriate event callbacks may be triggered. - explicit GraphCache(const std::string & zid); + explicit GraphCache(const zenoh::Id & zid); // Parse a PUT message over a token's key-expression and update the graph. void parse_put(const std::string & keyexpr, bool ignore_from_current_session = false); diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 8a153a93..bae0fe99 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -182,12 +182,12 @@ bool ClientData::init(const std::shared_ptr session) keyexpr_ = zenoh::KeyExpr(topic_keyexpr); // TODO(ahcorde) check KeyExpr std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr(); - zenoh::ZResult err; + zenoh::ZResult result; this->token_ = session->liveliness_declare_token( zenoh::KeyExpr(liveliness_keyexpr), zenoh::Session::LivelinessDeclarationOptions::create_default(), - &err); - if (err != Z_OK) { + &result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the client."); @@ -280,14 +280,10 @@ rmw_ret_t ClientData::take_response( const zenoh::Sample & sample = reply.get_ok(); // Object that manages the raw buffer - auto & payload = sample.get_payload(); - auto slice = payload.slice_iter().next(); - if (slice.has_value()) { - const uint8_t * payload = slice.value().data; - const size_t payload_len = slice.value().len; - + auto payload = sample.get_payload().as_vector(); + if (payload.size() > 0) { eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(payload)), payload_len); + reinterpret_cast(const_cast(payload.data())), payload.size()); // Object that serializes the data rmw_zenoh_cpp::Cdr deser(fastbuffer); @@ -401,10 +397,10 @@ rmw_ret_t ClientData::send_request( std::vector raw_bytes( reinterpret_cast(request_bytes), reinterpret_cast(request_bytes) + data_length); - opts.payload = zenoh::Bytes(raw_bytes); + opts.payload = zenoh::Bytes(std::move(raw_bytes)); std::weak_ptr client_data = shared_from_this(); - zenoh::ZResult err; + zenoh::ZResult result; std::string parameters; context_impl->session()->get( keyexpr_.value(), @@ -448,7 +444,13 @@ rmw_ret_t ClientData::send_request( } }, std::move(opts), - &err); + &result); + if (result != Z_OK) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "ClientData unable to call get"); + return RMW_RET_ERROR; + } return RMW_RET_OK; } @@ -506,9 +508,9 @@ rmw_ret_t ClientData::shutdown() // Unregister this node from the ROS graph. if (initialized_) { - zenoh::ZResult err; - std::move(token_).value().undeclare(&err); - if (err != Z_OK) { + zenoh::ZResult result; + std::move(token_).value().undeclare(&result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to undeclare liveliness token"); diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index da2e81b7..309720f7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -111,9 +111,9 @@ class rmw_context_impl_s::Data final "Unable to connect to a Zenoh router. " "Have you started a router with `ros2 run rmw_zenoh_cpp rmw_zenohd`?"); } - zenoh::ZResult err; - this->session_->get_routers_z_id(&err); - if (err != Z_OK) { + zenoh::ZResult result; + this->session_->get_routers_z_id(&result); + if (result != Z_OK) { ++connection_attempts; } else { ret = RMW_RET_OK; @@ -127,7 +127,7 @@ class rmw_context_impl_s::Data final // Initialize the graph cache. graph_cache_ = - std::make_shared(this->session_->get_zid().to_string()); + std::make_shared(this->session_->get_zid()); // Setup liveliness subscriptions for discovery. std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id); @@ -155,14 +155,13 @@ class rmw_context_impl_s::Data final options.target = zenoh::QueryTarget::Z_QUERY_TARGET_ALL; options.payload = ""; - zenoh::ZResult err; auto replies = session_->liveliness_get( keyexpr, zenoh::channels::FifoChannel(SIZE_MAX - 1), zenoh::Session::LivelinessGetOptions::create_default(), - &err); + &result); - if (err != Z_OK) { + if (result != Z_OK) { throw std::runtime_error("Error getting liveliness. "); } @@ -223,9 +222,9 @@ class rmw_context_impl_s::Data final }, zenoh::closures::none, std::move(sub_options), - &err); + &result); - if (err != Z_OK) { + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); throw std::runtime_error("Unable to subscribe to ROS graph updates."); } @@ -241,22 +240,9 @@ class rmw_context_impl_s::Data final return ret; } - // TODO(ahcorde): review this - // // Shutdown all the nodes in this context. - // for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) { - // ret = node_it->second->shutdown(); - // if (ret != RMW_RET_OK) { - // RMW_ZENOH_LOG_ERROR_NAMED( - // "rmw_zenoh_cpp", - // "Unable to shutdown node with id %zu. rmw_ret_t code: %zu.", - // node_it->second->id(), - // ret - // ); - // } - - zenoh::ZResult err; - std::move(graph_subscriber_cpp_).value().undeclare(&err); - if (err != Z_OK) { + zenoh::ZResult result; + std::move(graph_subscriber_cpp_).value().undeclare(&result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to undeclare liveliness token"); diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index cadafa43..ed1b5b2b 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -58,12 +58,12 @@ std::shared_ptr NodeData::make( // Create the liveliness token. std::string liveliness_keyexpr = entity->liveliness_keyexpr(); - zenoh::ZResult err; + zenoh::ZResult result; auto token = session->liveliness_declare_token( zenoh::KeyExpr(liveliness_keyexpr), zenoh::Session::LivelinessDeclarationOptions::create_default(), - &err); - if (err != Z_OK) { + &result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the node."); @@ -393,9 +393,9 @@ rmw_ret_t NodeData::shutdown() } // Unregister this node from the ROS graph. - zenoh::ZResult err; - std::move(token_).value().undeclare(&err); - if (err != Z_OK) { + zenoh::ZResult result; + std::move(token_).value().undeclare(&result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to undeclare liveliness token"); diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 2e9e7433..219b466d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -103,7 +103,7 @@ std::shared_ptr PublisherData::make( return nullptr; } - zenoh::ZResult err; + zenoh::ZResult result; std::optional pub_cache; zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_); // Create a Publication Cache if durability is transient_local. @@ -117,9 +117,9 @@ std::shared_ptr PublisherData::make( std::string queryable_prefix = entity->zid(); pub_cache_opts.queryable_prefix = zenoh::KeyExpr(queryable_prefix); - pub_cache = session->declare_publication_cache(pub_ke, std::move(pub_cache_opts), &err); + pub_cache = session->declare_publication_cache(pub_ke, std::move(pub_cache_opts), &result); - if (err != Z_OK) { + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); return nullptr; } @@ -135,9 +135,9 @@ std::shared_ptr PublisherData::make( opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; } } - auto pub = session->declare_publisher(pub_ke, std::move(opts), &err); + auto pub = session->declare_publisher(pub_ke, std::move(opts), &result); - if (err != Z_OK) { + if (result != Z_OK) { RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); return nullptr; } @@ -146,8 +146,8 @@ std::shared_ptr PublisherData::make( auto token = session->liveliness_declare_token( zenoh::KeyExpr(liveliness_keyexpr), zenoh::Session::LivelinessDeclarationOptions::create_default(), - &err); - if (err != Z_OK) { + &result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the publisher."); @@ -273,7 +273,7 @@ rmw_ret_t PublisherData::publish( // The encoding is simply forwarded and is useful when key expressions in the // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. - zenoh::ZResult err; + zenoh::ZResult result; auto options = zenoh::Publisher::PutOptions::create_default(); options.attachment = create_map_and_set_sequence_num( sequence_number_++, @@ -285,9 +285,9 @@ rmw_ret_t PublisherData::publish( reinterpret_cast(msg_bytes) + data_length); zenoh::Bytes payload(raw_image); - pub_.put(std::move(payload), std::move(options), &err); - if (err != Z_OK) { - if (err == Z_ESESSION_CLOSED) { + pub_.put(std::move(payload), std::move(options), &result); + if (result != Z_OK) { + if (result == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", "unable to publish message since the zenoh session is closed"); @@ -319,7 +319,7 @@ rmw_ret_t PublisherData::publish_serialized_message( // The encoding is simply forwarded and is useful when key expressions in the // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. - zenoh::ZResult err; + zenoh::ZResult result; auto options = zenoh::Publisher::PutOptions::create_default(); options.attachment = create_map_and_set_sequence_num(sequence_number_++, entity_->copy_gid()); @@ -328,9 +328,9 @@ rmw_ret_t PublisherData::publish_serialized_message( serialized_message->buffer + data_length); zenoh::Bytes payload(raw_image); - pub_.put(std::move(payload), std::move(options), &err); - if (err != Z_OK) { - if (err == Z_ESESSION_CLOSED) { + pub_.put(std::move(payload), std::move(options), &result); + if (result != Z_OK) { + if (result == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", "unable to publish message since the zenoh session is closed"); @@ -408,16 +408,16 @@ rmw_ret_t PublisherData::shutdown() } // Unregister this publisher from the ROS graph. - zenoh::ZResult err; - std::move(token_).value().undeclare(&err); - if (err != Z_OK) { + zenoh::ZResult result; + std::move(token_).value().undeclare(&result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to undeclare liveliness token"); return RMW_RET_ERROR; } - std::move(pub_).undeclare(&err); - if (err != Z_OK) { + std::move(pub_).undeclare(&result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to undeclare publisher"); diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 9940de93..cd8b2686 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -71,6 +71,8 @@ class PublisherData final // Copy the GID of this PublisherData into an rmw_gid_t. void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + + // Return a copy of the GID of this publisher. std::vector copy_gid() const; // Returns true if liveliness token is still valid. diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index 19ad0dc7..5696e815 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -135,10 +135,10 @@ std::shared_ptr ServiceData::make( std::move(response_type_support) }); - zenoh::ZResult err; + zenoh::ZResult result; service_data->keyexpr_ = service_data->entity_->topic_info()->topic_keyexpr_; - zenoh::KeyExpr service_ke(service_data->keyexpr_, true, &err); - if (err != Z_OK) { + zenoh::KeyExpr service_ke(service_data->keyexpr_, true, &result); + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } @@ -167,8 +167,8 @@ std::shared_ptr ServiceData::make( }, zenoh::closures::none, std::move(qable_options), - &err); - if (err != Z_OK) { + &result); + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh queryable"); return nullptr; } @@ -177,8 +177,8 @@ std::shared_ptr ServiceData::make( service_data->token_ = session->liveliness_declare_token( zenoh::KeyExpr(liveliness_keyexpr), zenoh::Session::LivelinessDeclarationOptions::create_default(), - &err); - if (err != Z_OK) { + &result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the service."); @@ -290,15 +290,12 @@ rmw_ret_t ServiceData::take_request( return RMW_RET_ERROR; } - auto slice = payload.value().get().slice_iter().next(); + auto payload_data = payload.value().get().as_vector(); - if (slice.has_value()) { - const uint8_t * payload = slice.value().data; - const size_t payload_len = slice.value().len; - - // Object that manages the raw buffer + if (payload_data.size() > 0) { + // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(payload)), payload_len); + reinterpret_cast(const_cast(payload_data.data())), payload_data.size()); // Object that serializes the data Cdr deser(fastbuffer); @@ -448,17 +445,17 @@ rmw_ret_t ServiceData::send_response( std::vector raw_bytes( reinterpret_cast(response_bytes), reinterpret_cast(response_bytes) + data_length); - zenoh::Bytes payload(raw_bytes); + zenoh::Bytes payload(std::move(raw_bytes)); - zenoh::ZResult err; - zenoh::KeyExpr service_ke(keyexpr_.c_str(), true, &err); - if (err != Z_OK) { + zenoh::ZResult result; + zenoh::KeyExpr service_ke(keyexpr_.c_str(), true, &result); + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create KeyExpr"); return RMW_RET_ERROR; } - loaned_query.reply(service_ke, std::move(payload), std::move(options), &err); - if (err != Z_OK) { + loaned_query.reply(service_ke, std::move(payload), std::move(options), &result); + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to reply"); return RMW_RET_ERROR; } @@ -521,17 +518,17 @@ rmw_ret_t ServiceData::shutdown() // Unregister this node from the ROS graph. if (initialized_) { - zenoh::ZResult err; - std::move(token_).value().undeclare(&err); - if (err != Z_OK) { + zenoh::ZResult result; + std::move(token_).value().undeclare(&result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to undeclare liveliness token"); return RMW_RET_ERROR; } - std::move(qable_).value().undeclare(&err); - if (err != Z_OK) { + std::move(qable_).value().undeclare(&result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to undeclare queryable"); diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 27295867..45ef1bf8 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -169,9 +169,9 @@ bool SubscriptionData::init() "`reliability` no longer supported on subscriber. Ignoring..."); } - zenoh::ZResult err; - zenoh::KeyExpr sub_ke(entity_->topic_info()->topic_keyexpr_, true, &err); - if (err != Z_OK) { + zenoh::ZResult result; + zenoh::KeyExpr sub_ke(entity_->topic_info()->topic_keyexpr_, true, &result); + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return false; } @@ -236,8 +236,8 @@ bool SubscriptionData::init() }, zenoh::closures::none, std::move(sub_options), - &err); - if (err != Z_OK) { + &result); + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; } @@ -273,13 +273,13 @@ bool SubscriptionData::init() opts.consolidation = zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE; opts.accept_replies = ZC_REPLY_KEYEXPR_ANY; - zenoh::ZResult err; + zenoh::ZResult result; std::get>(sub_data->sub_.value()).get( zenoh::KeyExpr(selector), std::move(opts), - &err); + &result); - if (err != Z_OK) { + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to get querying subscriber."); return; } @@ -322,8 +322,8 @@ bool SubscriptionData::init() }, zenoh::closures::none, std::move(sub_options), - &err); - if (err != Z_OK) { + &result); + if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; } @@ -335,8 +335,8 @@ bool SubscriptionData::init() token_ = context_impl->session()->liveliness_declare_token( zenoh::KeyExpr(liveliness_keyexpr), zenoh::Session::LivelinessDeclarationOptions::create_default(), - &err); - if (err != Z_OK) { + &result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the subscription."); @@ -410,9 +410,9 @@ rmw_ret_t SubscriptionData::shutdown() graph_cache_->remove_qos_event_callbacks(entity_->keyexpr_hash()); // Unregister this subscription from the ROS graph. - zenoh::ZResult err; - std::move(token_).value().undeclare(&err); - if (err != Z_OK) { + zenoh::ZResult result; + std::move(token_).value().undeclare(&result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to undeclare liveliness token"); @@ -422,8 +422,8 @@ rmw_ret_t SubscriptionData::shutdown() if (sub_.has_value()) { zenoh::Subscriber * sub = std::get_if>(&sub_.value()); if (sub != nullptr) { - std::move(*sub).undeclare(&err); - if (err != Z_OK) { + std::move(*sub).undeclare(&result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "failed to undeclare sub."); @@ -433,8 +433,8 @@ rmw_ret_t SubscriptionData::shutdown() zenoh::ext::QueryingSubscriber * sub = std::get_if>(&sub_.value()); if (sub != nullptr) { - std::move(*sub).undeclare(&err); - if (err != Z_OK) { + std::move(*sub).undeclare(&result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "failed to undeclare querying sub."); @@ -496,16 +496,13 @@ rmw_ret_t SubscriptionData::take_one_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); - auto slice = msg_data->payload.slice_iter().next(); - - if (slice.has_value()) { - const uint8_t * payload = slice.value().data; - const size_t payload_len = slice.value().len; + auto payload_data = msg_data->payload.as_vector(); + if (payload_data.size() > 0) { // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(payload)), - payload_len); + reinterpret_cast(const_cast(payload_data.data())), + payload_data.size()); // Object that serializes the data rmw_zenoh_cpp::Cdr deser(fastbuffer); @@ -558,20 +555,21 @@ rmw_ret_t SubscriptionData::take_serialized_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); - auto slice = msg_data->payload.slice_iter().next(); + auto payload_data = msg_data->payload.as_vector(); - if (slice.has_value()) { - const uint8_t * payload = slice.value().data; - const size_t payload_len = slice.value().len; - if (serialized_message->buffer_capacity < payload_len) { + if (payload_data.size() > 0) { + if (serialized_message->buffer_capacity < payload_data.size()) { rmw_ret_t ret = - rmw_serialized_message_resize(serialized_message, payload_len); + rmw_serialized_message_resize(serialized_message, payload_data.size()); if (ret != RMW_RET_OK) { return ret; // Error message already set } } - serialized_message->buffer_length = payload_len; - memcpy(serialized_message->buffer, payload, payload_len); + serialized_message->buffer_length = payload_data.size(); + memcpy( + serialized_message->buffer, + reinterpret_cast(const_cast(payload_data.data())), + payload_data.size()); *taken = true; diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp index 6c906522..075d8806 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp @@ -61,9 +61,9 @@ std::optional _get_z_config( // if the environment variable is not set use internal configuration configured_uri = envar_uri[0] != '\0' ? envar_uri : default_uri; // Try to read the configuration - zenoh::ZResult err; - zenoh::Config config = zenoh::Config::from_file(configured_uri, &err); - if (err != Z_OK) { + zenoh::ZResult result; + zenoh::Config config = zenoh::Config::from_file(configured_uri, &result); + if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Invalid configuration file %s", configured_uri); diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 9c0dab91..795c4dc9 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -56,8 +56,8 @@ zenoh::Bytes create_map_and_set_sequence_num( ZenohQuery::ZenohQuery( const zenoh::Query & query, std::chrono::nanoseconds::rep received_timestamp) +: query_(query.clone()) { - query_ = query.clone(); received_timestamp_ = received_timestamp; } @@ -71,7 +71,7 @@ std::chrono::nanoseconds::rep ZenohQuery::get_received_timestamp() const ZenohQuery::~ZenohQuery() {} ///============================================================================= -const zenoh::Query & ZenohQuery::get_query() const {return query_.value();} +const zenoh::Query & ZenohQuery::get_query() const {return query_;} ///============================================================================= ZenohReply::ZenohReply( diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index 38bdb53b..4642799b 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -77,7 +77,7 @@ class ZenohQuery final std::chrono::nanoseconds::rep get_received_timestamp() const; private: - std::optional query_; + zenoh::Query query_; std::chrono::nanoseconds::rep received_timestamp_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp index 2ca1808e..3d7570cc 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -35,7 +35,6 @@ rmw_get_publishers_info_by_topic( bool no_mangle, rmw_topic_endpoint_info_array_t * publishers_info) { - printf("zenoh::rmw_get_publishers_info_by_topic\n"); RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 59f753bd..23220fdf 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -751,7 +751,7 @@ rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher) auto pub_data = node_data->get_pub_data(publisher); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); - if (pub_data->is_shutdown()) { + if (!pub_data->liveliness_is_valid()) { return RMW_RET_ERROR; }