diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 653849ee5a..a31eb9d8ab 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -248,6 +248,7 @@ impl Primitives for Face { &self.state, &msg.wire_expr, msg.ext_qos, + msg.ext_tstamp, msg.payload, msg.ext_nodeid.node_id, ); @@ -260,10 +261,10 @@ impl Primitives for Face { &self.tables, &self.state, &msg.wire_expr, - // parameters, msg.id, msg.ext_target, - // consolidation, + msg.ext_budget, + msg.ext_timeout, msg.payload, msg.ext_nodeid.node_id, ); diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index 46a00bd382..42f10517ea 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -429,6 +429,7 @@ pub fn full_reentrant_route_data( face: &FaceState, expr: &WireExpr, ext_qos: ext::QoSType, + ext_tstamp: Option, mut payload: PushBody, routing_context: NodeId, ) { @@ -478,7 +479,7 @@ pub fn full_reentrant_route_data( outface.primitives.send_push(Push { wire_expr: key_expr.into(), ext_qos, - ext_tstamp: None, + ext_tstamp, ext_nodeid: ext::NodeIdType { node_id: *context }, payload, }) @@ -513,7 +514,7 @@ pub fn full_reentrant_route_data( outface.primitives.send_push(Push { wire_expr: key_expr, ext_qos, - ext_tstamp: None, + ext_tstamp, ext_nodeid: ext::NodeIdType { node_id: context }, payload: payload.clone(), }) @@ -540,7 +541,7 @@ pub fn full_reentrant_route_data( outface.primitives.send_push(Push { wire_expr: key_expr.into(), ext_qos, - ext_tstamp: None, + ext_tstamp, ext_nodeid: ext::NodeIdType { node_id: *context }, payload: payload.clone(), }) diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 62eae0703e..bdb303d719 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -22,14 +22,21 @@ use std::collections::HashMap; use std::sync::{Arc, Weak}; use std::time::Duration; use tokio_util::sync::CancellationToken; +use zenoh_buffers::ZBuf; use zenoh_config::WhatAmI; use zenoh_protocol::core::key_expr::keyexpr; +use zenoh_protocol::core::KnownEncoding; use zenoh_protocol::network::declare::queryable::ext::QueryableInfo; +use zenoh_protocol::zenoh; +use zenoh_protocol::zenoh::ext::ValueType; use zenoh_protocol::{ core::{Encoding, WireExpr}, network::{ declare::ext, - request::{ext::TargetType, Request, RequestId}, + request::{ + ext::{BudgetType, TargetType, TimeoutType}, + Request, RequestId, + }, response::{self, ext::ResponderIdType, Response, ResponseFinal}, }, zenoh::{reply::ext::ConsolidationType, Reply, RequestBody, ResponseBody}, @@ -365,6 +372,7 @@ struct QueryCleanup { tables: Arc, face: Weak, qid: RequestId, + timeout: Duration, } impl QueryCleanup { @@ -378,6 +386,7 @@ impl QueryCleanup { tables: tables_ref.clone(), face: Arc::downgrade(face), qid, + timeout, }; if let Some((_, cancellation_token)) = face.pending_queries.get(&qid) { let c_cancellation_token = cancellation_token.clone(); @@ -396,17 +405,42 @@ impl QueryCleanup { impl Timed for QueryCleanup { async fn run(&mut self) { if let Some(mut face) = self.face.upgrade() { - let tables_lock = zwrite!(self.tables.tables); + let ext_respid = Some(response::ext::ResponderIdType { + zid: face.zid, + eid: 0, + }); + route_send_response( + &self.tables, + &mut face, + self.qid, + ext_respid, + WireExpr::empty(), + ResponseBody::Err(zenoh::Err { + timestamp: None, + is_infrastructure: false, + ext_sinfo: None, + ext_unknown: vec![], + ext_body: Some(ValueType { + #[cfg(feature = "shared-memory")] + ext_shm: None, + payload: ZBuf::from("Timeout".as_bytes().to_vec()), + encoding: KnownEncoding::TextPlain.into(), + }), + code: 0, // TODO + }), + ); + let queries_lock = zwrite!(self.tables.queries_lock); if let Some(query) = get_mut_unchecked(&mut face) .pending_queries .remove(&self.qid) { - drop(tables_lock); + drop(queries_lock); tracing::warn!( - "Didn't receive final reply {}:{} from {}: Timeout!", + "Didn't receive final reply {}:{} from {}: Timeout({:#?})!", query.0.src_face, self.qid, - face + face, + self.timeout, ); finalize_pending_query(query); } @@ -513,12 +547,15 @@ macro_rules! inc_res_stats { }; } +#[allow(clippy::too_many_arguments)] pub fn route_query( tables_ref: &Arc, face: &Arc, expr: &WireExpr, qid: RequestId, - target: TargetType, + ext_target: TargetType, + ext_budget: Option, + ext_timeout: Option, body: RequestBody, routing_context: NodeId, ) { @@ -555,14 +592,15 @@ pub fn route_query( }); let queries_lock = zwrite!(tables_ref.queries_lock); - let route = compute_final_route(&rtables, &route, face, &mut expr, &target, query); + let route = + compute_final_route(&rtables, &route, face, &mut expr, &ext_target, query); let local_replies = rtables .hat_code .compute_local_replies(&rtables, &prefix, expr.suffix, face); let zid = rtables.zid; - let timeout = rtables.queries_default_timeout; + let timeout = ext_timeout.unwrap_or(rtables.queries_default_timeout); drop(queries_lock); drop(rtables); @@ -643,8 +681,8 @@ pub fn route_query( ext_tstamp: None, ext_nodeid: ext::NodeIdType { node_id: *context }, ext_target: *t, - ext_budget: None, - ext_timeout: None, + ext_budget, + ext_timeout, payload: body.clone(), }, expr.full_expr().to_string(), @@ -673,9 +711,9 @@ pub fn route_query( ext_qos: ext::QoSType::request_default(), ext_tstamp: None, ext_nodeid: ext::NodeIdType { node_id: *context }, - ext_target: target, - ext_budget: None, - ext_timeout: None, + ext_target, + ext_budget, + ext_timeout, payload: body.clone(), }, expr.full_expr().to_string(), diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 1b02a5964f..ebd6e66681 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -626,6 +626,7 @@ fn client_test() { &face0.upgrade().unwrap(), &"test/client/z1_wr1".into(), ext::QoSType::default(), + None, PushBody::Put(Put { timestamp: None, encoding: Encoding::default(), @@ -659,6 +660,7 @@ fn client_test() { &face0.upgrade().unwrap(), &WireExpr::from(11).with_suffix("/z1_wr2"), ext::QoSType::default(), + None, PushBody::Put(Put { timestamp: None, encoding: Encoding::default(), @@ -692,6 +694,7 @@ fn client_test() { &face1.upgrade().unwrap(), &"test/client/**".into(), ext::QoSType::default(), + None, PushBody::Put(Put { timestamp: None, encoding: Encoding::default(), @@ -725,6 +728,7 @@ fn client_test() { &face0.upgrade().unwrap(), &12.into(), ext::QoSType::default(), + None, PushBody::Put(Put { timestamp: None, encoding: Encoding::default(), @@ -758,6 +762,7 @@ fn client_test() { &face1.upgrade().unwrap(), &22.into(), ext::QoSType::default(), + None, PushBody::Put(Put { timestamp: None, encoding: Encoding::default(),