From 2099b8bc1b75569cd82bc1127e0454040ac056b2 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 18 Jan 2024 13:48:20 +0800 Subject: [PATCH] store adapted_qos_profiles in pub/sub/client/service Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 4 +- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 17 ++- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 136 +++++++++++++------- 3 files changed, 102 insertions(+), 55 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index d26c9e0d..72872e6a 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -54,13 +54,13 @@ void sub_data_handler( { std::lock_guard lock(sub_data->message_queue_mutex); - if (sub_data->message_queue.size() >= sub_data->queue_depth) { + if (sub_data->message_queue.size() >= sub_data->adapted_qos_profile.depth) { // Log warning if message is discarded due to hitting the queue depth RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Message queue depth of %ld reached, discarding oldest message " "for subscription for %s", - sub_data->queue_depth, + sub_data->adapted_qos_profile.depth, z_loan(keystr)); std::unique_ptr old = std::move(sub_data->message_queue.back()); diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index e9e6f471..dc4058e9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -77,6 +77,9 @@ struct rmw_publisher_data_t // Optional publication cache when durability is transient_local. ze_owned_publication_cache_t pub_cache; + // Store the actual QoS profile used to configure this publisher. + rmw_qos_profile_t adapted_qos_profile; + // Liveliness token for the publisher. zc_owned_liveliness_token_t token; @@ -117,6 +120,9 @@ struct rmw_subscription_data_t // An owned subscriber or querying_subscriber depending on the QoS settings. std::variant sub; + // Store the actual QoS profile used to configure this subscription. + rmw_qos_profile_t adapted_qos_profile; + // Liveliness token for the subscription. zc_owned_liveliness_token_t token; @@ -128,9 +134,6 @@ struct rmw_subscription_data_t std::deque> message_queue; std::mutex message_queue_mutex; - size_t queue_depth; - bool reliable; - std::mutex internal_mutex; std::condition_variable * condition{nullptr}; }; @@ -151,6 +154,10 @@ struct rmw_service_data_t z_owned_keyexpr_t keyexpr; z_owned_queryable_t qable; + // Store the actual QoS profile used to configure this service. + // The QoS is reused for getting requests and sending responses. + rmw_qos_profile_t adapted_qos_profile; + // Liveliness token for the service. zc_owned_liveliness_token_t token; @@ -181,6 +188,10 @@ struct rmw_client_data_t z_owned_keyexpr_t keyexpr; z_owned_closure_reply_t zn_closure_reply; + // Store the actual QoS profile used to configure this client. + // The QoS is reused for sending requests and getting responses. + rmw_qos_profile_t adapted_qos_profile; + // Liveliness token for the client. zc_owned_liveliness_token_t token; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 65100e57..56edc921 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -461,13 +461,6 @@ rmw_create_publisher( "[rmw_create_publisher] %s", topic_name); - // Adapt any 'best available' QoS options - rmw_qos_profile_t adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher( - node, topic_name, &adapted_qos_profile, rmw_get_subscriptions_info_by_topic); - if (RMW_RET_OK != ret) { - return nullptr; - } RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr); if (publisher_options->require_unique_network_flow_endpoints == RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED) @@ -529,6 +522,21 @@ rmw_create_publisher( allocator->deallocate(publisher_data, allocator->state); }); + RMW_TRY_PLACEMENT_NEW(publisher_data, publisher_data, return nullptr, rmw_publisher_data_t); + auto destruct_pub_data = rcpputils::make_scope_exit( + [publisher_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + publisher_data->~rmw_publisher_data_t(), + rmw_publisher_data_t); + }); + + // Adapt any 'best available' QoS options + publisher_data->adapted_qos_profile = *qos_profile; + rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher( + node, topic_name, &publisher_data->adapted_qos_profile, rmw_get_subscriptions_info_by_topic); + if (RMW_RET_OK != ret) { + return nullptr; + } publisher_data->typesupport_identifier = type_support->typesupport_identifier; publisher_data->type_support_impl = type_support->data; auto callbacks = static_cast(type_support->data); @@ -590,9 +598,9 @@ rmw_create_publisher( } // Create a Publication Cache if durability is transient_local. - if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + if (publisher_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); - pub_cache_opts.history = adapted_qos_profile.depth; + pub_cache_opts.history = publisher_data->adapted_qos_profile.depth; publisher_data->pub_cache = ze_declare_publication_cache( z_loan(context_impl->session), z_loan(keyexpr), @@ -612,8 +620,8 @@ rmw_create_publisher( // Set congestion_control to BLOCK if appropriate. z_publisher_options_t opts = z_publisher_options_default(); - if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL && - adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) + if (publisher_data->adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL && + publisher_data->adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; } else { @@ -656,7 +664,7 @@ rmw_create_publisher( liveliness::EntityType::Publisher, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_publisher->topic_name, - publisher_data->type_support->get_name(), adapted_qos_profile} + publisher_data->type_support->get_name(), publisher_data->adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -686,6 +694,7 @@ rmw_create_publisher( undeclare_z_publisher_cache.cancel(); undeclare_z_publisher.cancel(); free_topic_name.cancel(); + destruct_pub_data.cancel(); destruct_msg_type_support.cancel(); free_type_support.cancel(); free_publisher_data.cancel(); @@ -747,6 +756,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) RMW_SET_ERROR_MSG("failed to undeclare pub"); ret = RMW_RET_ERROR; } + RMW_TRY_DESTRUCTOR(publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t, ); allocator->deallocate(publisher_data, allocator->state); } allocator->deallocate(const_cast(publisher->topic_name), allocator->state); @@ -984,9 +994,18 @@ rmw_publisher_get_actual_qos( const rmw_publisher_t * publisher, rmw_qos_profile_t * qos) { - static_cast(publisher); - static_cast(qos); - // TODO(yadunund): Fixme. + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, + publisher->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); + + rmw_publisher_data_t * pub_data = static_cast(publisher->data); + RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); + + *qos = pub_data->adapted_qos_profile; return RMW_RET_OK; } @@ -1188,14 +1207,6 @@ rmw_create_subscription( return nullptr; } RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); - // Adapt any 'best available' QoS options - rmw_qos_profile_t adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription( - node, topic_name, &adapted_qos_profile, rmw_get_publishers_info_by_topic); - if (RMW_RET_OK != ret) { - RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); - return nullptr; - } RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr); RCUTILS_LOG_INFO_NAMED( @@ -1268,6 +1279,15 @@ rmw_create_subscription( rmw_subscription_data_t); }); + // Adapt any 'best available' QoS options + sub_data->adapted_qos_profile = *qos_profile; + rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription( + node, topic_name, &sub_data->adapted_qos_profile, rmw_get_publishers_info_by_topic); + if (RMW_RET_OK != ret) { + RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + return nullptr; + } + sub_data->typesupport_identifier = type_support->typesupport_identifier; sub_data->type_support_impl = type_support->data; auto callbacks = static_cast(type_support->data); @@ -1294,7 +1314,6 @@ rmw_create_subscription( MessageTypeSupport); }); - sub_data->queue_depth = qos_profile->depth; sub_data->context = node->context; rmw_subscription->implementation_identifier = rmw_zenoh_identifier; @@ -1344,12 +1363,10 @@ rmw_create_subscription( z_drop(z_move(owned_key_str)); }); - sub_data->reliable = false; - if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + if (sub_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default(); - if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + if (sub_data->adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { sub_options.reliability = Z_RELIABILITY_RELIABLE; - sub_data->reliable = true; } sub_data->sub = ze_declare_querying_subscriber( z_loan(context_impl->session), @@ -1368,7 +1385,6 @@ rmw_create_subscription( z_subscriber_options_t sub_options = z_subscriber_options_default(); if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { sub_options.reliability = Z_RELIABILITY_RELIABLE; - sub_data->reliable = true; } sub_data->sub = z_declare_subscriber( z_loan(context_impl->session), @@ -1403,7 +1419,7 @@ rmw_create_subscription( liveliness::EntityType::Subscription, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_subscription->topic_name, - sub_data->type_support->get_name(), adapted_qos_profile} + sub_data->type_support->get_name(), sub_data->adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -1528,10 +1544,7 @@ rmw_subscription_get_actual_qos( auto sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - qos->reliability = sub_data->reliable ? RMW_QOS_POLICY_RELIABILITY_RELIABLE : - RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; - qos->durability = std::holds_alternative(sub_data->sub) ? - RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL : RMW_QOS_POLICY_DURABILITY_VOLATILE; + *qos = sub_data->adapted_qos_profile; return RMW_RET_OK; } @@ -1863,6 +1876,10 @@ rmw_create_client( rmw_client_data_t); }); + // Adapt any 'best available' QoS options + client_data->adapted_qos_profile = + rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profile); + // Obtain the type support const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); if (type_support == nullptr) { @@ -1973,7 +1990,7 @@ rmw_create_client( liveliness::EntityType::Client, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_client->service_name, - std::move(service_type), *qos_profile} + std::move(service_type), client_data->adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -2320,9 +2337,18 @@ rmw_client_request_publisher_get_actual_qos( const rmw_client_t * client, rmw_qos_profile_t * qos) { - // TODO(francocipollone): Fix. - static_cast(client); - static_cast(qos); + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); + + rmw_client_data_t * client_data = static_cast(client->data); + RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); + + *qos = client_data->adapted_qos_profile; return RMW_RET_OK; } @@ -2333,10 +2359,8 @@ rmw_client_response_subscription_get_actual_qos( const rmw_client_t * client, rmw_qos_profile_t * qos) { - // TODO(francocipollone): Fix. - static_cast(client); - static_cast(qos); - return RMW_RET_OK; + // The same QoS profile is used for sending requests and receiving responses. + return rmw_client_request_publisher_get_actual_qos(client, qos); } //============================================================================== @@ -2435,6 +2459,11 @@ rmw_create_service( rmw_service_data_t); }); + // Adapt any 'best available' QoS options + service_data->adapted_qos_profile = + rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profiles); + + // Get the RMW type support. const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); if (type_support == nullptr) { @@ -2561,7 +2590,7 @@ rmw_create_service( liveliness::EntityType::Service, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_service->service_name, - std::move(service_type), *qos_profiles} + std::move(service_type), service_data->adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -2852,9 +2881,18 @@ rmw_service_request_subscription_get_actual_qos( const rmw_service_t * service, rmw_qos_profile_t * qos) { - // TODO(yadunund): Fix. - static_cast(service); - static_cast(qos); + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); + + rmw_service_data_t * service_data = static_cast(service->data); + RMW_CHECK_ARGUMENT_FOR_NULL(service_data, RMW_RET_INVALID_ARGUMENT); + + *qos = service_data->adapted_qos_profile; return RMW_RET_OK; } @@ -2865,10 +2903,8 @@ rmw_service_response_publisher_get_actual_qos( const rmw_service_t * service, rmw_qos_profile_t * qos) { - // TODO(yadunund): Fix. - static_cast(service); - static_cast(qos); - return RMW_RET_OK; + // The same QoS profile is used for receiving requests and sending responses. + return rmw_service_request_subscription_get_actual_qos(service, qos); } //==============================================================================