From 2532c11551080f70ce64a3ef711576ebb6ed9f15 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 26 Jul 2024 16:37:04 +0200 Subject: [PATCH] Revert "Optimize RoutingContext keyexpr for Query and Response messages (#1266)" This reverts commit e38fc16b9988c5c6add6d2e19667b6badc86993d. --- zenoh/src/api/session.rs | 12 +- zenoh/src/net/primitives/mod.rs | 12 +- zenoh/src/net/primitives/mux.rs | 246 +++++++++--------- zenoh/src/net/routing/dispatcher/queries.rs | 111 +++++--- .../net/routing/interceptor/access_control.rs | 7 +- zenoh/src/net/runtime/adminspace.rs | 12 +- zenoh/src/net/tests/tables.rs | 10 +- zenoh/tests/authentication.rs | 92 +++---- 8 files changed, 269 insertions(+), 233 deletions(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 4ca924e023..a2745ecd96 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -2799,18 +2799,18 @@ impl crate::net::primitives::EPrimitives for Session { } #[inline] - fn send_request(&self, msg: Request) { - (self as &dyn Primitives).send_request(msg) + fn send_request(&self, ctx: crate::net::routing::RoutingContext) { + (self as &dyn Primitives).send_request(ctx.msg) } #[inline] - fn send_response(&self, msg: Response) { - (self as &dyn Primitives).send_response(msg) + fn send_response(&self, ctx: crate::net::routing::RoutingContext) { + (self as &dyn Primitives).send_response(ctx.msg) } #[inline] - fn send_response_final(&self, msg: ResponseFinal) { - (self as &dyn Primitives).send_response_final(msg) + fn send_response_final(&self, ctx: crate::net::routing::RoutingContext) { + (self as &dyn Primitives).send_response_final(ctx.msg) } fn as_any(&self) -> &dyn std::any::Any { diff --git a/zenoh/src/net/primitives/mod.rs b/zenoh/src/net/primitives/mod.rs index 837571f7f6..73768a4cca 100644 --- a/zenoh/src/net/primitives/mod.rs +++ b/zenoh/src/net/primitives/mod.rs @@ -49,11 +49,11 @@ pub(crate) trait EPrimitives: Send + Sync { fn send_push(&self, msg: Push); - fn send_request(&self, msg: Request); + fn send_request(&self, ctx: RoutingContext); - fn send_response(&self, msg: Response); + fn send_response(&self, ctx: RoutingContext); - fn send_response_final(&self, msg: ResponseFinal); + fn send_response_final(&self, ctx: RoutingContext); } #[derive(Default)] @@ -82,11 +82,11 @@ impl EPrimitives for DummyPrimitives { fn send_push(&self, _msg: Push) {} - fn send_request(&self, _msg: Request) {} + fn send_request(&self, _ctx: RoutingContext) {} - fn send_response(&self, _msg: Response) {} + fn send_response(&self, _ctx: RoutingContext) {} - fn send_response_final(&self, _msg: ResponseFinal) {} + fn send_response_final(&self, _ctx: RoutingContext) {} fn as_any(&self) -> &dyn Any { self diff --git a/zenoh/src/net/primitives/mux.rs b/zenoh/src/net/primitives/mux.rs index bc718ba324..4627017a72 100644 --- a/zenoh/src/net/primitives/mux.rs +++ b/zenoh/src/net/primitives/mux.rs @@ -271,75 +271,78 @@ impl EPrimitives for Mux { } } - fn send_request(&self, msg: Request) { - let msg = NetworkMessage { - body: NetworkBody::Request(msg), - #[cfg(feature = "stats")] - size: None, + fn send_request(&self, ctx: RoutingContext) { + let ctx = RoutingContext { + msg: NetworkMessage { + body: NetworkBody::Request(ctx.msg), + #[cfg(feature = "stats")] + size: None, + }, + inface: ctx.inface, + outface: ctx.outface, + prefix: ctx.prefix, + full_expr: ctx.full_expr, }; - if self.interceptor.interceptors.is_empty() { - let _ = self.handler.schedule(msg); - } else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) { - let ctx = RoutingContext::new_out(msg, face.clone()); - let prefix = ctx - .wire_expr() - .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) - .flatten() - .cloned(); - let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face)); - if let Some(ctx) = self.interceptor.intercept(ctx, cache) { - let _ = self.handler.schedule(ctx.msg); - } - } else { - tracing::error!("Uninitialized multiplexer!"); + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { + let _ = self.handler.schedule(ctx.msg); } } - fn send_response(&self, msg: Response) { - let msg = NetworkMessage { - body: NetworkBody::Response(msg), - #[cfg(feature = "stats")] - size: None, + fn send_response(&self, ctx: RoutingContext) { + let ctx = RoutingContext { + msg: NetworkMessage { + body: NetworkBody::Response(ctx.msg), + #[cfg(feature = "stats")] + size: None, + }, + inface: ctx.inface, + outface: ctx.outface, + prefix: ctx.prefix, + full_expr: ctx.full_expr, }; - if self.interceptor.interceptors.is_empty() { - let _ = self.handler.schedule(msg); - } else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) { - let ctx = RoutingContext::new_out(msg, face.clone()); - let prefix = ctx - .wire_expr() - .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) - .flatten() - .cloned(); - let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face)); - if let Some(ctx) = self.interceptor.intercept(ctx, cache) { - let _ = self.handler.schedule(ctx.msg); - } - } else { - tracing::error!("Uninitialized multiplexer!"); + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { + let _ = self.handler.schedule(ctx.msg); } } - fn send_response_final(&self, msg: ResponseFinal) { - let msg = NetworkMessage { - body: NetworkBody::ResponseFinal(msg), - #[cfg(feature = "stats")] - size: None, + fn send_response_final(&self, ctx: RoutingContext) { + let ctx = RoutingContext { + msg: NetworkMessage { + body: NetworkBody::ResponseFinal(ctx.msg), + #[cfg(feature = "stats")] + size: None, + }, + inface: ctx.inface, + outface: ctx.outface, + prefix: ctx.prefix, + full_expr: ctx.full_expr, }; - if self.interceptor.interceptors.is_empty() { - let _ = self.handler.schedule(msg); - } else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) { - let ctx = RoutingContext::new_out(msg, face.clone()); - let prefix = ctx - .wire_expr() - .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) - .flatten() - .cloned(); - let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face)); - if let Some(ctx) = self.interceptor.intercept(ctx, cache) { - let _ = self.handler.schedule(ctx.msg); - } - } else { - tracing::error!("Uninitialized multiplexer!"); + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { + let _ = self.handler.schedule(ctx.msg); } } @@ -589,75 +592,78 @@ impl EPrimitives for McastMux { } } - fn send_request(&self, msg: Request) { - let msg = NetworkMessage { - body: NetworkBody::Request(msg), - #[cfg(feature = "stats")] - size: None, + fn send_request(&self, ctx: RoutingContext) { + let ctx = RoutingContext { + msg: NetworkMessage { + body: NetworkBody::Request(ctx.msg), + #[cfg(feature = "stats")] + size: None, + }, + inface: ctx.inface, + outface: ctx.outface, + prefix: ctx.prefix, + full_expr: ctx.full_expr, }; - if self.interceptor.interceptors.is_empty() { - let _ = self.handler.schedule(msg); - } else if let Some(face) = self.face.get() { - let ctx = RoutingContext::new_out(msg, face.clone()); - let prefix = ctx - .wire_expr() - .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) - .flatten() - .cloned(); - let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); - if let Some(ctx) = self.interceptor.intercept(ctx, cache) { - let _ = self.handler.schedule(ctx.msg); - } - } else { - tracing::error!("Uninitialized multiplexer!"); + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { + let _ = self.handler.schedule(ctx.msg); } } - fn send_response(&self, msg: Response) { - let msg = NetworkMessage { - body: NetworkBody::Response(msg), - #[cfg(feature = "stats")] - size: None, + fn send_response(&self, ctx: RoutingContext) { + let ctx = RoutingContext { + msg: NetworkMessage { + body: NetworkBody::Response(ctx.msg), + #[cfg(feature = "stats")] + size: None, + }, + inface: ctx.inface, + outface: ctx.outface, + prefix: ctx.prefix, + full_expr: ctx.full_expr, }; - if self.interceptor.interceptors.is_empty() { - let _ = self.handler.schedule(msg); - } else if let Some(face) = self.face.get() { - let ctx = RoutingContext::new_out(msg, face.clone()); - let prefix = ctx - .wire_expr() - .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) - .flatten() - .cloned(); - let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); - if let Some(ctx) = self.interceptor.intercept(ctx, cache) { - let _ = self.handler.schedule(ctx.msg); - } - } else { - tracing::error!("Uninitialized multiplexer!"); + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { + let _ = self.handler.schedule(ctx.msg); } } - fn send_response_final(&self, msg: ResponseFinal) { - let msg = NetworkMessage { - body: NetworkBody::ResponseFinal(msg), - #[cfg(feature = "stats")] - size: None, + fn send_response_final(&self, ctx: RoutingContext) { + let ctx = RoutingContext { + msg: NetworkMessage { + body: NetworkBody::ResponseFinal(ctx.msg), + #[cfg(feature = "stats")] + size: None, + }, + inface: ctx.inface, + outface: ctx.outface, + prefix: ctx.prefix, + full_expr: ctx.full_expr, }; - if self.interceptor.interceptors.is_empty() { - let _ = self.handler.schedule(msg); - } else if let Some(face) = self.face.get() { - let ctx = RoutingContext::new_out(msg, face.clone()); - let prefix = ctx - .wire_expr() - .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) - .flatten() - .cloned(); - let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); - if let Some(ctx) = self.interceptor.intercept(ctx, cache) { - let _ = self.handler.schedule(ctx.msg); - } - } else { - tracing::error!("Uninitialized multiplexer!"); + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { + let _ = self.handler.schedule(ctx.msg); } } diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index c117bd51df..445f138d8d 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -43,7 +43,10 @@ use super::{ resource::{QueryRoute, QueryRoutes, QueryTargetQablSet, Resource}, tables::{NodeId, RoutingExpr, Tables, TablesLock}, }; -use crate::net::routing::hat::{HatTrait, SendDeclare}; +use crate::net::routing::{ + hat::{HatTrait, SendDeclare}, + RoutingContext, +}; pub(crate) struct Query { src_face: Arc, @@ -597,11 +600,16 @@ pub fn route_query( face, qid ); - face.primitives.clone().send_response_final(ResponseFinal { - rid: qid, - ext_qos: response::ext::QoSType::RESPONSE_FINAL, - ext_tstamp: None, - }); + face.primitives + .clone() + .send_response_final(RoutingContext::with_expr( + ResponseFinal { + rid: qid, + ext_qos: response::ext::QoSType::RESPONSE_FINAL, + ext_tstamp: None, + }, + expr.full_expr().to_string(), + )); } else { for ((outface, key_expr, context), qid) in route.values() { QueryCleanup::spawn_query_clean_up_task(outface, tables_ref, *qid, timeout); @@ -613,27 +621,35 @@ pub fn route_query( } tracing::trace!("Propagate query {}:{} to {}", face, qid, outface); - outface.primitives.send_request(Request { - id: *qid, - wire_expr: key_expr.into(), - ext_qos, - ext_tstamp, - ext_nodeid: ext::NodeIdType { node_id: *context }, - ext_target, - ext_budget, - ext_timeout, - payload: body.clone(), - }); + outface.primitives.send_request(RoutingContext::with_expr( + Request { + id: *qid, + wire_expr: key_expr.into(), + ext_qos, + ext_tstamp, + ext_nodeid: ext::NodeIdType { node_id: *context }, + ext_target, + ext_budget, + ext_timeout, + payload: body.clone(), + }, + expr.full_expr().to_string(), + )); } } } else { tracing::debug!("Send final reply {}:{} (not master)", face, qid); drop(rtables); - face.primitives.clone().send_response_final(ResponseFinal { - rid: qid, - ext_qos: response::ext::QoSType::RESPONSE_FINAL, - ext_tstamp: None, - }); + face.primitives + .clone() + .send_response_final(RoutingContext::with_expr( + ResponseFinal { + rid: qid, + ext_qos: response::ext::QoSType::RESPONSE_FINAL, + ext_tstamp: None, + }, + expr.full_expr().to_string(), + )); } } None => { @@ -643,11 +659,16 @@ pub fn route_query( expr.scope, ); drop(rtables); - face.primitives.clone().send_response_final(ResponseFinal { - rid: qid, - ext_qos: response::ext::QoSType::RESPONSE_FINAL, - ext_tstamp: None, - }); + face.primitives + .clone() + .send_response_final(RoutingContext::with_expr( + ResponseFinal { + rid: qid, + ext_qos: response::ext::QoSType::RESPONSE_FINAL, + ext_tstamp: None, + }, + "".to_string(), + )); } } } @@ -684,14 +705,21 @@ pub(crate) fn route_send_response( inc_res_stats!(query.src_face, tx, admin, body) } - query.src_face.primitives.send_response(Response { - rid: query.src_qid, - wire_expr: key_expr.to_owned(), - payload: body, - ext_qos, - ext_tstamp, - ext_respid, - }); + query + .src_face + .primitives + .clone() + .send_response(RoutingContext::with_expr( + Response { + rid: query.src_qid, + wire_expr: key_expr.to_owned(), + payload: body, + ext_qos, + ext_tstamp, + ext_respid, + }, + "".to_string(), // @TODO provide the proper key expression of the response for interceptors + )); } None => tracing::warn!( "Route reply {}:{} from {}: Query not found!", @@ -745,10 +773,13 @@ pub(crate) fn finalize_pending_query(query: (Arc, CancellationToken)) { .src_face .primitives .clone() - .send_response_final(ResponseFinal { - rid: query.src_qid, - ext_qos: response::ext::QoSType::RESPONSE_FINAL, - ext_tstamp: None, - }); + .send_response_final(RoutingContext::with_expr( + ResponseFinal { + rid: query.src_qid, + ext_qos: response::ext::QoSType::RESPONSE_FINAL, + ext_tstamp: None, + }, + "".to_string(), + )); } } diff --git a/zenoh/src/net/routing/interceptor/access_control.rs b/zenoh/src/net/routing/interceptor/access_control.rs index 839f18bd07..6af064a878 100644 --- a/zenoh/src/net/routing/interceptor/access_control.rs +++ b/zenoh/src/net/routing/interceptor/access_control.rs @@ -360,8 +360,11 @@ impl InterceptorTrait for EgressAclEnforcer { return None; } } - NetworkBody::Response(Response { .. }) => { - if self.action(AclMessage::Reply, "Reply (egress)", key_expr?) == Permission::Deny { + NetworkBody::Response(Response { wire_expr, .. }) => { + // @TODO: Remove wire_expr usage when issue #1255 is implemented + if self.action(AclMessage::Reply, "Reply (egress)", wire_expr.as_str()) + == Permission::Deny + { return None; } } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index ce87d68ef0..6ee06f10fd 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -521,18 +521,18 @@ impl crate::net::primitives::EPrimitives for AdminSpace { } #[inline] - fn send_request(&self, msg: Request) { - (self as &dyn Primitives).send_request(msg) + fn send_request(&self, ctx: crate::net::routing::RoutingContext) { + (self as &dyn Primitives).send_request(ctx.msg) } #[inline] - fn send_response(&self, msg: Response) { - (self as &dyn Primitives).send_response(msg) + fn send_response(&self, ctx: crate::net::routing::RoutingContext) { + (self as &dyn Primitives).send_response(ctx.msg) } #[inline] - fn send_response_final(&self, msg: ResponseFinal) { - (self as &dyn Primitives).send_response_final(msg) + fn send_response_final(&self, ctx: crate::net::routing::RoutingContext) { + (self as &dyn Primitives).send_response_final(ctx.msg) } fn as_any(&self) -> &dyn std::any::Any { diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 5fd8a49261..8ef9294edc 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -567,15 +567,11 @@ impl EPrimitives for ClientPrimitives { *zlock!(self.data) = Some(msg.wire_expr.to_owned()); } - fn send_request(&self, msg: zenoh_protocol::network::Request) { - *zlock!(self.data) = Some(msg.wire_expr.to_owned()); - } + fn send_request(&self, _ctx: RoutingContext) {} - fn send_response(&self, msg: zenoh_protocol::network::Response) { - *zlock!(self.data) = Some(msg.wire_expr.to_owned()); - } + fn send_response(&self, _ctx: RoutingContext) {} - fn send_response_final(&self, _msg: zenoh_protocol::network::ResponseFinal) {} + fn send_response_final(&self, _ctx: RoutingContext) {} fn as_any(&self) -> &dyn std::any::Any { self diff --git a/zenoh/tests/authentication.rs b/zenoh/tests/authentication.rs index fcd448f7d1..8d1e404617 100644 --- a/zenoh/tests/authentication.rs +++ b/zenoh/tests/authentication.rs @@ -54,7 +54,7 @@ mod test { create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - test_pub_sub_deny_then_allow_tls(37448, false).await; + test_pub_sub_deny_then_allow_tls(37448).await; test_pub_sub_allow_then_deny_tls(37449).await; test_get_qbl_allow_then_deny_tls(37450).await; test_get_qbl_deny_then_allow_tls(37451).await; @@ -66,7 +66,7 @@ mod test { create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - test_pub_sub_deny_then_allow_quic(37452).await; + test_pub_sub_deny_then_allow_quic(37452, false).await; test_pub_sub_allow_then_deny_quic(37453).await; test_get_qbl_deny_then_allow_quic(37454).await; test_get_qbl_allow_then_deny_quic(37455).await; @@ -79,7 +79,7 @@ mod test { create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - test_pub_sub_deny_then_allow_tls(37456, true).await; + test_pub_sub_deny_then_allow_quic(37456, true).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -269,7 +269,7 @@ client2name:client2passwd"; Ok(()) } - async fn get_basic_router_config_tls(port: u16, lowlatency: bool) -> Config { + async fn get_basic_router_config_tls(port: u16) -> Config { let cert_path = TESTFILES_PATH.to_string_lossy(); let mut config = config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); @@ -313,16 +313,9 @@ client2name:client2passwd"; .tls .set_root_ca_certificate(Some(format!("{}/ca.pem", cert_path))) .unwrap(); - config.transport.unicast.set_lowlatency(lowlatency).unwrap(); - config - .transport - .unicast - .qos - .set_enabled(!lowlatency) - .unwrap(); config } - async fn get_basic_router_config_quic(port: u16) -> Config { + async fn get_basic_router_config_quic(port: u16, lowlatency: bool) -> Config { let cert_path = TESTFILES_PATH.to_string_lossy(); let mut config = config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); @@ -366,6 +359,13 @@ client2name:client2passwd"; .tls .set_root_ca_certificate(Some(format!("{}/ca.pem", cert_path))) .unwrap(); + config.transport.unicast.set_lowlatency(lowlatency).unwrap(); + config + .transport + .unicast + .qos + .set_enabled(!lowlatency) + .unwrap(); config } @@ -472,7 +472,7 @@ client2name:client2passwd"; config } - async fn get_client_sessions_tls(port: u16, lowlatency: bool) -> (Session, Session) { + async fn get_client_sessions_tls(port: u16) -> (Session, Session) { let cert_path = TESTFILES_PATH.to_string_lossy(); println!("Opening client sessions"); let mut config = config::client([format!("tls/127.0.0.1:{}", port) @@ -512,13 +512,6 @@ client2name:client2passwd"; .tls .set_root_ca_certificate(Some(format!("{}/ca.pem", cert_path))) .unwrap(); - config.transport.unicast.set_lowlatency(lowlatency).unwrap(); - config - .transport - .unicast - .qos - .set_enabled(!lowlatency) - .unwrap(); let s01 = ztimeout!(zenoh::open(config)).unwrap(); let mut config = config::client([format!("tls/127.0.0.1:{}", port) @@ -558,18 +551,11 @@ client2name:client2passwd"; .tls .set_root_ca_certificate(Some(format!("{}/ca.pem", cert_path))) .unwrap(); - config.transport.unicast.set_lowlatency(lowlatency).unwrap(); - config - .transport - .unicast - .qos - .set_enabled(!lowlatency) - .unwrap(); let s02 = ztimeout!(zenoh::open(config)).unwrap(); (s01, s02) } - async fn get_client_sessions_quic(port: u16) -> (Session, Session) { + async fn get_client_sessions_quic(port: u16, lowlatency: bool) -> (Session, Session) { let cert_path = TESTFILES_PATH.to_string_lossy(); println!("Opening client sessions"); let mut config = config::client([format!("quic/127.0.0.1:{}", port) @@ -609,6 +595,13 @@ client2name:client2passwd"; .tls .set_root_ca_certificate(Some(format!("{}/ca.pem", cert_path))) .unwrap(); + config.transport.unicast.set_lowlatency(lowlatency).unwrap(); + config + .transport + .unicast + .qos + .set_enabled(!lowlatency) + .unwrap(); let s01 = ztimeout!(zenoh::open(config)).unwrap(); let mut config = config::client([format!("quic/127.0.0.1:{}", port) .parse::() @@ -647,6 +640,13 @@ client2name:client2passwd"; .tls .set_root_ca_certificate(Some(format!("{}/ca.pem", cert_path))) .unwrap(); + config.transport.unicast.set_lowlatency(lowlatency).unwrap(); + config + .transport + .unicast + .qos + .set_enabled(!lowlatency) + .unwrap(); let s02 = ztimeout!(zenoh::open(config)).unwrap(); (s01, s02) } @@ -789,10 +789,10 @@ client2name:client2passwd"; ztimeout!(s02.close()).unwrap(); } - async fn test_pub_sub_deny_then_allow_tls(port: u16, lowlatency: bool) { + async fn test_pub_sub_deny_then_allow_tls(port: u16) { println!("test_pub_sub_deny_then_allow_tls"); - let mut config_router = get_basic_router_config_tls(port, lowlatency).await; + let mut config_router = get_basic_router_config_tls(port).await; config_router .insert_json5( @@ -835,7 +835,7 @@ client2name:client2passwd"; let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions_tls(port, lowlatency).await; + let (sub_session, pub_session) = get_client_sessions_tls(port).await; { let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); let received_value = Arc::new(Mutex::new(String::new())); @@ -861,7 +861,7 @@ client2name:client2passwd"; async fn test_pub_sub_allow_then_deny_tls(port: u16) { println!("test_pub_sub_allow_then_deny_tls"); - let mut config_router = get_basic_router_config_tls(port, false).await; + let mut config_router = get_basic_router_config_tls(port).await; config_router .insert_json5( "access_control", @@ -902,7 +902,7 @@ client2name:client2passwd"; println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions_tls(port, false).await; + let (sub_session, pub_session) = get_client_sessions_tls(port).await; { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); @@ -931,7 +931,7 @@ client2name:client2passwd"; async fn test_get_qbl_deny_then_allow_tls(port: u16) { println!("test_get_qbl_deny_then_allow_tls"); - let mut config_router = get_basic_router_config_tls(port, false).await; + let mut config_router = get_basic_router_config_tls(port).await; config_router .insert_json5( "access_control", @@ -975,7 +975,7 @@ client2name:client2passwd"; let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (get_session, qbl_session) = get_client_sessions_tls(port, false).await; + let (get_session, qbl_session) = get_client_sessions_tls(port).await; { let mut received_value = String::new(); @@ -1017,7 +1017,7 @@ client2name:client2passwd"; async fn test_get_qbl_allow_then_deny_tls(port: u16) { println!("test_get_qbl_allow_then_deny_tls"); - let mut config_router = get_basic_router_config_tls(port, false).await; + let mut config_router = get_basic_router_config_tls(port).await; config_router .insert_json5( "access_control", @@ -1060,7 +1060,7 @@ client2name:client2passwd"; let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (get_session, qbl_session) = get_client_sessions_tls(port, false).await; + let (get_session, qbl_session) = get_client_sessions_tls(port).await; { let mut received_value = String::new(); @@ -1099,10 +1099,10 @@ client2name:client2passwd"; close_router_session(session).await; } - async fn test_pub_sub_deny_then_allow_quic(port: u16) { + async fn test_pub_sub_deny_then_allow_quic(port: u16, lowlatency: bool) { println!("test_pub_sub_deny_then_allow_quic"); - let mut config_router = get_basic_router_config_quic(port).await; + let mut config_router = get_basic_router_config_quic(port, lowlatency).await; config_router .insert_json5( @@ -1145,7 +1145,7 @@ client2name:client2passwd"; let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions_quic(port).await; + let (sub_session, pub_session) = get_client_sessions_quic(port, lowlatency).await; { let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); let received_value = Arc::new(Mutex::new(String::new())); @@ -1173,7 +1173,7 @@ client2name:client2passwd"; async fn test_pub_sub_allow_then_deny_quic(port: u16) { println!("test_pub_sub_allow_then_deny_quic"); - let mut config_router = get_basic_router_config_quic(port).await; + let mut config_router = get_basic_router_config_quic(port, false).await; config_router .insert_json5( "access_control", @@ -1214,7 +1214,7 @@ client2name:client2passwd"; println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions_quic(port).await; + let (sub_session, pub_session) = get_client_sessions_quic(port, false).await; { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); @@ -1244,7 +1244,7 @@ client2name:client2passwd"; async fn test_get_qbl_deny_then_allow_quic(port: u16) { println!("test_get_qbl_deny_then_allow_quic"); - let mut config_router = get_basic_router_config_quic(port).await; + let mut config_router = get_basic_router_config_quic(port, false).await; config_router .insert_json5( "access_control", @@ -1288,7 +1288,7 @@ client2name:client2passwd"; let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (get_session, qbl_session) = get_client_sessions_quic(port).await; + let (get_session, qbl_session) = get_client_sessions_quic(port, false).await; { let mut received_value = String::new(); @@ -1331,7 +1331,7 @@ client2name:client2passwd"; async fn test_get_qbl_allow_then_deny_quic(port: u16) { println!("test_get_qbl_allow_then_deny_quic"); - let mut config_router = get_basic_router_config_quic(port).await; + let mut config_router = get_basic_router_config_quic(port, false).await; config_router .insert_json5( "access_control", @@ -1374,7 +1374,7 @@ client2name:client2passwd"; let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (get_session, qbl_session) = get_client_sessions_quic(port).await; + let (get_session, qbl_session) = get_client_sessions_quic(port, false).await; { let mut received_value = String::new();