diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp index 363f7f55..d7d68906 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp @@ -59,6 +59,31 @@ zenoh::Bytes AttachementData::serialize_to_zbytes() return std::move(serializer).finish(); } +AttachementData::AttachementData(const zenoh::Bytes & attachment) +{ + zenoh::ext::Deserializer deserializer(std::move(attachment)); + const auto sequence_number_str = deserializer.deserialize(); + if (sequence_number_str != "sequence_number") + { + throw std::runtime_error("sequence_number is not found in the attachment."); + } + this->sequence_number = deserializer.deserialize(); + + const auto source_timestamp_str = deserializer.deserialize(); + if (source_timestamp_str != "source_timestamp") + { + throw std::runtime_error("source_timestamp is not found in the attachment."); + } + this->source_timestamp = deserializer.deserialize(); + + const auto source_gid_str = deserializer.deserialize(); + if (source_gid_str != "source_gid") + { + throw std::runtime_error("source_gid is not found in the attachment."); + } + this->source_gid = deserializer.deserialize>(); +} + attachement_data_t::attachement_data_t( const int64_t _sequence_number, const int64_t _source_timestamp, diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp index 3274318a..48248427 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp @@ -30,7 +30,7 @@ class AttachementData final const int64_t _source_timestamp, const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]); - // explicit AttachementData(const zenoh::Bytes & bytes); + explicit AttachementData(const zenoh::Bytes & bytes); explicit AttachementData(AttachementData && data); int64_t sequence_number; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 462c1770..0cf42299 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -39,54 +39,18 @@ namespace rmw_zenoh_cpp { -namespace -{ -//============================================================================== -// TODO(Yadunund): Make this a class member and lambda capture weak_from_this() -// instead of passing a rawptr to SubscriptionData when we switch to zenoh-cpp. -void sub_data_handler(z_loaned_sample_t * sample, void * data) -{ - z_view_string_t keystr; - z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); - - auto sub_data = static_cast(data); - if (sub_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain SubscriptionData from data for %s.", - z_loan(keystr) - ); - return; - } - - attachement_data_t attachment(z_sample_attachment(sample)); - const z_loaned_bytes_t * payload = z_sample_payload(sample); - - z_owned_slice_t slice; - z_bytes_to_slice(payload, &slice); - - sub_data->add_new_message( - std::make_unique( - slice, - z_timestamp_ntp64_time(z_sample_timestamp(sample)), - std::move(attachment)), - z_string_data(z_loan(keystr))); -} -} // namespace - ///============================================================================= SubscriptionData::Message::Message( - z_owned_slice_t p, + zenoh::Bytes p, uint64_t recv_ts, - attachement_data_t && attachment_) -: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_)) + AttachementData && attachment_) +: payload(std::move(p)), recv_timestamp(recv_ts), attachment(std::move(attachment_)) { } ///============================================================================= SubscriptionData::Message::~Message() { - z_drop(z_move(payload)); } ///============================================================================= @@ -201,47 +165,25 @@ bool SubscriptionData::init() "`reliability` no longer supported on subscriber. Ignoring..."); } - // TODO(Yadunund): Instead of passing a rawptr, rely on capturing weak_ptr - // in the closure callback once we switch to zenoh-cpp. - z_owned_closure_sample_t callback; - z_closure(&callback, sub_data_handler, nullptr, this); - - std::string topic_keyexpr = entity_->topic_info()->topic_keyexpr_; - z_view_keyexpr_t sub_ke; - if (z_view_keyexpr_from_str(&sub_ke, topic_keyexpr.c_str()) != Z_OK) { + zenoh::ZResult err; + zenoh::KeyExpr sub_ke(entity_->topic_info()->topic_keyexpr_, true, &err); + if (err != Z_OK) + { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return false; } rmw_context_impl_t * context_impl = static_cast(rmw_node_->context->impl); - auto undeclare_z_sub = rcpputils::make_scope_exit( - [this]() { - z_owned_subscriber_t * sub = std::get_if(&sub_); - if (sub == nullptr || z_undeclare_subscriber(z_move(*sub))) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - } else { - ze_owned_querying_subscriber_t * querying_sub = - std::get_if(&sub_); - if (querying_sub == nullptr || ze_undeclare_querying_subscriber(z_move(*querying_sub))) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - } - } - }); - // Instantiate the subscription with suitable options depending on the // adapted_qos_profile. // TODO(Yadunund): Rely on a separate function to return the sub // as we start supporting more qos settings. if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - ze_querying_subscriber_options_t sub_options; - ze_querying_subscriber_options_default(&sub_options); - // Make the initial query to hit all the PublicationCaches, using a query_selector with - // '*' in place of the queryable_prefix of each PublicationCache + zenoh::Session::QueryingSubscriberOptions sub_options = zenoh::Session::QueryingSubscriberOptions::create_default(); const std::string selector = "*/" + entity_->topic_info()->topic_keyexpr_; - z_view_keyexpr_t selector_ke; - z_view_keyexpr_from_str(&selector_ke, selector.c_str()); - sub_options.query_selector = z_loan(selector_ke); + zenoh::KeyExpr selector_ke(selector); + sub_options.query_keyexpr = std::move(selector_ke); // Tell the PublicationCache's Queryable that the query accepts any key expression as a reply. // By default a query accepts only replies that matches its query selector. // This allows us to selectively query certain PublicationCaches when defining the @@ -252,19 +194,52 @@ bool SubscriptionData::init() // We set consolidation to none as we need to receive transient local messages // from a number of publishers. Eg: To receive TF data published over /tf_static // by various publishers. - sub_options.query_consolidation = z_query_consolidation_none(); - ze_owned_querying_subscriber_t sub; - if (ze_declare_querying_subscriber( - context_impl->session(), &sub, z_loan(sub_ke), z_move(callback), &sub_options)) + sub_options.query_consolidation = zenoh::QueryConsolidation(zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE); + + std::weak_ptr data_wp = shared_from_this(); + auto sub = context_impl->session_cpp()->declare_querying_subscriber( + sub_ke, + [data_wp](const zenoh::Sample& sample) { + auto sub_data = data_wp.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain SubscriptionData from data for %s.", + std::string(sample.get_keyexpr().as_string_view())); + return; + } + + auto attachement = sample.get_attachment(); + if (!attachement.has_value()) + { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain attachement") + return; + } + + auto attachement_value = attachement.value(); + AttachementData attachment_data(attachement_value); + + sub_data->add_new_message( + std::make_unique( + sample.get_payload().clone(), + sample.get_timestamp().value().get_time(), + std::move(attachment_data)), + std::string(sample.get_keyexpr().as_string_view())); + }, + zenoh::closures::none, + std::move(sub_options), + &err); + if (err != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; } - sub_ = sub; + sub_ = std::move(sub); // Register the querying subscriber with the graph cache to get latest // messages from publishers that were discovered after their first publication. - std::weak_ptr data_wp = shared_from_this(); graph_cache_->set_querying_subscriber_callback( entity_->topic_info().value().topic_keyexpr_, entity_->keyexpr_hash(), @@ -281,46 +256,80 @@ bool SubscriptionData::init() std::lock_guard lock(sub_data->mutex_); const std::string selector = queryable_prefix + - "/" + - sub_data->entity_->topic_info().value().topic_keyexpr_; + "/" + + sub_data->entity_->topic_info().value().topic_keyexpr_; RMW_ZENOH_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", "QueryingSubscriberCallback triggered over %s.", selector.c_str() ); - z_get_options_t opts; - z_get_options_default(&opts); + zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default(); opts.timeout_ms = std::numeric_limits::max(); - opts.consolidation = z_query_consolidation_latest(); + opts.consolidation = zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE; opts.accept_replies = ZC_REPLY_KEYEXPR_ANY; - z_view_keyexpr_t ke; - z_view_keyexpr_from_str(&ke, selector.c_str()); - ze_querying_subscriber_get( - z_loan(std::get(sub_data->sub_)), - z_loan(ke), - &opts); + zenoh::ZResult err; + std::get>( + sub_data->sub_.value()).get(zenoh::KeyExpr(selector), + std::move(opts), + &err); + + if (err != Z_OK) + { + RMW_SET_ERROR_MSG("unable to get querying subscriber."); + return; + } } ); } else { - // Create a regular subscriber for all other durability settings. - z_subscriber_options_t sub_options; - z_subscriber_options_default(&sub_options); - - z_owned_subscriber_t sub; - if (z_declare_subscriber( - context_impl->session(), &sub, z_loan(sub_ke), z_move(callback), - &sub_options) != Z_OK) + zenoh::Session::SubscriberOptions sub_options = zenoh::Session::SubscriberOptions::create_default(); + std::weak_ptr data_wp = shared_from_this(); + zenoh::Subscriber sub = context_impl->session_cpp()->declare_subscriber( + sub_ke, + [data_wp](const zenoh::Sample & sample) { + + zenoh::KeyExpr keystr(std::string(sample.get_keyexpr().as_string_view())); + + auto sub_data = data_wp.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to lock weak_ptr within querying subscription callback." + ); + return; + } + auto attachement = sample.get_attachment(); + if (!attachement.has_value()) + { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain attachement") + return; + } + auto payload = sample.get_payload().clone(); + auto attachement_value = attachement.value(); + + AttachementData attachment_data(attachement_value); + sub_data->add_new_message( + std::make_unique( + sample.get_payload().clone(), + sample.get_timestamp().value().get_time(), + std::move(attachment_data)), + std::string(keystr.as_string_view())); + }, + zenoh::closures::none, + std::move(sub_options), + &err); + if (err != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; } - sub_ = sub; + sub_ = std::move(sub); } // Publish to the graph that a new subscription is in town. std::string liveliness_keyexpr = entity_->liveliness_keyexpr(); - zenoh::ZResult err; token_ = context_impl->session_cpp()->liveliness_declare_token( zenoh::KeyExpr(liveliness_keyexpr), zenoh::Session::LivelinessDeclarationOptions::create_default(), @@ -333,8 +342,6 @@ bool SubscriptionData::init() return false; } - undeclare_z_sub.cancel(); - initialized_ = true; return true; @@ -402,20 +409,31 @@ rmw_ret_t SubscriptionData::shutdown() return RMW_RET_ERROR; } - z_owned_subscriber_t * sub = std::get_if(&sub_); - if (sub != nullptr) { - if (z_undeclare_subscriber(z_move(*sub)) != Z_OK) { - RMW_SET_ERROR_MSG("failed to undeclare sub."); - ret = RMW_RET_ERROR; - } - } else { - ze_owned_querying_subscriber_t * querying_sub = - std::get_if(&sub_); - if (querying_sub != nullptr) { - if (ze_undeclare_querying_subscriber(z_move(*querying_sub)) != Z_OK) { - RMW_SET_ERROR_MSG("failed to undeclare querying sub."); - ret = RMW_RET_ERROR; + if (sub_.has_value()) + { + zenoh::Subscriber * sub = std::get_if>(&sub_.value()); + if (sub != nullptr) { + std::move(*sub).undeclare(&err); + if (err != Z_OK) + { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to undeclare sub."); + return RMW_RET_ERROR; + } + } else { + zenoh::ext::QueryingSubscriber * sub = std::get_if>(&sub_.value()); + if (sub != nullptr) { + std::move(*sub).undeclare(&err); + if (err != Z_OK) + { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to undeclare querying sub."); + return RMW_RET_ERROR; + } } + } } @@ -470,37 +488,45 @@ rmw_ret_t SubscriptionData::take_one_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); - const uint8_t * payload = z_slice_data(z_loan(msg_data->payload)); - const size_t payload_len = z_slice_len(z_loan(msg_data->payload)); + auto slice = msg_data->payload.slice_iter().next(); - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(payload)), - payload_len); + if (slice.has_value()) { + const uint8_t * payload = slice.value().data; + const size_t payload_len = slice.value().len; - // Object that serializes the data - rmw_zenoh_cpp::Cdr deser(fastbuffer); - if (!type_support_->deserialize_ros_message( - deser.get_cdr(), - ros_message, - type_support_impl_)) - { - RMW_SET_ERROR_MSG("could not deserialize ROS message"); - return RMW_RET_ERROR; - } + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(payload)), + payload_len); - if (message_info != nullptr) { - message_info->source_timestamp = msg_data->attachment.source_timestamp; - message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->attachment.sequence_number; - // TODO(clalancette): fill in reception_sequence_number - message_info->reception_sequence_number = 0; - message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(message_info->publisher_gid.data, msg_data->attachment.source_gid, RMW_GID_STORAGE_SIZE); - message_info->from_intra_process = false; - } + // Object that serializes the data + rmw_zenoh_cpp::Cdr deser(fastbuffer); + if (!type_support_->deserialize_ros_message( + deser.get_cdr(), + ros_message, + type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS message"); + return RMW_RET_ERROR; + } - *taken = true; + if (message_info != nullptr) { + message_info->source_timestamp = msg_data->attachment.source_timestamp; + message_info->received_timestamp = msg_data->recv_timestamp; + message_info->publication_sequence_number = msg_data->attachment.sequence_number; + // TODO(clalancette): fill in reception_sequence_number + message_info->reception_sequence_number = 0; + message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; + memcpy(message_info->publisher_gid.data, msg_data->attachment.source_gid.data(), RMW_GID_STORAGE_SIZE); + message_info->from_intra_process = false; + } + *taken = true; + } else { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "SubscriptionData not able to get slice data"); + return RMW_RET_ERROR; + } return RMW_RET_OK; } @@ -521,30 +547,38 @@ rmw_ret_t SubscriptionData::take_serialized_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); - const uint8_t * payload = z_slice_data(z_loan(msg_data->payload)); - const size_t payload_len = z_slice_len(z_loan(msg_data->payload)); + auto slice = msg_data->payload.slice_iter().next(); - if (serialized_message->buffer_capacity < payload_len) { - rmw_ret_t ret = - rmw_serialized_message_resize(serialized_message, payload_len); - if (ret != RMW_RET_OK) { - return ret; // Error message already set + if (slice.has_value()) { + const uint8_t * payload = slice.value().data; + const size_t payload_len = slice.value().len; + if (serialized_message->buffer_capacity < payload_len) { + rmw_ret_t ret = + rmw_serialized_message_resize(serialized_message, payload_len); + if (ret != RMW_RET_OK) { + return ret; // Error message already set + } } - } - serialized_message->buffer_length = payload_len; - memcpy(serialized_message->buffer, payload, payload_len); - - *taken = true; - - if (message_info != nullptr) { - message_info->source_timestamp = msg_data->attachment.source_timestamp; - message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->attachment.sequence_number; - // TODO(clalancette): fill in reception_sequence_number - message_info->reception_sequence_number = 0; - message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(message_info->publisher_gid.data, msg_data->attachment.source_gid, RMW_GID_STORAGE_SIZE); - message_info->from_intra_process = false; + serialized_message->buffer_length = payload_len; + memcpy(serialized_message->buffer, payload, payload_len); + + *taken = true; + + if (message_info != nullptr) { + message_info->source_timestamp = msg_data->attachment.source_timestamp; + message_info->received_timestamp = msg_data->recv_timestamp; + message_info->publication_sequence_number = msg_data->attachment.sequence_number; + // TODO(clalancette): fill in reception_sequence_number + message_info->reception_sequence_number = 0; + message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; + memcpy(message_info->publisher_gid.data, msg_data->attachment.source_gid.data(), RMW_GID_STORAGE_SIZE); + message_info->from_intra_process = false; + } + } else { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "SubscriptionData not able to get slice data"); + return RMW_RET_ERROR; } return RMW_RET_OK; @@ -581,7 +615,7 @@ void SubscriptionData::add_new_message( } // Check for messages lost if the new sequence number is not monotonically increasing. - const size_t gid_hash = hash_gid(msg->attachment.source_gid); + const size_t gid_hash = hash_gid(msg->attachment.source_gid.data()); auto last_known_pub_it = last_known_published_msg_.find(gid_hash); if (last_known_pub_it != last_known_published_msg_.end()) { const int64_t seq_increment = std::abs( diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 7eeb0fa7..5f1bcb86 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -49,15 +49,15 @@ class SubscriptionData final : public std::enable_shared_from_this entity_; // An owned subscriber or querying_subscriber depending on the QoS settings. - std::variant sub_; + std::optional, zenoh::ext::QueryingSubscriber>> sub_; // Liveliness token for the subscription. std::optional token_; // Type support fields diff --git a/rmw_zenoh_cpp/src/detail/type_support.cpp b/rmw_zenoh_cpp/src/detail/type_support.cpp index b2cc2848..0a18b73b 100644 --- a/rmw_zenoh_cpp/src/detail/type_support.cpp +++ b/rmw_zenoh_cpp/src/detail/type_support.cpp @@ -129,10 +129,10 @@ bool TypeSupport::deserialize_ros_message( uint8_t dump = 0; deser >> dump; (void)dump; - } catch (const eprosima::fastcdr::exception::Exception &) { + } catch (const eprosima::fastcdr::exception::Exception & e) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "Fast CDR exception deserializing message of type %s.", - get_name()); + "Fast CDR exception deserializing message of type %s. %s", + get_name(), e.what()); return false; } diff --git a/zenoh_cpp_vendor/CMakeLists.txt b/zenoh_cpp_vendor/CMakeLists.txt index ee633afb..c53bfe45 100644 --- a/zenoh_cpp_vendor/CMakeLists.txt +++ b/zenoh_cpp_vendor/CMakeLists.txt @@ -26,7 +26,7 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memor # - https://github.com/eclipse-zenoh/zenoh-c/pull/620 (fix ze_querying_subscriber_get API to query newly discovered publishers) ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION 76c92a738d5fb720a441247c347bdd42090329e7 + VCS_VERSION f5110732a303f31ed6d835684de1fb7f2c7af001 CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE" @@ -37,7 +37,7 @@ ament_export_dependencies(zenohc) ament_vendor(zenoh_cpp_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp - VCS_VERSION 730f993ade9e715b0e44e623c1477ebe54f1ec7a + VCS_VERSION a1875f9085bc068b6c5140778ff5415ae82248d7 CMAKE_ARGS -DZENOHCXX_ZENOHC=OFF )