From 1562a17b7a8a515e4a4ef98be7b23e9da47fbd48 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Thu, 28 Mar 2024 19:14:00 +0100 Subject: [PATCH 1/3] Protocol interest (#870) * Add InterestId in Declare message * Improve comments * Update commons/zenoh-protocol/src/network/declare.rs Co-authored-by: Mahmoud Mazouz * Update commons/zenoh-protocol/src/network/declare.rs Co-authored-by: Mahmoud Mazouz --------- Co-authored-by: Mahmoud Mazouz --- commons/zenoh-codec/src/network/declare.rs | 17 ++++++++++++++++- commons/zenoh-protocol/src/network/declare.rs | 17 +++++++++++------ zenoh/src/key_expr.rs | 1 + zenoh/src/net/routing/dispatcher/resource.rs | 1 + zenoh/src/net/routing/hat/client/pubsub.rs | 4 ++++ zenoh/src/net/routing/hat/client/queries.rs | 3 +++ .../net/routing/hat/linkstate_peer/pubsub.rs | 6 ++++++ .../net/routing/hat/linkstate_peer/queries.rs | 6 ++++++ zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 4 ++++ zenoh/src/net/routing/hat/p2p_peer/queries.rs | 3 +++ zenoh/src/net/routing/hat/router/pubsub.rs | 10 ++++++++++ zenoh/src/net/routing/hat/router/queries.rs | 10 ++++++++++ zenoh/src/net/runtime/adminspace.rs | 3 +++ zenoh/src/net/tests/tables.rs | 5 +++++ zenoh/src/session.rs | 7 +++++++ 15 files changed, 90 insertions(+), 7 deletions(-) diff --git a/commons/zenoh-codec/src/network/declare.rs b/commons/zenoh-codec/src/network/declare.rs index c81514ab3e..d7a25ea0a9 100644 --- a/commons/zenoh-codec/src/network/declare.rs +++ b/commons/zenoh-codec/src/network/declare.rs @@ -95,6 +95,7 @@ where fn write(self, writer: &mut W, x: &Declare) -> Self::Output { let Declare { + interest_id, ext_qos, ext_tstamp, ext_nodeid, @@ -103,6 +104,9 @@ where // Header let mut header = id::DECLARE; + if x.interest_id.is_some() { + header |= declare::flag::I; + } let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8) + (ext_tstamp.is_some() as u8) + ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8); @@ -111,6 +115,11 @@ where } self.write(&mut *writer, header)?; + // Body + if let Some(interest_id) = interest_id { + self.write(&mut *writer, interest_id)?; + } + // Extensions if ext_qos != &declare::ext::QoSType::DEFAULT { n_exts -= 1; @@ -157,6 +166,11 @@ where return Err(DidntRead); } + let mut interest_id = None; + if imsg::has_flag(self.header, declare::flag::I) { + interest_id = Some(self.codec.read(&mut *reader)?); + } + // Extensions let mut ext_qos = declare::ext::QoSType::DEFAULT; let mut ext_tstamp = None; @@ -192,10 +206,11 @@ where let body: DeclareBody = self.codec.read(&mut *reader)?; Ok(Declare { - body, + interest_id, ext_qos, ext_tstamp, ext_nodeid, + body, }) } } diff --git a/commons/zenoh-protocol/src/network/declare.rs b/commons/zenoh-protocol/src/network/declare.rs index d41d8bf67f..10027259c2 100644 --- a/commons/zenoh-protocol/src/network/declare.rs +++ b/commons/zenoh-protocol/src/network/declare.rs @@ -25,20 +25,22 @@ pub use subscriber::*; pub use token::*; pub mod flag { - // pub const X: u8 = 1 << 5; // 0x20 Reserved - // pub const X: u8 = 1 << 6; // 0x40 Reserved + pub const I: u8 = 1 << 5; // 0x20 Interest if I==1 then the declare is in a response to an Interest with future==false + // pub const X: u8 = 1 << 6; // 0x40 Reserved pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow } /// Flags: -/// - X: Reserved +/// - I: Interest If I==1 then the declare is in a response to an Interest with future==false /// - X: Reserved /// - Z: Extension If Z==1 then at least one extension is present /// /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ -/// |Z|X|X| DECLARE | +/// |Z|X|I| DECLARE | /// +-+-+-+---------+ +/// ~interest_id:z32~ if I==1 +/// +---------------+ /// ~ [decl_exts] ~ if Z==1 /// +---------------+ /// ~ declaration ~ @@ -46,6 +48,7 @@ pub mod flag { /// #[derive(Debug, Clone, PartialEq, Eq)] pub struct Declare { + pub interest_id: Option, pub ext_qos: ext::QoSType, pub ext_tstamp: Option, pub ext_nodeid: ext::NodeIdType, @@ -132,16 +135,18 @@ impl Declare { let mut rng = rand::thread_rng(); - let body = DeclareBody::rand(); + let interest_id = rng.gen_bool(0.5).then_some(rng.gen::()); let ext_qos = ext::QoSType::rand(); let ext_tstamp = rng.gen_bool(0.5).then(ext::TimestampType::rand); let ext_nodeid = ext::NodeIdType::rand(); + let body = DeclareBody::rand(); Self { - body, + interest_id, ext_qos, ext_tstamp, ext_nodeid, + body, } } } diff --git a/zenoh/src/key_expr.rs b/zenoh/src/key_expr.rs index f340f24cf1..aaa1d13724 100644 --- a/zenoh/src/key_expr.rs +++ b/zenoh/src/key_expr.rs @@ -664,6 +664,7 @@ impl SyncResolve for KeyExprUndeclaration<'_> { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(zenoh_protocol::network::Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 0450dab38a..194b97fca8 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -452,6 +452,7 @@ impl Resource { .insert(expr_id, nonwild_prefix.clone()); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 290f90f95f..e85bb77bf9 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -53,6 +53,7 @@ fn propagate_simple_subscription_to( let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -136,6 +137,7 @@ fn declare_client_subscription( .primitives .send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -169,6 +171,7 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -203,6 +206,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 2ac3f1b993..5c0bc5349b 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -93,6 +93,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -164,6 +165,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -414,6 +418,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -455,6 +460,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 9fba744a9c..150c12a632 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -126,6 +126,7 @@ fn send_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -169,6 +170,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -337,6 +339,7 @@ fn send_forget_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -362,6 +365,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index a722176292..b495248788 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -53,6 +53,7 @@ fn propagate_simple_subscription_to( let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -136,6 +137,7 @@ fn declare_client_subscription( .primitives .send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -169,6 +171,7 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -203,6 +206,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 38f77bec45..72c32b9217 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -93,6 +93,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -164,6 +165,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -408,6 +412,7 @@ fn propagate_forget_simple_subscription_to_peers(tables: &mut Tables, res: &Arc< if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -559,6 +564,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -600,6 +606,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -628,6 +635,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -766,6 +774,7 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: if forget { dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -791,6 +800,7 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: }; dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 61abaa7c55..99e787beb5 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -194,6 +194,7 @@ fn send_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -247,6 +248,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -471,6 +473,7 @@ fn send_forget_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -496,6 +499,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -768,6 +775,7 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -866,6 +874,7 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links if forget { dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -891,6 +900,7 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 166ff16bd0..d460ee3f1c 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -276,6 +276,8 @@ impl AdminSpace { zlock!(admin.primitives).replace(primitives.clone()); primitives.send_declare(Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -287,6 +289,7 @@ impl AdminSpace { }); primitives.send_declare(Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 516bcd0109..4067f2ad8f 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -579,6 +579,7 @@ fn client_test() { Primitives::send_declare( primitives0.as_ref(), Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -606,6 +607,7 @@ fn client_test() { Primitives::send_declare( primitives0.as_ref(), Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -627,6 +629,7 @@ fn client_test() { Primitives::send_declare( primitives1.as_ref(), Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -654,6 +657,7 @@ fn client_test() { Primitives::send_declare( primitives1.as_ref(), Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -675,6 +679,7 @@ fn client_test() { Primitives::send_declare( primitives2.as_ref(), Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index b9e20a4e68..addb757807 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -872,6 +872,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1084,6 +1085,7 @@ impl Session { // }; primitives.send_declare(Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1140,6 +1142,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1191,6 +1194,7 @@ impl Session { distance: 0, }; primitives.send_declare(Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1212,6 +1216,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1247,6 +1252,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1271,6 +1277,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, From 21fb0832d9cfa904bf787ef9d511572b5ce81755 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 29 Mar 2024 10:43:07 +0100 Subject: [PATCH 2/3] Protocol batchsize (#873) * Use BatchSize typedef instead of u16 * Use BatchSize typedef instead of u16 for vsock --- commons/zenoh-codec/src/core/zint.rs | 68 ++++++++++--------- commons/zenoh-protocol/src/transport/init.rs | 4 +- commons/zenoh-protocol/src/transport/join.rs | 2 +- commons/zenoh-protocol/src/transport/mod.rs | 1 + io/zenoh-link-commons/src/lib.rs | 3 +- io/zenoh-link-commons/src/multicast.rs | 4 +- io/zenoh-link-commons/src/unicast.rs | 7 +- io/zenoh-links/zenoh-link-quic/src/lib.rs | 13 ++-- io/zenoh-links/zenoh-link-quic/src/unicast.rs | 3 +- io/zenoh-links/zenoh-link-serial/src/lib.rs | 7 +- .../zenoh-link-serial/src/unicast.rs | 3 +- io/zenoh-links/zenoh-link-tcp/src/lib.rs | 5 +- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 3 +- io/zenoh-links/zenoh-link-tls/src/lib.rs | 13 ++-- io/zenoh-links/zenoh-link-tls/src/unicast.rs | 4 +- io/zenoh-links/zenoh-link-udp/src/lib.rs | 11 +-- .../zenoh-link-udp/src/multicast.rs | 3 +- io/zenoh-links/zenoh-link-udp/src/unicast.rs | 3 +- .../zenoh-link-unixpipe/src/unix/unicast.rs | 5 +- .../zenoh-link-unixsock_stream/src/lib.rs | 5 +- .../zenoh-link-unixsock_stream/src/unicast.rs | 3 +- io/zenoh-links/zenoh-link-vsock/src/lib.rs | 4 +- .../zenoh-link-vsock/src/unicast.rs | 8 ++- io/zenoh-links/zenoh-link-ws/src/lib.rs | 5 +- io/zenoh-links/zenoh-link-ws/src/unicast.rs | 3 +- io/zenoh-transport/src/common/pipeline.rs | 12 ++-- io/zenoh-transport/src/manager.rs | 6 +- .../src/unicast/establishment/cookie.rs | 9 ++- 28 files changed, 125 insertions(+), 92 deletions(-) diff --git a/commons/zenoh-codec/src/core/zint.rs b/commons/zenoh-codec/src/core/zint.rs index 0daff7348b..d5160e2ee6 100644 --- a/commons/zenoh-codec/src/core/zint.rs +++ b/commons/zenoh-codec/src/core/zint.rs @@ -17,38 +17,42 @@ use zenoh_buffers::{ writer::{DidntWrite, Writer}, }; -const VLE_LEN: usize = 9; +const VLE_LEN_MAX: usize = vle_len(u64::MAX); + +const fn vle_len(x: u64) -> usize { + const B1: u64 = u64::MAX << 7; + const B2: u64 = u64::MAX << (7 * 2); + const B3: u64 = u64::MAX << (7 * 3); + const B4: u64 = u64::MAX << (7 * 4); + const B5: u64 = u64::MAX << (7 * 5); + const B6: u64 = u64::MAX << (7 * 6); + const B7: u64 = u64::MAX << (7 * 7); + const B8: u64 = u64::MAX << (7 * 8); + + if (x & B1) == 0 { + 1 + } else if (x & B2) == 0 { + 2 + } else if (x & B3) == 0 { + 3 + } else if (x & B4) == 0 { + 4 + } else if (x & B5) == 0 { + 5 + } else if (x & B6) == 0 { + 6 + } else if (x & B7) == 0 { + 7 + } else if (x & B8) == 0 { + 8 + } else { + 9 + } +} impl LCodec for Zenoh080 { fn w_len(self, x: u64) -> usize { - const B1: u64 = u64::MAX << 7; - const B2: u64 = u64::MAX << (7 * 2); - const B3: u64 = u64::MAX << (7 * 3); - const B4: u64 = u64::MAX << (7 * 4); - const B5: u64 = u64::MAX << (7 * 5); - const B6: u64 = u64::MAX << (7 * 6); - const B7: u64 = u64::MAX << (7 * 7); - const B8: u64 = u64::MAX << (7 * 8); - - if (x & B1) == 0 { - 1 - } else if (x & B2) == 0 { - 2 - } else if (x & B3) == 0 { - 3 - } else if (x & B4) == 0 { - 4 - } else if (x & B5) == 0 { - 5 - } else if (x & B6) == 0 { - 6 - } else if (x & B7) == 0 { - 7 - } else if (x & B8) == 0 { - 8 - } else { - 9 - } + vle_len(x) } } @@ -107,7 +111,7 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, mut x: u64) -> Self::Output { - writer.with_slot(VLE_LEN, move |buffer| { + writer.with_slot(VLE_LEN_MAX, move |buffer| { let mut len = 0; while (x & !0x7f_u64) != 0 { // SAFETY: buffer is guaranteed to be VLE_LEN long where VLE_LEN is @@ -122,7 +126,7 @@ where } // In case len == VLE_LEN then all the bits have already been written in the latest iteration. // Else we haven't written all the necessary bytes yet. - if len != VLE_LEN { + if len != VLE_LEN_MAX { // SAFETY: buffer is guaranteed to be VLE_LEN long where VLE_LEN is // the maximum number of bytes a VLE can take once encoded. // I.e.: x is shifted 7 bits to the right every iteration, @@ -151,7 +155,7 @@ where let mut v = 0; let mut i = 0; // 7 * VLE_LEN is beyond the maximum number of shift bits - while (b & 0x80_u8) != 0 && i != 7 * (VLE_LEN - 1) { + while (b & 0x80_u8) != 0 && i != 7 * (VLE_LEN_MAX - 1) { v |= ((b & 0x7f_u8) as u64) << i; b = reader.read_u8()?; i += 7; diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index 1327288471..de517a353c 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -165,7 +165,7 @@ impl InitSyn { let whatami = WhatAmI::rand(); let zid = ZenohId::default(); let resolution = Resolution::rand(); - let batch_size: u16 = rng.gen(); + let batch_size: BatchSize = rng.gen(); let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); @@ -221,7 +221,7 @@ impl InitAck { } else { Resolution::rand() }; - let batch_size: u16 = rng.gen(); + let batch_size: BatchSize = rng.gen(); let cookie = ZSlice::rand(64); let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); diff --git a/commons/zenoh-protocol/src/transport/join.rs b/commons/zenoh-protocol/src/transport/join.rs index c5fbb98430..a5cf1422a6 100644 --- a/commons/zenoh-protocol/src/transport/join.rs +++ b/commons/zenoh-protocol/src/transport/join.rs @@ -141,7 +141,7 @@ impl Join { let whatami = WhatAmI::rand(); let zid = ZenohId::default(); let resolution = Resolution::rand(); - let batch_size: u16 = rng.gen(); + let batch_size: BatchSize = rng.gen(); let lease = if rng.gen_bool(0.5) { Duration::from_secs(rng.gen()) } else { diff --git a/commons/zenoh-protocol/src/transport/mod.rs b/commons/zenoh-protocol/src/transport/mod.rs index 1ea6fca144..e92860f441 100644 --- a/commons/zenoh-protocol/src/transport/mod.rs +++ b/commons/zenoh-protocol/src/transport/mod.rs @@ -39,6 +39,7 @@ use crate::network::NetworkMessage; /// the boundary of the serialized messages. The length is encoded as little-endian. /// In any case, the length of a message must not exceed 65_535 bytes. pub type BatchSize = u16; +pub type AtomicBatchSize = core::sync::atomic::AtomicU16; pub mod batch_size { use super::BatchSize; diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index f9ad7166ee..138726fd4f 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -32,6 +32,7 @@ pub use multicast::*; use serde::Serialize; pub use unicast::*; use zenoh_protocol::core::Locator; +use zenoh_protocol::transport::BatchSize; use zenoh_result::ZResult; /*************************************/ @@ -45,7 +46,7 @@ pub struct Link { pub src: Locator, pub dst: Locator, pub group: Option, - pub mtu: u16, + pub mtu: BatchSize, pub is_reliable: bool, pub is_streamed: bool, pub interfaces: Vec, diff --git a/io/zenoh-link-commons/src/multicast.rs b/io/zenoh-link-commons/src/multicast.rs index 65bc7195b6..ccfe6842c1 100644 --- a/io/zenoh-link-commons/src/multicast.rs +++ b/io/zenoh-link-commons/src/multicast.rs @@ -22,7 +22,7 @@ use zenoh_buffers::{reader::HasReader, writer::HasWriter}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; use zenoh_protocol::{ core::{EndPoint, Locator}, - transport::TransportMessage, + transport::{BatchSize, TransportMessage}, }; use zenoh_result::{zerror, ZResult}; @@ -44,7 +44,7 @@ pub struct LinkMulticast(pub Arc); #[async_trait] pub trait LinkMulticastTrait: Send + Sync { - fn get_mtu(&self) -> u16; + fn get_mtu(&self) -> BatchSize; fn get_src(&self) -> &Locator; fn get_dst(&self) -> &Locator; fn is_reliable(&self) -> bool; diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index fe87e70e94..c21f4a008c 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -19,7 +19,10 @@ use core::{ ops::Deref, }; use std::net::SocketAddr; -use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::{ + core::{EndPoint, Locator}, + transport::BatchSize, +}; use zenoh_result::ZResult; pub type LinkManagerUnicast = Arc; @@ -41,7 +44,7 @@ pub struct LinkUnicast(pub Arc); #[async_trait] pub trait LinkUnicastTrait: Send + Sync { - fn get_mtu(&self) -> u16; + fn get_mtu(&self) -> BatchSize; fn get_src(&self) -> &Locator; fn get_dst(&self) -> &Locator; fn is_reliable(&self) -> bool; diff --git a/io/zenoh-links/zenoh-link-quic/src/lib.rs b/io/zenoh-links/zenoh-link-quic/src/lib.rs index c6d7e16087..4bcabaf5b6 100644 --- a/io/zenoh-links/zenoh-link-quic/src/lib.rs +++ b/io/zenoh-links/zenoh-link-quic/src/lib.rs @@ -28,9 +28,12 @@ use std::net::SocketAddr; use zenoh_config::Config; use zenoh_core::zconfigurable; use zenoh_link_commons::{ConfigurationInspector, LocatorInspector}; -use zenoh_protocol::core::{ - endpoint::{Address, Parameters}, - Locator, +use zenoh_protocol::{ + core::{ + endpoint::{Address, Parameters}, + Locator, + }, + transport::BatchSize, }; use zenoh_result::{bail, zerror, ZResult}; @@ -47,7 +50,7 @@ pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"]; // adopted in Zenoh and the usage of 16 bits in Zenoh to encode the // payload length in byte-streamed, the QUIC MTU is constrained to // 2^16 - 1 bytes (i.e., 65535). -const QUIC_MAX_MTU: u16 = u16::MAX; +const QUIC_MAX_MTU: BatchSize = BatchSize::MAX; pub const QUIC_LOCATOR_PREFIX: &str = "quic"; #[derive(Default, Clone, Copy, Debug)] @@ -137,7 +140,7 @@ impl ConfigurationInspector for QuicConfigurator { zconfigurable! { // Default MTU (QUIC PDU) in bytes. - static ref QUIC_DEFAULT_MTU: u16 = QUIC_MAX_MTU; + static ref QUIC_DEFAULT_MTU: BatchSize = QUIC_MAX_MTU; // The LINGER option causes the shutdown() call to block until (1) all application data is delivered // to the remote end or (2) a timeout expires. The timeout is expressed in seconds. // More info on the LINGER option and its dynamics can be found at: diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index 33953d666d..14a01861ca 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -34,6 +34,7 @@ use zenoh_link_commons::{ ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::transport::BatchSize; use zenoh_result::{bail, zerror, ZError, ZResult}; pub struct LinkUnicastQuic { @@ -135,7 +136,7 @@ impl LinkUnicastTrait for LinkUnicastQuic { } #[inline(always)] - fn get_mtu(&self) -> u16 { + fn get_mtu(&self) -> BatchSize { *QUIC_DEFAULT_MTU } diff --git a/io/zenoh-links/zenoh-link-serial/src/lib.rs b/io/zenoh-links/zenoh-link-serial/src/lib.rs index fb4d7fcc12..f7b0b7afeb 100644 --- a/io/zenoh-links/zenoh-link-serial/src/lib.rs +++ b/io/zenoh-links/zenoh-link-serial/src/lib.rs @@ -25,10 +25,11 @@ pub use unicast::*; use zenoh_core::zconfigurable; use zenoh_link_commons::LocatorInspector; use zenoh_protocol::core::{endpoint::Address, EndPoint, Locator}; +use zenoh_protocol::transport::BatchSize; use zenoh_result::ZResult; // Maximum MTU (Serial PDU) in bytes. -const SERIAL_MAX_MTU: u16 = z_serial::MAX_MTU as u16; +const SERIAL_MAX_MTU: BatchSize = z_serial::MAX_MTU as BatchSize; const DEFAULT_BAUDRATE: u32 = 9_600; @@ -36,11 +37,11 @@ const DEFAULT_EXCLUSIVE: bool = true; pub const SERIAL_LOCATOR_PREFIX: &str = "serial"; -const SERIAL_MTU_LIMIT: u16 = SERIAL_MAX_MTU; +const SERIAL_MTU_LIMIT: BatchSize = SERIAL_MAX_MTU; zconfigurable! { // Default MTU (UDP PDU) in bytes. - static ref SERIAL_DEFAULT_MTU: u16 = SERIAL_MTU_LIMIT; + static ref SERIAL_DEFAULT_MTU: BatchSize = SERIAL_MTU_LIMIT; // Amount of time in microseconds to throttle the accept loop upon an error. // Default set to 100 ms. static ref SERIAL_ACCEPT_THROTTLE_TIME: u64 = 100_000; diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 0efa40ee90..0a5bea3c18 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -30,6 +30,7 @@ use zenoh_link_commons::{ NewLinkChannelSender, }; use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::transport::BatchSize; use zenoh_result::{zerror, ZResult}; use z_serial::ZSerial; @@ -177,7 +178,7 @@ impl LinkUnicastTrait for LinkUnicastSerial { } #[inline(always)] - fn get_mtu(&self) -> u16 { + fn get_mtu(&self) -> BatchSize { *SERIAL_DEFAULT_MTU } diff --git a/io/zenoh-links/zenoh-link-tcp/src/lib.rs b/io/zenoh-links/zenoh-link-tcp/src/lib.rs index 1a7d6ae705..0b075d9bf8 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/lib.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/lib.rs @@ -22,6 +22,7 @@ use std::net::SocketAddr; use zenoh_core::zconfigurable; use zenoh_link_commons::LocatorInspector; use zenoh_protocol::core::{endpoint::Address, Locator}; +use zenoh_protocol::transport::BatchSize; use zenoh_result::{zerror, ZResult}; mod unicast; @@ -33,7 +34,7 @@ pub use unicast::*; // adopted in Zenoh and the usage of 16 bits in Zenoh to encode the // payload length in byte-streamed, the TCP MTU is constrained to // 2^16 - 1 bytes (i.e., 65535). -const TCP_MAX_MTU: u16 = u16::MAX; +const TCP_MAX_MTU: BatchSize = BatchSize::MAX; pub const TCP_LOCATOR_PREFIX: &str = "tcp"; @@ -52,7 +53,7 @@ impl LocatorInspector for TcpLocatorInspector { zconfigurable! { // Default MTU (TCP PDU) in bytes. - static ref TCP_DEFAULT_MTU: u16 = TCP_MAX_MTU; + static ref TCP_DEFAULT_MTU: BatchSize = TCP_MAX_MTU; // The LINGER option causes the shutdown() call to block until (1) all application data is delivered // to the remote end or (2) a timeout expires. The timeout is expressed in seconds. // More info on the LINGER option and its dynamics can be found at: diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 7137ac0212..aaadcf3c23 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -25,6 +25,7 @@ use zenoh_link_commons::{ ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::transport::BatchSize; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; use super::{ @@ -145,7 +146,7 @@ impl LinkUnicastTrait for LinkUnicastTcp { } #[inline(always)] - fn get_mtu(&self) -> u16 { + fn get_mtu(&self) -> BatchSize { *TCP_DEFAULT_MTU } diff --git a/io/zenoh-links/zenoh-link-tls/src/lib.rs b/io/zenoh-links/zenoh-link-tls/src/lib.rs index 95d59104b4..7faebb4cd9 100644 --- a/io/zenoh-links/zenoh-link-tls/src/lib.rs +++ b/io/zenoh-links/zenoh-link-tls/src/lib.rs @@ -30,9 +30,12 @@ use std::{convert::TryFrom, net::SocketAddr}; use zenoh_config::Config; use zenoh_core::zconfigurable; use zenoh_link_commons::{ConfigurationInspector, LocatorInspector}; -use zenoh_protocol::core::{ - endpoint::{self, Address}, - Locator, +use zenoh_protocol::{ + core::{ + endpoint::{self, Address}, + Locator, + }, + transport::BatchSize, }; use zenoh_result::{bail, zerror, ZResult}; @@ -45,7 +48,7 @@ pub use unicast::*; // adopted in Zenoh and the usage of 16 bits in Zenoh to encode the // payload length in byte-streamed, the TLS MTU is constrained to // 2^16 - 1 bytes (i.e., 65535). -const TLS_MAX_MTU: u16 = u16::MAX; +const TLS_MAX_MTU: BatchSize = BatchSize::MAX; pub const TLS_LOCATOR_PREFIX: &str = "tls"; #[derive(Default, Clone, Copy)] @@ -172,7 +175,7 @@ impl ConfigurationInspector for TlsConfigurator { zconfigurable! { // Default MTU (TLS PDU) in bytes. - static ref TLS_DEFAULT_MTU: u16 = TLS_MAX_MTU; + static ref TLS_DEFAULT_MTU: BatchSize = TLS_MAX_MTU; // The LINGER option causes the shutdown() call to block until (1) all application data is delivered // to the remote end or (2) a timeout expires. The timeout is expressed in seconds. // More info on the LINGER option and its dynamics can be found at: diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 7da711161e..a58e7372dd 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -42,8 +42,8 @@ use zenoh_link_commons::{ get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, }; -use zenoh_protocol::core::endpoint::Config; use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::{core::endpoint::Config, transport::BatchSize}; use zenoh_result::{bail, zerror, ZError, ZResult}; pub struct LinkUnicastTls { @@ -180,7 +180,7 @@ impl LinkUnicastTrait for LinkUnicastTls { } #[inline(always)] - fn get_mtu(&self) -> u16 { + fn get_mtu(&self) -> BatchSize { *TLS_DEFAULT_MTU } diff --git a/io/zenoh-links/zenoh-link-udp/src/lib.rs b/io/zenoh-links/zenoh-link-udp/src/lib.rs index 91d02cc13d..86db845d8f 100644 --- a/io/zenoh-links/zenoh-link-udp/src/lib.rs +++ b/io/zenoh-links/zenoh-link-udp/src/lib.rs @@ -27,6 +27,7 @@ pub use unicast::*; use zenoh_core::zconfigurable; use zenoh_link_commons::LocatorInspector; use zenoh_protocol::core::{endpoint::Address, Locator}; +use zenoh_protocol::transport::BatchSize; use zenoh_result::{zerror, ZResult}; // NOTE: In case of using UDP in high-throughput scenarios, it is recommended to set the @@ -44,24 +45,24 @@ use zenoh_result::{zerror, ZResult}; // Although in IPv6 it is possible to have UDP datagrams of size greater than 65,535 bytes via // IPv6 Jumbograms, its usage in Zenoh is discouraged unless the consequences are very well // understood. -const UDP_MAX_MTU: u16 = 65_507; +const UDP_MAX_MTU: BatchSize = 65_507; pub const UDP_LOCATOR_PREFIX: &str = "udp"; #[cfg(any(target_os = "linux", target_os = "windows"))] // Linux default value of a maximum datagram size is set to UDP MAX MTU. -const UDP_MTU_LIMIT: u16 = UDP_MAX_MTU; +const UDP_MTU_LIMIT: BatchSize = UDP_MAX_MTU; #[cfg(target_os = "macos")] // Mac OS X default value of a maximum datagram size is set to 9216 bytes. -const UDP_MTU_LIMIT: u16 = 9_216; +const UDP_MTU_LIMIT: BatchSize = 9_216; #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] -const UDP_MTU_LIMIT: u16 = 8_192; +const UDP_MTU_LIMIT: BatchSize = 8_192; zconfigurable! { // Default MTU (UDP PDU) in bytes. - static ref UDP_DEFAULT_MTU: u16 = UDP_MTU_LIMIT; + static ref UDP_DEFAULT_MTU: BatchSize = UDP_MTU_LIMIT; // Amount of time in microseconds to throttle the accept loop upon an error. // Default set to 100 ms. static ref UDP_ACCEPT_THROTTLE_TIME: u64 = 100_000; diff --git a/io/zenoh-links/zenoh-link-udp/src/multicast.rs b/io/zenoh-links/zenoh-link-udp/src/multicast.rs index bc894bd296..a6e7977052 100644 --- a/io/zenoh-links/zenoh-link-udp/src/multicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/multicast.rs @@ -21,6 +21,7 @@ use std::{borrow::Cow, fmt}; use tokio::net::UdpSocket; use zenoh_link_commons::{LinkManagerMulticastTrait, LinkMulticast, LinkMulticastTrait}; use zenoh_protocol::core::{Config, EndPoint, Locator}; +use zenoh_protocol::transport::BatchSize; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; pub struct LinkMulticastUdp { @@ -119,7 +120,7 @@ impl LinkMulticastTrait for LinkMulticastUdp { } #[inline(always)] - fn get_mtu(&self) -> u16 { + fn get_mtu(&self) -> BatchSize { *UDP_DEFAULT_MTU } diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 1cd4a0b1ec..5021969bfa 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -30,6 +30,7 @@ use zenoh_link_commons::{ LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::transport::BatchSize; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; use zenoh_sync::Mvar; @@ -200,7 +201,7 @@ impl LinkUnicastTrait for LinkUnicastUdp { } #[inline(always)] - fn get_mtu(&self) -> u16 { + fn get_mtu(&self) -> BatchSize { *UDP_DEFAULT_MTU } diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index 0a0aebe730..3026d4e4b0 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -33,6 +33,7 @@ use tokio::io::Interest; use tokio_util::sync::CancellationToken; use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::transport::BatchSize; use zenoh_runtime::ZRuntime; use unix_named_pipe::{create, open_write}; @@ -45,7 +46,7 @@ use zenoh_result::{bail, ZResult}; use super::FILE_ACCESS_MASK; -const LINUX_PIPE_MAX_MTU: u16 = 65_535; +const LINUX_PIPE_MAX_MTU: BatchSize = BatchSize::MAX; const LINUX_PIPE_DEDICATE_TRIES: usize = 100; static PIPE_INVITATION: &[u8] = &[0xDE, 0xAD, 0xBE, 0xEF]; @@ -498,7 +499,7 @@ impl LinkUnicastTrait for UnicastPipe { } #[inline(always)] - fn get_mtu(&self) -> u16 { + fn get_mtu(&self) -> BatchSize { LINUX_PIPE_MAX_MTU } diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/lib.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/lib.rs index b6c180cd8d..ce067c1aa2 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/lib.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/lib.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use zenoh_core::zconfigurable; use zenoh_link_commons::LocatorInspector; use zenoh_protocol::core::{endpoint::Address, Locator}; +use zenoh_protocol::transport::BatchSize; use zenoh_result::ZResult; #[cfg(target_family = "unix")] mod unicast; @@ -33,13 +34,13 @@ pub use unicast::*; // adopted in Zenoh and the usage of 16 bits in Zenoh to encode the // payload length in byte-streamed, the UNIXSOCKSTREAM MTU is constrained to // 2^16 - 1 bytes (i.e., 65535). -const UNIXSOCKSTREAM_MAX_MTU: u16 = u16::MAX; +const UNIXSOCKSTREAM_MAX_MTU: BatchSize = BatchSize::MAX; pub const UNIXSOCKSTREAM_LOCATOR_PREFIX: &str = "unixsock-stream"; zconfigurable! { // Default MTU (UNIXSOCKSTREAM PDU) in bytes. - static ref UNIXSOCKSTREAM_DEFAULT_MTU: u16 = UNIXSOCKSTREAM_MAX_MTU; + static ref UNIXSOCKSTREAM_DEFAULT_MTU: BatchSize = UNIXSOCKSTREAM_MAX_MTU; // Amount of time in microseconds to throttle the accept loop upon an error. // Default set to 100 ms. static ref UNIXSOCKSTREAM_ACCEPT_THROTTLE_TIME: u64 = 100_000; diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index 53441ab89c..a961c1aebb 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -32,6 +32,7 @@ use zenoh_link_commons::{ LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, }; use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::transport::BatchSize; use zenoh_result::{zerror, ZResult}; use super::{get_unix_path_as_string, UNIXSOCKSTREAM_DEFAULT_MTU, UNIXSOCKSTREAM_LOCATOR_PREFIX}; @@ -119,7 +120,7 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream { } #[inline(always)] - fn get_mtu(&self) -> u16 { + fn get_mtu(&self) -> BatchSize { *UNIXSOCKSTREAM_DEFAULT_MTU } diff --git a/io/zenoh-links/zenoh-link-vsock/src/lib.rs b/io/zenoh-links/zenoh-link-vsock/src/lib.rs index 7834050796..d58250fed3 100644 --- a/io/zenoh-links/zenoh-link-vsock/src/lib.rs +++ b/io/zenoh-links/zenoh-link-vsock/src/lib.rs @@ -22,7 +22,7 @@ use async_trait::async_trait; use zenoh_core::zconfigurable; use zenoh_link_commons::LocatorInspector; -use zenoh_protocol::core::Locator; +use zenoh_protocol::{core::Locator, transport::BatchSize}; use zenoh_result::ZResult; #[cfg(target_os = "linux")] @@ -47,7 +47,7 @@ impl LocatorInspector for VsockLocatorInspector { zconfigurable! { // Default MTU in bytes. - static ref VSOCK_DEFAULT_MTU: u16 = u16::MAX; + static ref VSOCK_DEFAULT_MTU: BatchSize = BatchSize::MAX; // Amount of time in microseconds to throttle the accept loop upon an error. // Default set to 100 ms. static ref VSOCK_ACCEPT_THROTTLE_TIME: u64 = 100_000; diff --git a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs index ced7b9dc15..59efa6f0e3 100644 --- a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs @@ -27,8 +27,10 @@ use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_link_commons::{ LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, }; -use zenoh_protocol::core::endpoint::Address; -use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::{ + core::{endpoint::Address, EndPoint, Locator}, + transport::BatchSize, +}; use zenoh_result::{bail, zerror, ZResult}; use super::{VSOCK_ACCEPT_THROTTLE_TIME, VSOCK_DEFAULT_MTU, VSOCK_LOCATOR_PREFIX}; @@ -170,7 +172,7 @@ impl LinkUnicastTrait for LinkUnicastVsock { } #[inline(always)] - fn get_mtu(&self) -> u16 { + fn get_mtu(&self) -> BatchSize { *VSOCK_DEFAULT_MTU } diff --git a/io/zenoh-links/zenoh-link-ws/src/lib.rs b/io/zenoh-links/zenoh-link-ws/src/lib.rs index f68a20d15d..d165b480a9 100644 --- a/io/zenoh-links/zenoh-link-ws/src/lib.rs +++ b/io/zenoh-links/zenoh-link-ws/src/lib.rs @@ -23,6 +23,7 @@ use url::Url; use zenoh_core::zconfigurable; use zenoh_link_commons::LocatorInspector; use zenoh_protocol::core::{endpoint::Address, Locator}; +use zenoh_protocol::transport::BatchSize; use zenoh_result::{bail, ZResult}; mod unicast; pub use unicast::*; @@ -33,7 +34,7 @@ pub use unicast::*; // adopted in Zenoh and the usage of 16 bits in Zenoh to encode the // payload length in byte-streamed, the TCP MTU is constrained to // 2^16 - 1 bytes (i.e., 65535). -const WS_MAX_MTU: u16 = u16::MAX; +const WS_MAX_MTU: BatchSize = BatchSize::MAX; pub const WS_LOCATOR_PREFIX: &str = "ws"; @@ -51,7 +52,7 @@ impl LocatorInspector for WsLocatorInspector { zconfigurable! { // Default MTU (TCP PDU) in bytes. - static ref WS_DEFAULT_MTU: u16 = WS_MAX_MTU; + static ref WS_DEFAULT_MTU: BatchSize = WS_MAX_MTU; // Amount of time in microseconds to throttle the accept loop upon an error. // Default set to 100 ms. static ref TCP_ACCEPT_THROTTLE_TIME: u64 = 100_000; diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 6a0cf64e6e..acf568f78c 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -34,6 +34,7 @@ use zenoh_link_commons::{ LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, }; use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_protocol::transport::BatchSize; use zenoh_result::{bail, zerror, ZResult}; use super::{get_ws_addr, get_ws_url, TCP_ACCEPT_THROTTLE_TIME, WS_DEFAULT_MTU, WS_LOCATOR_PREFIX}; @@ -200,7 +201,7 @@ impl LinkUnicastTrait for LinkUnicastWs { } #[inline(always)] - fn get_mtu(&self) -> u16 { + fn get_mtu(&self) -> BatchSize { *WS_DEFAULT_MTU } diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index fb95d709db..b74fa2990c 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -22,7 +22,7 @@ use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter}; use std::sync::{Arc, Mutex, MutexGuard}; use std::time::Duration; use std::{ - sync::atomic::{AtomicBool, AtomicU16, Ordering}, + sync::atomic::{AtomicBool, Ordering}, time::Instant, }; use zenoh_buffers::{ @@ -40,7 +40,7 @@ use zenoh_protocol::{ transport::{ fragment::FragmentHeader, frame::{self, FrameHeader}, - BatchSize, TransportMessage, + AtomicBatchSize, BatchSize, TransportMessage, }, }; @@ -75,7 +75,7 @@ impl StageInRefill { struct StageInOut { n_out_w: Sender<()>, s_out_w: RingBufferWriter, - bytes: Arc, + bytes: Arc, backoff: Arc, } @@ -355,12 +355,12 @@ enum Pull { struct Backoff { retry_time: NanoSeconds, last_bytes: BatchSize, - bytes: Arc, + bytes: Arc, backoff: Arc, } impl Backoff { - fn new(bytes: Arc, backoff: Arc) -> Self { + fn new(bytes: Arc, backoff: Arc) -> Self { Self { retry_time: 0, last_bytes: 0, @@ -552,7 +552,7 @@ impl TransmissionPipeline { // This is a SPSC ring buffer let (s_out_w, s_out_r) = RingBuffer::::init(); let current = Arc::new(Mutex::new(None)); - let bytes = Arc::new(AtomicU16::new(0)); + let bytes = Arc::new(AtomicBatchSize::new(0)); let backoff = Arc::new(AtomicBool::new(false)); stage_in.push(Mutex::new(StageIn { diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index f16a68cfba..2d7961ed2b 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -93,7 +93,7 @@ pub struct TransportManagerConfig { pub zid: ZenohId, pub whatami: WhatAmI, pub resolution: Resolution, - pub batch_size: u16, + pub batch_size: BatchSize, pub wait_before_drop: Duration, pub queue_size: [usize; Priority::NUM], pub queue_backoff: Duration, @@ -122,7 +122,7 @@ pub struct TransportManagerBuilder { zid: ZenohId, whatami: WhatAmI, resolution: Resolution, - batch_size: u16, + batch_size: BatchSize, wait_before_drop: Duration, queue_size: QueueSizeConf, queue_backoff: Duration, @@ -151,7 +151,7 @@ impl TransportManagerBuilder { self } - pub fn batch_size(mut self, batch_size: u16) -> Self { + pub fn batch_size(mut self, batch_size: BatchSize) -> Self { self.batch_size = batch_size; self } diff --git a/io/zenoh-transport/src/unicast/establishment/cookie.rs b/io/zenoh-transport/src/unicast/establishment/cookie.rs index 0db9e1c93a..6f0295601c 100644 --- a/io/zenoh-transport/src/unicast/establishment/cookie.rs +++ b/io/zenoh-transport/src/unicast/establishment/cookie.rs @@ -19,14 +19,17 @@ use zenoh_buffers::{ }; use zenoh_codec::{RCodec, WCodec, Zenoh080}; use zenoh_crypto::{BlockCipher, PseudoRng}; -use zenoh_protocol::core::{Resolution, WhatAmI, ZenohId}; +use zenoh_protocol::{ + core::{Resolution, WhatAmI, ZenohId}, + transport::BatchSize, +}; #[derive(Debug, PartialEq)] pub(crate) struct Cookie { pub(crate) zid: ZenohId, pub(crate) whatami: WhatAmI, pub(crate) resolution: Resolution, - pub(crate) batch_size: u16, + pub(crate) batch_size: BatchSize, pub(crate) nonce: u64, // Extensions pub(crate) ext_qos: ext::qos::StateAccept, @@ -82,7 +85,7 @@ where let whatami = WhatAmI::try_from(wai).map_err(|_| DidntRead)?; let resolution: u8 = self.read(&mut *reader)?; let resolution = Resolution::from(resolution); - let batch_size: u16 = self.read(&mut *reader)?; + let batch_size: BatchSize = self.read(&mut *reader)?; let nonce: u64 = self.read(&mut *reader)?; // Extensions let ext_qos: ext::qos::StateAccept = self.read(&mut *reader)?; From 312c03a2a79e0d8a06904008331148efd2a5475a Mon Sep 17 00:00:00 2001 From: DenisBiryukov91 <155981813+DenisBiryukov91@users.noreply.github.com> Date: Fri, 29 Mar 2024 16:57:52 +0100 Subject: [PATCH 3/3] Query.reply and reply_del, now accept TryIntoKeyExpr instead of IntoKeyExpr (#878) --- zenoh/src/queryable.rs | 41 ++++++++++++++++++++++------------------- zenoh/tests/routing.rs | 2 +- zenoh/tests/session.rs | 5 +---- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 599c0e13be..58589bfe8f 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -111,7 +111,7 @@ impl Query { #[inline(always)] #[cfg(feature = "unstable")] #[doc(hidden)] - pub fn reply_sample(&self, sample: Sample) -> ReplyBuilder<'_> { + pub fn reply_sample(&self, sample: Sample) -> ReplyBuilder<'_, 'static> { let Sample { key_expr, payload, @@ -126,7 +126,7 @@ impl Query { } = sample; ReplyBuilder { query: self, - key_expr, + key_expr: Ok(key_expr), payload, kind, encoding, @@ -145,18 +145,19 @@ impl Query { /// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]), /// replying on a disjoint key expression will result in an error when resolving the reply. #[inline(always)] - pub fn reply( + pub fn reply<'b, TryIntoKeyExpr, IntoPayload>( &self, - key_expr: IntoKeyExpr, + key_expr: TryIntoKeyExpr, payload: IntoPayload, - ) -> ReplyBuilder<'_> + ) -> ReplyBuilder<'_, 'b> where - IntoKeyExpr: Into>, + TryIntoKeyExpr: TryInto>, + >>::Error: Into, IntoPayload: Into, { ReplyBuilder { query: self, - key_expr: key_expr.into(), + key_expr: key_expr.try_into().map_err(Into::into), payload: payload.into(), kind: SampleKind::Put, timestamp: None, @@ -187,13 +188,14 @@ impl Query { /// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]), /// replying on a disjoint key expression will result in an error when resolving the reply. #[inline(always)] - pub fn reply_del(&self, key_expr: IntoKeyExpr) -> ReplyBuilder<'_> + pub fn reply_del<'b, TryIntoKeyExpr>(&self, key_expr: TryIntoKeyExpr) -> ReplyBuilder<'_, 'b> where - IntoKeyExpr: Into>, + TryIntoKeyExpr: TryInto>, + >>::Error: Into, { ReplyBuilder { query: self, - key_expr: key_expr.into(), + key_expr: key_expr.try_into().map_err(Into::into), payload: Payload::empty(), kind: SampleKind::Delete, timestamp: None, @@ -248,9 +250,9 @@ impl fmt::Display for Query { /// A builder returned by [`Query::reply()`](Query::reply) or [`Query::reply()`](Query::reply). #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] #[derive(Debug)] -pub struct ReplyBuilder<'a> { +pub struct ReplyBuilder<'a, 'b> { query: &'a Query, - key_expr: KeyExpr<'static>, + key_expr: ZResult>, payload: Payload, kind: SampleKind, encoding: Encoding, @@ -270,7 +272,7 @@ pub struct ReplyErrBuilder<'a> { value: Value, } -impl<'a> ReplyBuilder<'a> { +impl<'a, 'b> ReplyBuilder<'a, 'b> { #[zenoh_macros::unstable] pub fn with_attachment(mut self, attachment: Attachment) -> Self { self.attachment = Some(attachment); @@ -292,16 +294,17 @@ impl<'a> ReplyBuilder<'a> { } } -impl<'a> Resolvable for ReplyBuilder<'a> { +impl<'a, 'b> Resolvable for ReplyBuilder<'a, 'b> { type To = ZResult<()>; } -impl SyncResolve for ReplyBuilder<'_> { +impl<'a, 'b> SyncResolve for ReplyBuilder<'a, 'b> { fn res_sync(self) -> ::To { + let key_expr = self.key_expr?; if !self.query._accepts_any_replies().unwrap_or(false) - && !self.query.key_expr().intersects(&self.key_expr) + && !self.query.key_expr().intersects(&key_expr) { - bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", self.key_expr, self.query.key_expr()) + bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", &key_expr, self.query.key_expr()) } #[allow(unused_mut)] // will be unused if feature = "unstable" is not enabled let mut ext_sinfo = None; @@ -318,7 +321,7 @@ impl SyncResolve for ReplyBuilder<'_> { rid: self.query.inner.qid, wire_expr: WireExpr { scope: 0, - suffix: std::borrow::Cow::Owned(self.key_expr.into()), + suffix: std::borrow::Cow::Owned(key_expr.into()), mapping: Mapping::Sender, }, payload: ResponseBody::Reply(zenoh::Reply { @@ -360,7 +363,7 @@ impl SyncResolve for ReplyBuilder<'_> { } } -impl<'a> AsyncResolve for ReplyBuilder<'a> { +impl<'a, 'b> AsyncResolve for ReplyBuilder<'a, 'b> { type Future = Ready; fn res_async(self) -> Self::Future { diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index c34d06690a..b90f0f568f 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -137,7 +137,7 @@ impl Task { tokio::select! { _ = token.cancelled() => break, query = queryable.recv_async() => { - query?.reply(KeyExpr::try_from(ke.to_owned())?, payload.clone()).res_async().await?; + query?.reply(ke.to_owned(), payload.clone()).res_async().await?; }, } } diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index 5e86499bc7..8c2d2e9937 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -164,10 +164,7 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re "ok_del" => { tokio::task::block_in_place(|| { tokio::runtime::Handle::current().block_on(async { - ztimeout!(query - .reply_del(KeyExpr::try_from(key_expr).unwrap()) - .res_async()) - .unwrap() + ztimeout!(query.reply_del(key_expr).res_async()).unwrap() }) }); }