Skip to content

Commit

Permalink
Restrict query and reply queue sizes
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Mar 14, 2024
1 parent 00e8e1c commit 79ec141
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
24 changes: 24 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,18 @@ void rmw_service_data_t::notify()
void rmw_service_data_t::add_new_query(std::unique_ptr<ZenohQuery> query)
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
if (query_queue_.size() >= adapted_qos_profile.depth) {
// Log warning if message is discarded due to hitting the queue depth
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr));
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Query queue depth of %ld reached, discarding oldest Query "
"for service for %s",
adapted_qos_profile.depth,
z_loan(keystr));
z_drop(z_move(keystr));
query_queue_.pop_front();
}
query_queue_.emplace_back(std::move(query));

// Since we added new data, trigger user callback and guard condition if they are available
Expand Down Expand Up @@ -238,6 +250,18 @@ void rmw_client_data_t::notify()
void rmw_client_data_t::add_new_reply(std::unique_ptr<ZenohReply> reply)
{
std::lock_guard<std::mutex> lock(reply_queue_mutex_);
if (reply_queue_.size() >= adapted_qos_profile.depth) {
// Log warning if message is discarded due to hitting the queue depth
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr));
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Reply queue depth of %ld reached, discarding oldest reply "
"for client for %s",
adapted_qos_profile.depth,
z_loan(keystr));
z_drop(z_move(keystr));
reply_queue_.pop_front();
}
reply_queue_.emplace_back(std::move(reply));

// Since we added new data, trigger user callback and guard condition if they are available
Expand Down
14 changes: 14 additions & 0 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1958,6 +1958,15 @@ rmw_create_client(

generate_random_gid(client_data->client_gid);

// Adapt any 'best available' QoS options
client_data->adapted_qos_profile =
rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profile);
// If a depth of 0 was provided, the RMW implementation should choose a suitable default.
client_data->adapted_qos_profile.depth =
client_data->adapted_qos_profile.depth > 0 ?
client_data->adapted_qos_profile.depth :
RMW_ZENOH_DEFAULT_HISTORY_DEPTH;

// Obtain the type support
const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports);
if (type_support == nullptr) {
Expand Down Expand Up @@ -2480,6 +2489,11 @@ rmw_create_service(
// Adapt any 'best available' QoS options
service_data->adapted_qos_profile =
rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profiles);
// If a depth of 0 was provided, the RMW implementation should choose a suitable default.
service_data->adapted_qos_profile.depth =
service_data->adapted_qos_profile.depth > 0 ?
service_data->adapted_qos_profile.depth :
RMW_ZENOH_DEFAULT_HISTORY_DEPTH;

// Get the RMW type support.
const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports);
Expand Down

0 comments on commit 79ec141

Please sign in to comment.