Skip to content

Commit

Permalink
Enable queries timeout (#853)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc authored Mar 25, 2024
1 parent 3a26455 commit 402010e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 25 deletions.
38 changes: 20 additions & 18 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,8 @@ pub fn route_query(
.compute_local_replies(&rtables, &prefix, expr.suffix, face);
let zid = rtables.zid;

let timeout = rtables.queries_default_timeout;

drop(queries_lock);
drop(rtables);

Expand Down Expand Up @@ -589,19 +591,18 @@ pub fn route_query(
expr.full_expr().to_string(),
));
} else {
// let timer = tables.timer.clone();
// let timeout = tables.queries_default_timeout;
#[cfg(feature = "complete_n")]
{
for ((outface, key_expr, context), qid, t) in route.values() {
// timer.add(TimedEvent::once(
// Instant::now() + timeout,
// QueryCleanup {
// tables: tables_ref.clone(),
// face: Arc::downgrade(&outface),
// *qid,
// },
// ));
let mut cleanup = QueryCleanup {
tables: tables_ref.clone(),
face: Arc::downgrade(outface),
qid: *qid,
};
tokio::task::spawn(async move {
tokio::time::sleep(timeout).await;
cleanup.run().await
});
#[cfg(feature = "stats")]
if !admin {
inc_req_stats!(outface, tx, user, body)
Expand Down Expand Up @@ -630,14 +631,15 @@ pub fn route_query(
#[cfg(not(feature = "complete_n"))]
{
for ((outface, key_expr, context), qid) in route.values() {
// timer.add(TimedEvent::once(
// Instant::now() + timeout,
// QueryCleanup {
// tables: tables_ref.clone(),
// face: Arc::downgrade(&outface),
// *qid,
// },
// ));
let mut cleanup = QueryCleanup {
tables: tables_ref.clone(),
face: Arc::downgrade(outface),
qid: *qid,
};
tokio::task::spawn(async move {
tokio::time::sleep(timeout).await;
cleanup.run().await
});
#[cfg(feature = "stats")]
if !admin {
inc_req_stats!(outface, tx, user, body)
Expand Down
12 changes: 5 additions & 7 deletions zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::sync::{Mutex, RwLock};
use std::time::Duration;
use uhlc::HLC;
use zenoh_config::unwrap_or_default;
use zenoh_config::Config;
use zenoh_protocol::core::{ExprId, WhatAmI, ZenohId};
use zenoh_protocol::network::Mapping;
use zenoh_result::ZResult;
// use zenoh_collections::Timer;
use zenoh_sync::get_mut_unchecked;

pub(crate) struct RoutingExpr<'a> {
Expand Down Expand Up @@ -64,8 +64,7 @@ pub struct Tables {
#[allow(dead_code)]
pub(crate) hlc: Option<Arc<HLC>>,
pub(crate) drop_future_timestamp: bool,
// pub(crate) timer: Timer,
// pub(crate) queries_default_timeout: Duration,
pub(crate) queries_default_timeout: Duration,
pub(crate) root_res: Arc<Resource>,
pub(crate) faces: HashMap<usize, Arc<FaceState>>,
pub(crate) mcast_groups: Vec<Arc<FaceState>>,
Expand All @@ -87,17 +86,16 @@ impl Tables {
unwrap_or_default!(config.timestamping().drop_future_timestamp());
let router_peers_failover_brokering =
unwrap_or_default!(config.routing().router().peers_failover_brokering());
// let queries_default_timeout =
// Duration::from_millis(unwrap_or_default!(config.queries_default_timeout()));
let queries_default_timeout =
Duration::from_millis(unwrap_or_default!(config.queries_default_timeout()));
let hat_code = hat::new_hat(whatami, config);
Ok(Tables {
zid,
whatami,
face_counter: 0,
hlc,
drop_future_timestamp,
// timer: Timer::new(true),
// queries_default_timeout,
queries_default_timeout,
root_res: Resource::root(),
faces: HashMap::new(),
mcast_groups: vec![],
Expand Down

0 comments on commit 402010e

Please sign in to comment.