From 79ec141d49c243c5f5bd6d60e80188347a3ff93d Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 14 Mar 2024 23:47:36 +0800 Subject: [PATCH] Restrict query and reply queue sizes Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 24 +++++++++++++++++++++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 14 ++++++++++++ 2 files changed, 38 insertions(+) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index b3845e30..8f88b75c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -189,6 +189,18 @@ void rmw_service_data_t::notify() void rmw_service_data_t::add_new_query(std::unique_ptr query) { std::lock_guard lock(query_queue_mutex_); + if (query_queue_.size() >= adapted_qos_profile.depth) { + // Log warning if message is discarded due to hitting the queue depth + z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr)); + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Query queue depth of %ld reached, discarding oldest Query " + "for service for %s", + adapted_qos_profile.depth, + z_loan(keystr)); + z_drop(z_move(keystr)); + query_queue_.pop_front(); + } query_queue_.emplace_back(std::move(query)); // Since we added new data, trigger user callback and guard condition if they are available @@ -238,6 +250,18 @@ void rmw_client_data_t::notify() void rmw_client_data_t::add_new_reply(std::unique_ptr reply) { std::lock_guard lock(reply_queue_mutex_); + if (reply_queue_.size() >= adapted_qos_profile.depth) { + // Log warning if message is discarded due to hitting the queue depth + z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr)); + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Reply queue depth of %ld reached, discarding oldest reply " + "for client for %s", + adapted_qos_profile.depth, + z_loan(keystr)); + z_drop(z_move(keystr)); + reply_queue_.pop_front(); + } reply_queue_.emplace_back(std::move(reply)); // Since we added new data, trigger user callback and guard condition if they are available diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 4dd9ef1d..2d7d2159 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1958,6 +1958,15 @@ rmw_create_client( generate_random_gid(client_data->client_gid); + // Adapt any 'best available' QoS options + client_data->adapted_qos_profile = + rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profile); + // If a depth of 0 was provided, the RMW implementation should choose a suitable default. + client_data->adapted_qos_profile.depth = + client_data->adapted_qos_profile.depth > 0 ? + client_data->adapted_qos_profile.depth : + RMW_ZENOH_DEFAULT_HISTORY_DEPTH; + // Obtain the type support const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); if (type_support == nullptr) { @@ -2480,6 +2489,11 @@ rmw_create_service( // Adapt any 'best available' QoS options service_data->adapted_qos_profile = rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profiles); + // If a depth of 0 was provided, the RMW implementation should choose a suitable default. + service_data->adapted_qos_profile.depth = + service_data->adapted_qos_profile.depth > 0 ? + service_data->adapted_qos_profile.depth : + RMW_ZENOH_DEFAULT_HISTORY_DEPTH; // Get the RMW type support. const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports);