diff --git a/Cargo.lock b/Cargo.lock index e945a9e637..a18f116f79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4659,6 +4659,7 @@ dependencies = [ "async-std", "async-trait", "flume", + "lz4_flex", "serde", "typenum", "zenoh-buffers", @@ -4999,6 +5000,7 @@ dependencies = [ "zenoh-core", "zenoh-crypto", "zenoh-link", + "zenoh-link-commons", "zenoh-protocol", "zenoh-result", "zenoh-shm", diff --git a/commons/zenoh-buffers/src/bbuf.rs b/commons/zenoh-buffers/src/bbuf.rs index bdb9e9a056..6c02440603 100644 --- a/commons/zenoh-buffers/src/bbuf.rs +++ b/commons/zenoh-buffers/src/bbuf.rs @@ -15,8 +15,9 @@ use crate::{ reader::HasReader, vec, writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer}, + ZSlice, }; -use alloc::boxed::Box; +use alloc::{boxed::Box, sync::Arc}; use core::num::NonZeroUsize; #[derive(Clone, Debug, PartialEq, Eq)] @@ -152,6 +153,19 @@ impl<'a> HasReader for &'a BBuf { } } +// From impls +impl From for ZSlice { + fn from(value: BBuf) -> Self { + ZSlice { + buf: Arc::new(value.buffer), + start: 0, + end: value.len, + #[cfg(feature = "shared-memory")] + kind: ZSliceKind::Raw, + } + } +} + #[cfg(feature = "test")] impl BBuf { pub fn rand(len: usize) -> Self { diff --git a/commons/zenoh-buffers/src/lib.rs b/commons/zenoh-buffers/src/lib.rs index 718f486def..487409ce30 100644 --- a/commons/zenoh-buffers/src/lib.rs +++ b/commons/zenoh-buffers/src/lib.rs @@ -100,6 +100,7 @@ pub mod writer { where F: FnOnce(&mut [u8]) -> usize; } + pub trait BacktrackableWriter: Writer { type Mark; diff --git a/commons/zenoh-codec/src/transport/init.rs b/commons/zenoh-codec/src/transport/init.rs index db37c8fc03..5f98c77e5b 100644 --- a/commons/zenoh-codec/src/transport/init.rs +++ b/commons/zenoh-codec/src/transport/init.rs @@ -37,61 +37,80 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &InitSyn) -> Self::Output { + let InitSyn { + version, + whatami, + zid, + resolution, + batch_size, + ext_qos, + ext_shm, + ext_auth, + ext_mlink, + ext_lowlatency, + ext_compression, + } = x; + // Header let mut header = id::INIT; - if x.resolution != Resolution::default() || x.batch_size != batch_size::UNICAST { + if *resolution != Resolution::default() || *batch_size != batch_size::UNICAST { header |= flag::S; } - let mut n_exts = (x.ext_qos.is_some() as u8) - + (x.ext_shm.is_some() as u8) - + (x.ext_auth.is_some() as u8) - + (x.ext_mlink.is_some() as u8) - + (x.ext_lowlatency.is_some() as u8); + let mut n_exts = (ext_qos.is_some() as u8) + + (ext_shm.is_some() as u8) + + (ext_auth.is_some() as u8) + + (ext_mlink.is_some() as u8) + + (ext_lowlatency.is_some() as u8) + + (ext_compression.is_some() as u8); if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.version)?; + self.write(&mut *writer, version)?; - let whatami: u8 = match x.whatami { + let whatami: u8 = match whatami { WhatAmI::Router => 0b00, WhatAmI::Peer => 0b01, WhatAmI::Client => 0b10, }; - let flags: u8 = ((x.zid.size() as u8 - 1) << 4) | whatami; + let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami; self.write(&mut *writer, flags)?; - let lodec = Zenoh080Length::new(x.zid.size()); - lodec.write(&mut *writer, &x.zid)?; + let lodec = Zenoh080Length::new(zid.size()); + lodec.write(&mut *writer, zid)?; if imsg::has_flag(header, flag::S) { - self.write(&mut *writer, x.resolution.as_u8())?; - self.write(&mut *writer, x.batch_size.to_le_bytes())?; + self.write(&mut *writer, resolution.as_u8())?; + self.write(&mut *writer, batch_size.to_le_bytes())?; } // 
Extensions - if let Some(qos) = x.ext_qos.as_ref() { + if let Some(qos) = ext_qos.as_ref() { n_exts -= 1; self.write(&mut *writer, (qos, n_exts != 0))?; } - if let Some(shm) = x.ext_shm.as_ref() { + if let Some(shm) = ext_shm.as_ref() { n_exts -= 1; self.write(&mut *writer, (shm, n_exts != 0))?; } - if let Some(auth) = x.ext_auth.as_ref() { + if let Some(auth) = ext_auth.as_ref() { n_exts -= 1; self.write(&mut *writer, (auth, n_exts != 0))?; } - if let Some(mlink) = x.ext_mlink.as_ref() { + if let Some(mlink) = ext_mlink.as_ref() { n_exts -= 1; self.write(&mut *writer, (mlink, n_exts != 0))?; } - if let Some(lowlatency) = x.ext_lowlatency.as_ref() { + if let Some(lowlatency) = ext_lowlatency.as_ref() { n_exts -= 1; self.write(&mut *writer, (lowlatency, n_exts != 0))?; } + if let Some(compression) = ext_compression.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (compression, n_exts != 0))?; + } Ok(()) } @@ -150,6 +169,7 @@ where let mut ext_auth = None; let mut ext_mlink = None; let mut ext_lowlatency = None; + let mut ext_compression = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -181,6 +201,11 @@ where ext_lowlatency = Some(q); has_ext = ext; } + ext::Compression::ID => { + let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?; + ext_compression = Some(q); + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "InitSyn", ext)?; } @@ -198,6 +223,7 @@ where ext_auth, ext_mlink, ext_lowlatency, + ext_compression, }) } } @@ -210,64 +236,84 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &InitAck) -> Self::Output { + let InitAck { + version, + whatami, + zid, + resolution, + batch_size, + cookie, + ext_qos, + ext_shm, + ext_auth, + ext_mlink, + ext_lowlatency, + ext_compression, + } = x; + // Header let mut header = id::INIT | flag::A; - if x.resolution != Resolution::default() || x.batch_size != batch_size::UNICAST { + if *resolution != Resolution::default() || *batch_size != batch_size::UNICAST { header |= flag::S; } - let mut n_exts = (x.ext_qos.is_some() as u8) - + (x.ext_shm.is_some() as u8) - + (x.ext_auth.is_some() as u8) - + (x.ext_mlink.is_some() as u8) - + (x.ext_lowlatency.is_some() as u8); + let mut n_exts = (ext_qos.is_some() as u8) + + (ext_shm.is_some() as u8) + + (ext_auth.is_some() as u8) + + (ext_mlink.is_some() as u8) + + (ext_lowlatency.is_some() as u8) + + (ext_compression.is_some() as u8); if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.version)?; + self.write(&mut *writer, version)?; - let whatami: u8 = match x.whatami { + let whatami: u8 = match whatami { WhatAmI::Router => 0b00, WhatAmI::Peer => 0b01, WhatAmI::Client => 0b10, }; - let flags: u8 = ((x.zid.size() as u8 - 1) << 4) | whatami; + let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami; self.write(&mut *writer, flags)?; - let lodec = Zenoh080Length::new(x.zid.size()); - lodec.write(&mut *writer, &x.zid)?; + let lodec = Zenoh080Length::new(zid.size()); + lodec.write(&mut *writer, zid)?; if imsg::has_flag(header, flag::S) { - self.write(&mut *writer, x.resolution.as_u8())?; - self.write(&mut *writer, x.batch_size.to_le_bytes())?; + self.write(&mut *writer, resolution.as_u8())?; + self.write(&mut *writer, batch_size.to_le_bytes())?; } let zodec = Zenoh080Bounded::::new(); - zodec.write(&mut *writer, &x.cookie)?; + zodec.write(&mut *writer, cookie)?; // Extensions - if let Some(qos) = x.ext_qos.as_ref() { + if let Some(qos) = ext_qos.as_ref() { n_exts -= 1; 
self.write(&mut *writer, (qos, n_exts != 0))?; } - if let Some(shm) = x.ext_shm.as_ref() { + if let Some(shm) = ext_shm.as_ref() { n_exts -= 1; self.write(&mut *writer, (shm, n_exts != 0))?; } - if let Some(auth) = x.ext_auth.as_ref() { + if let Some(auth) = ext_auth.as_ref() { n_exts -= 1; self.write(&mut *writer, (auth, n_exts != 0))?; } - if let Some(mlink) = x.ext_mlink.as_ref() { + if let Some(mlink) = ext_mlink.as_ref() { n_exts -= 1; self.write(&mut *writer, (mlink, n_exts != 0))?; } - if let Some(lowlatency) = x.ext_lowlatency.as_ref() { + if let Some(lowlatency) = ext_lowlatency.as_ref() { n_exts -= 1; self.write(&mut *writer, (lowlatency, n_exts != 0))?; } + if let Some(compression) = ext_compression.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (compression, n_exts != 0))?; + } Ok(()) } @@ -329,6 +375,7 @@ where let mut ext_auth = None; let mut ext_mlink = None; let mut ext_lowlatency = None; + let mut ext_compression = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -360,6 +407,11 @@ where ext_lowlatency = Some(q); has_ext = ext; } + ext::Compression::ID => { + let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?; + ext_compression = Some(q); + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "InitAck", ext)?; } @@ -378,6 +430,7 @@ where ext_auth, ext_mlink, ext_lowlatency, + ext_compression, }) } } diff --git a/commons/zenoh-codec/src/transport/open.rs b/commons/zenoh-codec/src/transport/open.rs index bbcb43de98..17482b1610 100644 --- a/commons/zenoh-codec/src/transport/open.rs +++ b/commons/zenoh-codec/src/transport/open.rs @@ -35,16 +35,29 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &OpenSyn) -> Self::Output { + let OpenSyn { + initial_sn, + lease, + cookie, + ext_qos, + ext_shm, + ext_auth, + ext_mlink, + ext_lowlatency, + ext_compression, + } = x; + // Header let mut header = id::OPEN; - if x.lease.as_millis() % 1_000 == 0 { + if lease.as_millis() % 1_000 == 0 { header |= flag::T; } - let mut n_exts = (x.ext_qos.is_some() as u8) - + (x.ext_shm.is_some() as u8) - + (x.ext_auth.is_some() as u8) - + (x.ext_mlink.is_some() as u8) - + (x.ext_lowlatency.is_some() as u8); + let mut n_exts = (ext_qos.is_some() as u8) + + (ext_shm.is_some() as u8) + + (ext_auth.is_some() as u8) + + (ext_mlink.is_some() as u8) + + (ext_lowlatency.is_some() as u8) + + (ext_compression.is_some() as u8); if n_exts != 0 { header |= flag::Z; } @@ -52,34 +65,38 @@ where // Body if imsg::has_flag(header, flag::T) { - self.write(&mut *writer, x.lease.as_secs())?; + self.write(&mut *writer, lease.as_secs())?; } else { - self.write(&mut *writer, x.lease.as_millis() as u64)?; + self.write(&mut *writer, lease.as_millis() as u64)?; } - self.write(&mut *writer, x.initial_sn)?; - self.write(&mut *writer, &x.cookie)?; + self.write(&mut *writer, initial_sn)?; + self.write(&mut *writer, cookie)?; // Extensions - if let Some(qos) = x.ext_qos.as_ref() { + if let Some(qos) = ext_qos.as_ref() { n_exts -= 1; self.write(&mut *writer, (qos, n_exts != 0))?; } - if let Some(shm) = x.ext_shm.as_ref() { + if let Some(shm) = ext_shm.as_ref() { n_exts -= 1; self.write(&mut *writer, (shm, n_exts != 0))?; } - if let Some(auth) = x.ext_auth.as_ref() { + if let Some(auth) = ext_auth.as_ref() { n_exts -= 1; self.write(&mut *writer, (auth, n_exts != 0))?; } - if let Some(mlink) = x.ext_mlink.as_ref() { + if let Some(mlink) = ext_mlink.as_ref() { n_exts -= 1; self.write(&mut *writer, (mlink, n_exts != 0))?; } - if let Some(lowlatency) = 
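// A minimal standalone sketch (not the zenoh-codec API) of the extension-writing
// pattern used throughout these hunks. Every present extension, now including
// ext_compression, bumps n_exts up front; flag::Z is set when the count is
// non-zero, and each extension is then written together with a `more` marker that
// is true only while further extensions remain.
fn write_extensions(exts: &[Option<u8>], out: &mut Vec<u8>) {
    let mut n_exts = exts.iter().filter(|e| e.is_some()).count();
    for ext in exts.iter().flatten() {
        n_exts -= 1;
        let more = n_exts != 0;
        // In the real codec the `more` bit is folded into the extension header;
        // here it is emitted as a separate byte purely for illustration.
        out.push(*ext);
        out.push(more as u8);
    }
}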
x.ext_lowlatency.as_ref() { + if let Some(lowlatency) = ext_lowlatency.as_ref() { n_exts -= 1; self.write(&mut *writer, (lowlatency, n_exts != 0))?; } + if let Some(compression) = ext_compression.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (compression, n_exts != 0))?; + } Ok(()) } @@ -125,6 +142,7 @@ where let mut ext_auth = None; let mut ext_mlink = None; let mut ext_lowlatency = None; + let mut ext_compression = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -156,6 +174,11 @@ where ext_lowlatency = Some(q); has_ext = ext; } + ext::Compression::ID => { + let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?; + ext_compression = Some(q); + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "OpenSyn", ext)?; } @@ -171,6 +194,7 @@ where ext_auth, ext_mlink, ext_lowlatency, + ext_compression, }) } } @@ -183,18 +207,30 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &OpenAck) -> Self::Output { + let OpenAck { + initial_sn, + lease, + ext_qos, + ext_shm, + ext_auth, + ext_mlink, + ext_lowlatency, + ext_compression, + } = x; + // Header let mut header = id::OPEN; header |= flag::A; // Verify that the timeout is expressed in seconds, i.e. subsec part is 0. - if x.lease.subsec_nanos() == 0 { + if lease.subsec_nanos() == 0 { header |= flag::T; } - let mut n_exts = (x.ext_qos.is_some() as u8) - + (x.ext_shm.is_some() as u8) - + (x.ext_auth.is_some() as u8) - + (x.ext_mlink.is_some() as u8) - + (x.ext_lowlatency.is_some() as u8); + let mut n_exts = (ext_qos.is_some() as u8) + + (ext_shm.is_some() as u8) + + (ext_auth.is_some() as u8) + + (ext_mlink.is_some() as u8) + + (ext_lowlatency.is_some() as u8) + + (ext_compression.is_some() as u8); if n_exts != 0 { header |= flag::Z; } @@ -202,33 +238,37 @@ where // Body if imsg::has_flag(header, flag::T) { - self.write(&mut *writer, x.lease.as_secs())?; + self.write(&mut *writer, lease.as_secs())?; } else { - self.write(&mut *writer, x.lease.as_millis() as u64)?; + self.write(&mut *writer, lease.as_millis() as u64)?; } - self.write(&mut *writer, x.initial_sn)?; + self.write(&mut *writer, initial_sn)?; // Extensions - if let Some(qos) = x.ext_qos.as_ref() { + if let Some(qos) = ext_qos.as_ref() { n_exts -= 1; self.write(&mut *writer, (qos, n_exts != 0))?; } - if let Some(shm) = x.ext_shm.as_ref() { + if let Some(shm) = ext_shm.as_ref() { n_exts -= 1; self.write(&mut *writer, (shm, n_exts != 0))?; } - if let Some(auth) = x.ext_auth.as_ref() { + if let Some(auth) = ext_auth.as_ref() { n_exts -= 1; self.write(&mut *writer, (auth, n_exts != 0))?; } - if let Some(mlink) = x.ext_mlink.as_ref() { + if let Some(mlink) = ext_mlink.as_ref() { n_exts -= 1; self.write(&mut *writer, (mlink, n_exts != 0))?; } - if let Some(lowlatency) = x.ext_lowlatency.as_ref() { + if let Some(lowlatency) = ext_lowlatency.as_ref() { n_exts -= 1; self.write(&mut *writer, (lowlatency, n_exts != 0))?; } + if let Some(compression) = ext_compression.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (compression, n_exts != 0))?; + } Ok(()) } @@ -273,6 +313,7 @@ where let mut ext_auth = None; let mut ext_mlink = None; let mut ext_lowlatency = None; + let mut ext_compression = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -304,6 +345,11 @@ where ext_lowlatency = Some(q); has_ext = ext; } + ext::Compression::ID => { + let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?; + ext_compression = Some(q); + has_ext = ext; + } _ => { has_ext = 
extension::skip(reader, "OpenAck", ext)?; } @@ -318,6 +364,7 @@ where ext_auth, ext_mlink, ext_lowlatency, + ext_compression, }) } } diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 5b4d3da835..16d876591f 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -126,6 +126,12 @@ impl Default for QoSConf { } } +impl Default for CompressionConf { + fn default() -> Self { + Self { enabled: true } + } +} + impl Default for LinkTxConf { #[allow(clippy::unnecessary_cast)] fn default() -> Self { diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index b0857f2caf..aa5b6b6f70 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -235,6 +235,11 @@ validated_struct::validator! { /// If set to `false`, the QoS will be disabled. (default `true`). enabled: bool }, + pub compression: CompressionConf { + /// You must compile zenoh with "transport_compression" feature to be able to enable compression. + /// When enabled is true, batches will be sent compressed. (default `false`). + enabled: bool, + }, pub link: #[derive(Default)] TransportLinkConf { // An optional whitelist of protocols to be used for accepting and opening sessions. @@ -299,18 +304,6 @@ validated_struct::validator! { UnixPipeConf { file_access_mask: Option }, - pub compression: #[derive(Default)] - /// **Experimental** compression feature. - /// Will compress the batches hop to hop (as opposed to end to end). May cause errors when - /// the batches's complexity is too high, causing the resulting compression to be bigger in - /// size than the MTU. - /// You must use the features "transport_compression" and "unstable" to enable this. - CompressionConf { - /// When enabled is true, batches will be sent compressed. It does not affect the - /// reception, which always expects compressed batches when built with thes features - /// "transport_compression" and "unstable". 
- enabled: bool, - } }, pub shared_memory: SharedMemoryConf { diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index d553799fd1..0c60dd8a90 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -118,6 +118,7 @@ pub struct InitSyn { pub ext_auth: Option, pub ext_mlink: Option, pub ext_lowlatency: Option, + pub ext_compression: Option, } // Extensions @@ -146,6 +147,10 @@ pub mod ext { /// # LowLatency extension /// Used to negotiate the use of lowlatency transport pub type LowLatency = zextunit!(0x5, false); + + /// # Compression extension + /// Used to negotiate the use of compression on the link + pub type Compression = zextunit!(0x6, false); } impl InitSyn { @@ -166,6 +171,7 @@ impl InitSyn { let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); Self { version, @@ -178,6 +184,7 @@ impl InitSyn { ext_auth, ext_mlink, ext_lowlatency, + ext_compression, } } } @@ -195,6 +202,7 @@ pub struct InitAck { pub ext_auth: Option, pub ext_mlink: Option, pub ext_lowlatency: Option, + pub ext_compression: Option, } impl InitAck { @@ -220,6 +228,7 @@ impl InitAck { let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); Self { version, @@ -233,6 +242,7 @@ impl InitAck { ext_auth, ext_mlink, ext_lowlatency, + ext_compression, } } } diff --git a/commons/zenoh-protocol/src/transport/open.rs b/commons/zenoh-protocol/src/transport/open.rs index b7ec56da62..d793671b06 100644 --- a/commons/zenoh-protocol/src/transport/open.rs +++ b/commons/zenoh-protocol/src/transport/open.rs @@ -82,6 +82,7 @@ pub struct OpenSyn { pub ext_auth: Option, pub ext_mlink: Option, pub ext_lowlatency: Option, + pub ext_compression: Option, } // Extensions @@ -111,6 +112,10 @@ pub mod ext { /// # LowLatency extension /// Used to negotiate the use of lowlatency transport pub type LowLatency = zextunit!(0x5, false); + + /// # Compression extension + /// Used to negotiate the use of compression on the link + pub type Compression = zextunit!(0x6, false); } impl OpenSyn { @@ -137,6 +142,7 @@ impl OpenSyn { let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); Self { lease, @@ -147,6 +153,7 @@ impl OpenSyn { ext_auth, ext_mlink, ext_lowlatency, + ext_compression, } } } @@ -160,6 +167,7 @@ pub struct OpenAck { pub ext_auth: Option, pub ext_mlink: Option, pub ext_lowlatency: Option, + pub ext_compression: Option, } impl OpenAck { @@ -182,6 +190,7 @@ impl OpenAck { let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_mlink = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); Self { lease, @@ -191,6 +200,7 @@ impl OpenAck { ext_auth, ext_mlink, ext_lowlatency, + ext_compression, } } } diff --git a/io/zenoh-link-commons/Cargo.toml b/io/zenoh-link-commons/Cargo.toml 
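// A conceptual sketch (not the zenoh-protocol macros) of what the new
// `Compression = zextunit!(0x6, false)` declaration amounts to: a unit extension
// carries no body, so its mere presence in INIT/OPEN advertises that the sender
// supports compressing batches on this link, and compression is only used when
// both peers advertise it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct CompressionExt; // presence-only marker; wire ID 0x6 in the hunks above

fn compression_negotiated(local: Option<CompressionExt>, remote: Option<CompressionExt>) -> bool {
    local.is_some() && remote.is_some()
}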
index 51db4d671c..36e39eceed 100644 --- a/io/zenoh-link-commons/Cargo.toml +++ b/io/zenoh-link-commons/Cargo.toml @@ -24,10 +24,14 @@ categories = { workspace = true } description = "Internal crate for zenoh." # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +compression = [] + [dependencies] async-std = { workspace = true } async-trait = { workspace = true } flume = { workspace = true } +lz4_flex = { workspace = true } serde = { workspace = true, features = ["default"] } typenum = { workspace = true } zenoh-buffers = { workspace = true } diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 114990726a..e87d1b5c61 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -17,13 +17,10 @@ //! This crate is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](../zenoh/index.html) -#![no_std] -extern crate alloc; - +pub(crate) mod batch; mod multicast; mod unicast; -use alloc::{borrow::ToOwned, boxed::Box, string::String}; use async_trait::async_trait; use core::{cmp::PartialEq, fmt, hash::Hash}; pub use multicast::*; diff --git a/io/zenoh-link-commons/src/multicast.rs b/io/zenoh-link-commons/src/multicast.rs index 65bc7195b6..904d53361d 100644 --- a/io/zenoh-link-commons/src/multicast.rs +++ b/io/zenoh-link-commons/src/multicast.rs @@ -11,13 +11,13 @@ // Contributors: // ZettaScale Zenoh Team, // -use alloc::{borrow::Cow, boxed::Box, sync::Arc, vec::Vec}; use async_trait::async_trait; use core::{ fmt, hash::{Hash, Hasher}, ops::Deref, }; +use std::{borrow::Cow, sync::Arc}; use zenoh_buffers::{reader::HasReader, writer::HasWriter}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; use zenoh_protocol::{ diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index 7f3eb43518..774abd8fab 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -11,25 +11,23 @@ // Contributors: // ZettaScale Zenoh Team, // -use alloc::{boxed::Box, sync::Arc, vec::Vec}; use async_trait::async_trait; -use core::{ - convert::TryFrom, +use std::{ fmt, hash::{Hash, Hasher}, ops::Deref, + sync::Arc, }; -use zenoh_buffers::{ - reader::HasReader, - writer::{HasWriter, Writer}, -}; -use zenoh_codec::{RCodec, WCodec, Zenoh080}; +use zenoh_buffers::{reader::HasReader, ZSlice}; +use zenoh_codec::{RCodec, Zenoh080}; use zenoh_protocol::{ core::{EndPoint, Locator}, - transport::{BatchSize, TransportMessage}, + transport::TransportMessage, }; use zenoh_result::{zerror, ZResult}; +use crate::batch::{Encode, RBatch, WBatch}; + pub type LinkManagerUnicast = Arc; #[async_trait] pub trait LinkManagerUnicastTrait: Send + Sync { @@ -44,12 +42,6 @@ pub trait ConstructibleLinkManagerUnicast: Sized { fn new(new_link_sender: NewLinkChannelSender, config: T) -> ZResult; } -#[derive(Clone, PartialEq, Eq)] -pub enum LinkUnicastDirection { - Inbound, - Outbound, -} - #[derive(Clone)] pub struct LinkUnicast(pub Arc); @@ -68,59 +60,35 @@ pub trait LinkUnicastTrait: Send + Sync { } impl LinkUnicast { - pub async fn send(&self, msg: &TransportMessage) -> ZResult { + pub async fn send_batch(&self, batch: WBatch) -> ZResult { const ERR: &str = "Write error on link: "; - - // Create the buffer for serializing the message - let mut buff = Vec::new(); - let mut writer = buff.writer(); - let codec = Zenoh080::new(); - - // Reserve 16 bits to write the length - if self.is_streamed() { - writer - 
.write_exact(BatchSize::MIN.to_le_bytes().as_slice()) - .map_err(|_| zerror!("{ERR}{self}"))?; - } - // Serialize the message - codec - .write(&mut writer, msg) - .map_err(|_| zerror!("{ERR}{self}"))?; - - // Write the length - if self.is_streamed() { - let num = BatchSize::MIN.to_le_bytes().len(); - let len = - BatchSize::try_from(writer.len() - num).map_err(|_| zerror!("{ERR}{self}"))?; - buff[..num].copy_from_slice(len.to_le_bytes().as_slice()); - } - + let buff = batch.finalize().map_err(|_| zerror!("{ERR}{self}"))?; // Send the message on the link self.0.write_all(buff.as_slice()).await?; - Ok(buff.len()) } + pub async fn send(&self, msg: &TransportMessage) -> ZResult { + const ERR: &str = "Write error on link: "; + // Create the batch for serializing the message + let mut batch = WBatch::new(self.get_mtu()).set_streamed(self.is_streamed()); + batch.encode(msg).map_err(|_| zerror!("{ERR}{self}"))?; + self.send_batch(batch).await + } + + pub async fn recv_batch(&self, mut batch: RBatch) -> ZResult { + use crate::batch::ReadFrom; + const ERR: &str = "Read error from link: "; + batch.read_from(self).await?; + let zslice = batch.finalize().map_err(|_| zerror!("{ERR}{self}"))?; + Ok(zslice) + } + pub async fn recv(&self) -> ZResult { - // Read from the link - let buffer = if self.is_streamed() { - // Read and decode the message length - let mut length_bytes = BatchSize::MIN.to_le_bytes(); - self.read_exact(&mut length_bytes).await?; - let to_read = BatchSize::from_le_bytes(length_bytes) as usize; - // Read the message - let mut buffer = zenoh_buffers::vec::uninit(to_read); - self.read_exact(&mut buffer).await?; - buffer - } else { - // Read the message - let mut buffer = zenoh_buffers::vec::uninit(self.get_mtu() as usize); - let n = self.read(&mut buffer).await?; - buffer.truncate(n); - buffer - }; - - let mut reader = buffer.reader(); + let batch = RBatch::new(self.get_mtu()).set_streamed(self.is_streamed()); + let mut zslice = self.recv_batch(batch).await?; + + let mut reader = zslice.reader(); let codec = Zenoh080::new(); let msg: TransportMessage = codec diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index 77f6b18db3..745e20d15e 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -41,7 +41,7 @@ transport_udp = ["zenoh-link/transport_udp"] transport_unixsock-stream = ["zenoh-link/transport_unixsock-stream"] transport_ws = ["zenoh-link/transport_ws"] transport_serial = ["zenoh-link/transport_serial"] -transport_compression = [] +transport_compression = ["zenoh-link-commons/compression"] transport_unixpipe = ["zenoh-link/transport_unixpipe"] stats = ["zenoh-protocol/stats"] test = [] @@ -68,6 +68,7 @@ zenoh-config = { workspace = true } zenoh-core = { workspace = true } zenoh-crypto = { workspace = true } zenoh-link = { workspace = true } +zenoh-link-commons = { workspace = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } zenoh-shm = { workspace = true, optional = true } diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 6fc2051242..49edac38ec 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -11,33 +11,89 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::num::NonZeroUsize; +use core::future::Future; +use std::{ + num::{NonZeroU8, NonZeroUsize}, + process::Output, +}; use zenoh_buffers::{ - reader::{Reader, SiphonableReader}, + reader::{DidntRead, Reader, SiphonableReader}, 
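// A self-contained sketch of the stream framing that LinkUnicast::send_batch /
// recv_batch above still rely on (assuming BatchSize is u16, as suggested by the
// 2-byte LENGTH_BYTES): on stream-based links the batch payload is prefixed by its
// length encoded as 16-bit little endian.
type BatchSizeSketch = u16;

fn frame_streamed(payload: &[u8]) -> Vec<u8> {
    let len = payload.len() as BatchSizeSketch;
    let mut out = Vec::with_capacity(2 + payload.len());
    out.extend_from_slice(&len.to_le_bytes()); // 2-byte length prefix
    out.extend_from_slice(payload);
    out
}

fn unframe_streamed(frame: &[u8]) -> Option<&[u8]> {
    if frame.len() < 2 {
        return None;
    }
    let len = BatchSizeSketch::from_le_bytes([frame[0], frame[1]]) as usize;
    frame.get(2..2 + len)
}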
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer}, - BBuf, ZBufReader, + BBuf, ZBufReader, ZSlice, }; use zenoh_codec::{WCodec, Zenoh080}; +use zenoh_link::LinkUnicast; use zenoh_protocol::{ + common::imsg, core::Reliability, network::NetworkMessage, transport::{ fragment::FragmentHeader, frame::FrameHeader, BatchSize, TransportMessage, TransportSn, }, }; +use zenoh_result::ZResult; + +const LENGTH_BYTES: [u8; 2] = BatchSize::MIN.to_le_bytes(); +const HEADER_BYTES: [u8; 1] = u8::MIN.to_le_bytes(); + +mod header { + #[cfg(feature = "transport_compression")] + pub(super) const COMPRESSION: u8 = 1; +} + +// Split the inner buffer into (length, header, payload) inmutable slices +macro_rules! zsplit { + ($batch:expr) => {{ + let slice = $batch.buffer.as_slice(); + match ($batch.has_length(), $batch.has_header()) { + (false, false) => (&[], &[], slice), + (true, false) => { + let (length, payload) = slice.split_at(LENGTH_BYTES.len()); + (length, &[], payload) + } + (false, true) => { + let (header, payload) = slice.split_at(HEADER_BYTES.len()); + (&[], header, payload) + } + (true, true) => { + let (length, tmp) = slice.split_at(LENGTH_BYTES.len()); + let (header, payload) = tmp.split_at(HEADER_BYTES.len()); + (length, header, payload) + } + } + }}; +} -const LENGTH_BYTES: [u8; 2] = u16::MIN.to_be_bytes(); +// Split the inner buffer into (length, header, payload) mutable slices +macro_rules! zsplitmut { + ($batch:expr) => {{ + let (has_length, has_header) = ($batch.has_length(), $batch.has_header()); + let slice = $batch.buffer.as_mut_slice(); + match (has_length, has_header) { + (false, false) => (&mut [], &mut [], slice), + (true, false) => { + let (length, payload) = slice.split_at_mut(LENGTH_BYTES.len()); + (length, &mut [], payload) + } + (false, true) => { + let (header, payload) = slice.split_at_mut(HEADER_BYTES.len()); + (&mut [], header, payload) + } + (true, true) => { + let (length, tmp) = slice.split_at_mut(LENGTH_BYTES.len()); + let (header, payload) = tmp.split_at_mut(HEADER_BYTES.len()); + (length, header, payload) + } + } + }}; +} +// WRITE BATCH pub(crate) trait Encode { type Output; fn encode(self, message: Message) -> Self::Output; } -pub(crate) trait Decode { - type Error; - fn decode(self) -> Result; -} - #[derive(Clone, Copy, Debug)] #[repr(u8)] pub(crate) enum CurrentFrame { @@ -95,8 +151,10 @@ impl SerializationBatchStats { pub(crate) struct WBatch { // The buffer to perform the batching on buffer: BBuf, - // It is a streamed batch - is_streamed: bool, + // It contains 2 bytes indicating how many bytes are in the batch + has_length: bool, + // It contains 1 byte as additional header, e.g. to signal the batch is compressed + header: Option, // The current frame being serialized: BestEffort/Reliable current_frame: CurrentFrame, // The latest SN @@ -107,10 +165,13 @@ pub(crate) struct WBatch { } impl WBatch { - pub(crate) fn new(size: BatchSize, is_streamed: bool) -> Self { + pub(crate) fn new(size: BatchSize) -> Self { + let mut h = 0; + let mut batch = Self { buffer: BBuf::with_capacity(size as usize), - is_streamed, + has_length: false, + header: None, current_frame: CurrentFrame::None, latest_sn: LatestSn { reliable: None, @@ -126,6 +187,44 @@ impl WBatch { batch } + /// Verify that the [`SerializationBatch`][SerializationBatch] is for a compression-enabled link, + /// i.e., the third byte is used to signa encode the total amount of serialized bytes as 16-bits little endian. 
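// A standalone sketch of the (length | header | payload) layout that the
// zsplit!/zsplitmut! macros above carve out of the batch buffer: an optional
// 2-byte length prefix (stream-based links), an optional 1-byte header (currently
// only carrying the compression bit), then the serialized messages.
fn split(buf: &[u8], has_length: bool, has_header: bool) -> (&[u8], &[u8], &[u8]) {
    let (length, rest) = if has_length { buf.split_at(2) } else { (&buf[..0], buf) };
    let (header, payload) = if has_header { rest.split_at(1) } else { (&rest[..0], rest) };
    (length, header, payload)
}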
+ #[inline(always)] + pub(crate) fn set_streamed(mut self, v: bool) -> Self { + self.has_length = v; + self + } + + #[inline(always)] + pub(crate) const fn get_streamed(&self) -> bool { + self.has_length + } + + /// Verify that the [`SerializationBatch`][SerializationBatch] is for a compression-enabled link, + /// i.e., the third byte is used to signa encode the total amount of serialized bytes as 16-bits little endian. + #[cfg(feature = "transport_compression")] + #[inline(always)] + pub(crate) fn set_compression(mut self, v: bool) -> Self { + if v { + self.header = match self.header.as_ref() { + Some(h) => NonZeroU8::new(h.get() | header::COMPRESSION), + None => NonZeroU8::new(header::COMPRESSION), + }; + } else { + self.header = self + .header + .and_then(|h| NonZeroU8::new(h.get() & !header::COMPRESSION)) + } + self + } + + #[cfg(feature = "transport_compression")] + #[inline(always)] + pub(crate) fn get_compression(&self) -> bool { + self.header + .is_some_and(|h| imsg::has_flag(h.get(), header::COMPRESSION)) + } + /// Verify that the [`SerializationBatch`][SerializationBatch] has no serialized bytes. #[inline(always)] pub(crate) fn is_empty(&self) -> bool { @@ -135,19 +234,25 @@ impl WBatch { /// Get the total number of bytes that have been serialized on the [`SerializationBatch`][SerializationBatch]. #[inline(always)] pub(crate) fn len(&self) -> BatchSize { - let len = self.buffer.len() as BatchSize; - if self.is_streamed() { - len - (LENGTH_BYTES.len() as BatchSize) - } else { - len + let mut len = self.buffer.len() as BatchSize; + if self.has_length() { + len -= LENGTH_BYTES.len() as BatchSize; } + len + } + + /// Verify that the [`SerializationBatch`][SerializationBatch] is for a stream-based protocol, i.e., the first + /// 2 bytes are reserved to encode the total amount of serialized bytes as 16-bits little endian. + #[inline(always)] + pub(crate) const fn has_length(&self) -> bool { + self.has_length } /// Verify that the [`SerializationBatch`][SerializationBatch] is for a stream-based protocol, i.e., the first /// 2 bytes are reserved to encode the total amount of serialized bytes as 16-bits little endian. #[inline(always)] - pub(crate) fn is_streamed(&self) -> bool { - self.is_streamed + pub(crate) const fn has_header(&self) -> bool { + self.header.is_some() } /// Clear the [`SerializationBatch`][SerializationBatch] memory buffer and related internal state. @@ -160,27 +265,71 @@ impl WBatch { { self.stats.clear(); } - if self.is_streamed() { + if self.has_length() { let mut writer = self.buffer.writer(); let _ = writer.write_exact(&LENGTH_BYTES[..]); } + if let Some(h) = self.header { + let mut writer = self.buffer.writer(); + let _ = writer.write_u8(h.get()); + } } /// In case the [`SerializationBatch`][SerializationBatch] is for a stream-based protocol, use the first 2 bytes /// to encode the total amount of serialized bytes as 16-bits little endian. 
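// A minimal sketch of the header-bit bookkeeping performed by
// set_compression/get_compression above: the optional 1-byte header stays None
// while no flag is set, and the COMPRESSION bit is ORed in or masked out
// otherwise (NonZeroU8 keeps "no header" distinct from "header with no flags").
use core::num::NonZeroU8;

const COMPRESSION_BIT: u8 = 1; // mirrors header::COMPRESSION above

fn set_compression_bit(header: Option<NonZeroU8>, enable: bool) -> Option<NonZeroU8> {
    if enable {
        NonZeroU8::new(header.map_or(0, NonZeroU8::get) | COMPRESSION_BIT)
    } else {
        header.and_then(|h| NonZeroU8::new(h.get() & !COMPRESSION_BIT))
    }
}

fn get_compression_bit(header: Option<NonZeroU8>) -> bool {
    header.is_some_and(|h| h.get() & COMPRESSION_BIT != 0)
}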
- #[inline(always)] - pub(crate) fn write_len(&mut self) { - if self.is_streamed() { - let length = self.len(); - self.buffer.as_mut_slice()[..LENGTH_BYTES.len()].copy_from_slice(&length.to_le_bytes()); + pub(crate) fn finalize(mut self) -> ZResult { + if self.has_length() { + let (length, _h, _p) = self.split_mut(); + length.copy_from_slice(&self.len().to_le_bytes()); } + + if let Some(header) = self.header { + #[cfg(feature = "transport_compression")] + if self.get_compression() { + self.compress(); + } + } + + Ok(self.buffer) } /// Get a `&[u8]` to access the internal memory buffer, usually for transmitting it on the network. #[inline(always)] - pub(crate) fn as_bytes(&self) -> &[u8] { + pub(crate) fn as_slice(&self) -> &[u8] { self.buffer.as_slice() } + + // Split (length, header, payload) internal buffer slice + #[inline(always)] + fn split(&self) -> (&[u8], &[u8], &[u8]) { + zsplit!(self) + } + + // Split (length, header, payload) internal buffer slice + #[inline(always)] + fn split_mut(&mut self) -> (&mut [u8], &mut [u8], &mut [u8]) { + zsplitmut!(self) + } + + #[cfg(feature = "transport_compression")] + fn compress(&mut self) -> Result<(), DidntWrite> { + let (_length, _header, payload) = self.split(); + + let mut buffer = BBuf::with_capacity(self.buffer.capacity()); + let mut writer = buffer.writer(); + writer.with_slot(writer.remaining(), |b| { + lz4_flex::block::compress_into(payload, b).unwrap_or(0) + })?; + + if buffer.len() < self.buffer.len() { + self.buffer = buffer; + } else { + let (_length, header, _payload) = self.split_mut(); + header[0] &= !header::COMPRESSION; + } + + Ok(()) + } } impl Encode<&TransportMessage> for &mut WBatch { @@ -340,6 +489,74 @@ impl Encode<(&mut ZBufReader<'_>, FragmentHeader)> for &mut WBatch { } } +// // READ BATCH +// #[derive(Debug)] +// pub(crate) struct RBatch { +// // The buffer to perform deserializationn from +// buffer: Box<[u8]>, +// // It contains 2 bytes indicating how many bytes are in the batch +// has_length: bool, +// // It contains 1 byte as additional header, e.g. to signal the batch is compressed +// has_header: bool, +// } + +// impl RBatch { +// /// Verify that the [`SerializationBatch`][SerializationBatch] is for a stream-based protocol, i.e., the first +// /// 2 bytes are reserved to encode the total amount of serialized bytes as 16-bits little endian. +// #[inline(always)] +// pub(crate) const fn has_length(&self) -> bool { +// self.has_length +// } + +// /// Verify that the [`SerializationBatch`][SerializationBatch] is for a compression-enabled link, +// /// i.e., the third byte is used to signa encode the total amount of serialized bytes as 16-bits little endian. + +// #[inline(always)] +// pub(crate) const fn has_header(&self) -> bool { +// self.has_header +// } + +// // Split (length, header, payload) internal buffer slice +// #[inline(always)] +// fn split(&self) -> (&[u8], &[u8], &[u8]) { +// zsplit!(self) +// } + +// // Split (length, header, payload) internal buffer slice +// #[inline(always)] +// fn split_mut(&mut self) -> (&mut [u8], &mut [u8], &mut [u8]) { +// zsplitmut!(self) +// } + +// pub(crate) async fn read_unicast(&mut self, link: &LinkUnicast) -> ZResult { +// let n = if self.has_length() { +// let mut length = [0_u8, 0_u8]; +// link.read_exact(&mut length).await?; +// let n = BatchSize::from_le_bytes(length) as usize; +// link.read_exact(&mut self.buffer[0..n]).await?; +// n +// } else { +// link.read(&mut self.buffer).await? 
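// A sketch of the compress-on-finalize step and its (not yet wired up) receive-side
// counterpart, assuming the lz4_flex block API already used above. On the send path
// the payload is compressed into a scratch buffer and only swapped in when it is
// actually smaller; otherwise the COMPRESSION bit is cleared and the raw payload is
// kept. On the receive path the header bit decides whether to decompress.
const COMPRESSION_BIT: u8 = 1;

fn maybe_compress(payload: &[u8], header: &mut u8) -> Vec<u8> {
    let mut scratch = vec![0u8; lz4_flex::block::get_maximum_output_size(payload.len())];
    match lz4_flex::block::compress_into(payload, &mut scratch) {
        Ok(n) if n < payload.len() => {
            scratch.truncate(n);
            scratch
        }
        _ => {
            *header &= !COMPRESSION_BIT; // compression did not help: send uncompressed
            payload.to_vec()
        }
    }
}

fn maybe_decompress(
    header: u8,
    payload: &[u8],
    max_batch: usize,
) -> Result<Vec<u8>, lz4_flex::block::DecompressError> {
    if header & COMPRESSION_BIT != 0 {
        let mut out = vec![0u8; max_batch];
        let n = lz4_flex::block::decompress_into(payload, &mut out)?;
        out.truncate(n);
        Ok(out)
    } else {
        Ok(payload.to_vec())
    }
}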
+// }; + +// Ok(n) +// } + +// #[cfg(feature = "transport_compression")] +// pub(crate) fn uncompress_into(&mut self, batch: &mut WBatch) -> Result<(), DidntRead> { +// use zenoh_protocol::common::imsg; + +// self.clear(); +// let mut writer = self.buffer.writer(); +// // let (_length, header, payload) = self.split(); +// // if !header.is_empty() && imsg::has_flag(header[0], header::COMPRESSED) { +// // } else { +// // } + +// Ok(()) +// } +// } + #[cfg(test)] mod tests { use super::*; @@ -356,7 +573,7 @@ mod tests { #[test] fn serialization_batch() { - let mut batch = WBatch::new(u16::MAX, true); + let mut batch = WBatch::new(BatchSize::MAX); let tmsg: TransportMessage = KeepAlive.into(); let nmsg: NetworkMessage = Push { diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 47c5ef4a4d..29c616abe8 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -379,7 +379,7 @@ impl StageOutIn { #[inline] fn try_pull(&mut self) -> Pull { if let Some(mut batch) = self.s_out_r.pull() { - batch.write_len(); + batch.finalize(); self.backoff.stop(); return Pull::Some(batch); } @@ -398,7 +398,7 @@ impl StageOutIn { if let Ok(mut g) = self.current.try_lock() { // First try to pull from stage OUT if let Some(mut batch) = self.s_out_r.pull() { - batch.write_len(); + batch.finalize(); self.backoff.stop(); return Pull::Some(batch); } @@ -406,7 +406,7 @@ impl StageOutIn { // An incomplete (non-empty) batch is available in the state IN pipeline. match g.take() { Some(mut batch) => { - batch.write_len(); + batch.finalize(); self.backoff.stop(); return Pull::Some(batch); } @@ -421,7 +421,7 @@ impl StageOutIn { std::cmp::Ordering::Less => { // There should be a new batch in Stage OUT if let Some(mut batch) = self.s_out_r.pull() { - batch.write_len(); + batch.finalize(); self.backoff.stop(); return Pull::Some(batch); } @@ -470,7 +470,7 @@ impl StageOut { let mut batches = vec![]; // Empty the ring buffer while let Some(mut batch) = self.s_in.s_out_r.pull() { - batch.write_len(); + batch.finalize(); batches.push(batch); } // Take the current batch @@ -484,6 +484,8 @@ impl StageOut { #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct TransmissionPipelineConf { pub(crate) is_streamed: bool, + #[cfg(feature = "transport_compression")] + pub(crate) is_compression: bool, pub(crate) batch_size: BatchSize, pub(crate) queue_size: [usize; Priority::NUM], pub(crate) backoff: Duration, @@ -493,6 +495,8 @@ impl Default for TransmissionPipelineConf { fn default() -> Self { Self { is_streamed: false, + #[cfg(feature = "transport_compression")] + is_compression: false, batch_size: BatchSize::MAX, queue_size: [1; Priority::NUM], backoff: Duration::from_micros(1), @@ -530,9 +534,12 @@ impl TransmissionPipeline { let (mut s_ref_w, s_ref_r) = RingBuffer::::init(); // Fill the refill ring buffer with batches for _ in 0..*num { - assert!(s_ref_w - .push(WBatch::new(config.batch_size, config.is_streamed)) - .is_none()); + let mut batch = WBatch::new(config.batch_size).set_streamed(config.is_streamed); + #[cfg(feature = "transport_compression")] + { + batch = batch.set_compression(config.is_compression); + } + assert!(s_ref_w.push(batch).is_none()); } // Create the channel for notifying that new batches are in the refill ring buffer // This is a SPSC channel @@ -730,6 +737,7 @@ mod tests { const CONFIG: TransmissionPipelineConf = TransmissionPipelineConf { is_streamed: true, + is_compression: true, batch_size: BatchSize::MAX, 
queue_size: [1; Priority::NUM], backoff: Duration::from_micros(1), @@ -782,7 +790,7 @@ mod tests { batches += 1; bytes += batch.len() as usize; // Create a ZBuf for deserialization starting from the batch - let bytes = batch.as_bytes(); + let bytes = batch.as_slice(); // Deserialize the messages let mut reader = bytes.reader(); let codec = Zenoh080::new(); diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index b430e7efb1..0def1bbb47 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -106,6 +106,7 @@ impl TransportLinkMulticast { if self.handle_tx.is_none() { let tpc = TransmissionPipelineConf { is_streamed: false, + is_compression: false, batch_size: config.batch_size, queue_size: self.transport.manager.config.queue_size, backoff: self.transport.manager.config.queue_backoff, @@ -239,7 +240,7 @@ async fn tx_task( { Action::Pull((batch, priority)) => { // Send the buffer on the link - let bytes = batch.as_bytes(); + let bytes = batch.as_slice(); link.write_all(bytes).await?; // Keep track of next SNs if let Some(sn) = batch.latest_sn.reliable { @@ -298,7 +299,7 @@ async fn tx_task( // Drain the transmission pipeline and write remaining bytes on the wire let mut batches = pipeline.drain(); for (b, _) in batches.drain(..) { - link.write_all(b.as_bytes()) + link.write_all(b.as_slice()) .timeout(config.join_interval) .await .map_err(|_| { diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index c25c4bb873..11b86a9e25 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -14,9 +14,12 @@ #[cfg(feature = "shared-memory")] use crate::unicast::shared_memory_unicast::Challenge; use crate::{ - unicast::establishment::{ - close_link, compute_sn, ext, finalize_transport, AcceptFsm, Cookie, InputFinalize, - Zenoh080Cookie, + unicast::{ + establishment::{ + close_link, compute_sn, ext, finalize_transport, AcceptFsm, Cookie, InputFinalize, + Zenoh080Cookie, + }, + TransportLinkUnicastConfig, TransportLinkUnicastDirection, }, TransportConfigUnicast, TransportManager, }; @@ -28,7 +31,7 @@ use zenoh_buffers::{reader::HasReader, writer::HasWriter, ZSlice}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; use zenoh_core::{zasynclock, zcondfeat, zerror}; use zenoh_crypto::{BlockCipher, PseudoRng}; -use zenoh_link::{LinkUnicast, LinkUnicastDirection}; +use zenoh_link::LinkUnicast; use zenoh_protocol::{ core::{Field, Resolution, WhatAmI, ZenohId}, transport::{ @@ -41,21 +44,27 @@ use zenoh_result::ZResult; pub(super) type AcceptError = (zenoh_result::Error, Option); -struct StateZenoh { +struct StateTransport { batch_size: BatchSize, resolution: Resolution, -} - -struct State { - zenoh: StateZenoh, ext_qos: ext::qos::StateAccept, #[cfg(feature = "transport_multilink")] ext_mlink: ext::multilink::StateAccept, #[cfg(feature = "shared-memory")] ext_shm: ext::shm::StateAccept, + ext_lowlatency: ext::lowlatency::StateAccept, +} + +struct StateLink { #[cfg(feature = "transport_auth")] ext_auth: ext::auth::StateAccept, - ext_lowlatency: ext::lowlatency::StateAccept, + #[cfg(feature = "transport_compression")] + ext_compression: ext::compression::StateAccept, +} + +struct State { + transport: StateTransport, + link: StateLink, } // InitSyn @@ -117,6 +126,8 @@ struct AcceptLink<'a> { #[cfg(feature = "transport_auth")] ext_auth: ext::auth::AuthFsm<'a>, ext_lowlatency: 
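// A commented restatement (with placeholder types) of the accept-side state split
// introduced above: `transport` groups what is negotiated once per transport, while
// `link` groups per-link concerns -- authentication and the new compression
// extension. Both parts are round-tripped through the cookie between InitAck and
// OpenSyn.
struct StateSketch {
    transport: StateTransportSketch,
    link: StateLinkSketch,
}

struct StateTransportSketch {
    batch_size: u16,       // BatchSize in the real code
    resolution: u8,        // Resolution in the real code
    ext_qos: bool,         // ext::qos::StateAccept
    ext_mlink: bool,       // ext::multilink::StateAccept (transport_multilink)
    ext_shm: bool,         // ext::shm::StateAccept (shared-memory)
    ext_lowlatency: bool,  // ext::lowlatency::StateAccept
}

struct StateLinkSketch {
    ext_auth: bool,        // ext::auth::StateAccept (transport_auth)
    ext_compression: bool, // ext::compression::StateAccept (transport_compression)
}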
ext::lowlatency::LowLatencyFsm<'a>, + #[cfg(feature = "transport_compression")] + ext_compression: ext::compression::CompressionFsm<'a>, } #[async_trait] @@ -160,38 +171,32 @@ impl<'a> AcceptFsm for AcceptLink<'a> { } // Compute the minimum SN resolution - state.zenoh.resolution = { + state.transport.resolution = { let mut res = Resolution::default(); // Frame SN let i_fsn_res = init_syn.resolution.get(Field::FrameSN); - let m_fsn_res = state.zenoh.resolution.get(Field::FrameSN); + let m_fsn_res = state.transport.resolution.get(Field::FrameSN); res.set(Field::FrameSN, i_fsn_res.min(m_fsn_res)); // Request ID let i_rid_res = init_syn.resolution.get(Field::RequestID); - let m_rid_res = state.zenoh.resolution.get(Field::RequestID); + let m_rid_res = state.transport.resolution.get(Field::RequestID); res.set(Field::RequestID, i_rid_res.min(m_rid_res)); res }; // Compute the minimum batch size - state.zenoh.batch_size = state - .zenoh + state.transport.batch_size = state + .transport .batch_size .min(init_syn.batch_size) .min(batch_size::UNICAST); // Extension QoS self.ext_qos - .recv_init_syn((&mut state.ext_qos, init_syn.ext_qos)) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))?; - - // Extension LowLatency - self.ext_lowlatency - .recv_init_syn((&mut state.ext_lowlatency, init_syn.ext_lowlatency)) + .recv_init_syn((&mut state.transport.ext_qos, init_syn.ext_qos)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -206,14 +211,27 @@ impl<'a> AcceptFsm for AcceptLink<'a> { // Extension Auth #[cfg(feature = "transport_auth")] self.ext_auth - .recv_init_syn((&mut state.ext_auth, init_syn.ext_auth)) + .recv_init_syn((&mut state.link.ext_auth, init_syn.ext_auth)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; // Extension MultiLink #[cfg(feature = "transport_multilink")] self.ext_mlink - .recv_init_syn((&mut state.ext_mlink, init_syn.ext_mlink)) + .recv_init_syn((&mut state.transport.ext_mlink, init_syn.ext_mlink)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + + // Extension LowLatency + self.ext_lowlatency + .recv_init_syn((&mut state.transport.ext_lowlatency, init_syn.ext_lowlatency)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + + // Extension Compression + #[cfg(feature = "transport_compression")] + self.ext_compression + .recv_init_syn((&mut state.link.ext_compression, init_syn.ext_compression)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -238,14 +256,7 @@ impl<'a> AcceptFsm for AcceptLink<'a> { // Extension QoS let ext_qos = self .ext_qos - .send_init_ack(&state.ext_qos) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))?; - - // Extension LowLatency - let ext_lowlatency = self - .ext_lowlatency - .send_init_ack(&state.ext_lowlatency) + .send_init_ack(&state.transport.ext_qos) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -253,7 +264,7 @@ impl<'a> AcceptFsm for AcceptLink<'a> { let ext_shm = zcondfeat!( "shared-memory", self.ext_shm - .send_init_ack((&mut state.ext_shm, input.ext_shm)) + .send_init_ack((&mut state.transport.ext_shm, input.ext_shm)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?, None @@ -263,7 +274,7 @@ impl<'a> AcceptFsm for AcceptLink<'a> { let ext_auth = zcondfeat!( "transport_auth", self.ext_auth - .send_init_ack(&state.ext_auth) + .send_init_ack(&state.link.ext_auth) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?, None @@ -273,7 +284,24 @@ impl<'a> AcceptFsm for AcceptLink<'a> { let ext_mlink = zcondfeat!( "transport_multilink", self.ext_mlink - 
.send_init_ack(&state.ext_mlink) + .send_init_ack(&state.transport.ext_mlink) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?, + None + ); + + // Extension LowLatency + let ext_lowlatency = self + .ext_lowlatency + .send_init_ack(&state.transport.ext_lowlatency) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + + // Extension MultiLink + let ext_compression = zcondfeat!( + "transport_compression", + self.ext_compression + .send_init_ack(&state.link.ext_compression) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?, None @@ -284,17 +312,19 @@ impl<'a> AcceptFsm for AcceptLink<'a> { let cookie = Cookie { zid: input.other_zid, whatami: input.other_whatami, - resolution: state.zenoh.resolution, - batch_size: state.zenoh.batch_size, + resolution: state.transport.resolution, + batch_size: state.transport.batch_size, nonce: cookie_nonce, - ext_qos: state.ext_qos, + ext_qos: state.transport.ext_qos, #[cfg(feature = "transport_multilink")] - ext_mlink: state.ext_mlink, + ext_mlink: state.transport.ext_mlink, #[cfg(feature = "shared-memory")] - ext_shm: state.ext_shm, + ext_shm: state.transport.ext_shm, #[cfg(feature = "transport_auth")] - ext_auth: state.ext_auth, - ext_lowlatency: state.ext_lowlatency, + ext_auth: state.link.ext_auth, + ext_lowlatency: state.transport.ext_lowlatency, + #[cfg(feature = "transport_compression")] + ext_compression: state.link.ext_compression, }; let mut encrypted = vec![]; @@ -317,14 +347,15 @@ impl<'a> AcceptFsm for AcceptLink<'a> { version: input.mine_version, whatami: input.mine_whatami, zid: input.mine_zid, - resolution: state.zenoh.resolution, - batch_size: state.zenoh.batch_size, + resolution: state.transport.resolution, + batch_size: state.transport.batch_size, cookie, ext_qos, ext_shm, ext_auth, ext_mlink, ext_lowlatency, + ext_compression, } .into(); @@ -400,29 +431,27 @@ impl<'a> AcceptFsm for AcceptLink<'a> { // Rebuild the state from the cookie let mut state = State { - zenoh: StateZenoh { + transport: StateTransport { batch_size: cookie.batch_size, resolution: cookie.resolution, + ext_qos: cookie.ext_qos, + #[cfg(feature = "transport_multilink")] + ext_mlink: cookie.ext_mlink, + #[cfg(feature = "shared-memory")] + ext_shm: cookie.ext_shm, + ext_lowlatency: cookie.ext_lowlatency, + }, + link: StateLink { + #[cfg(feature = "transport_auth")] + ext_auth: cookie.ext_auth, + #[cfg(feature = "transport_compression")] + ext_compression: cookie.ext_compression, }, - ext_qos: cookie.ext_qos, - #[cfg(feature = "transport_multilink")] - ext_mlink: cookie.ext_mlink, - #[cfg(feature = "shared-memory")] - ext_shm: cookie.ext_shm, - #[cfg(feature = "transport_auth")] - ext_auth: cookie.ext_auth, - ext_lowlatency: cookie.ext_lowlatency, }; // Extension QoS self.ext_qos - .recv_open_syn((&mut state.ext_qos, open_syn.ext_qos)) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))?; - - // Extension LowLatency - self.ext_lowlatency - .recv_open_syn((&mut state.ext_lowlatency, open_syn.ext_lowlatency)) + .recv_open_syn((&mut state.transport.ext_qos, open_syn.ext_qos)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -436,14 +465,27 @@ impl<'a> AcceptFsm for AcceptLink<'a> { // Extension Auth #[cfg(feature = "transport_auth")] self.ext_auth - .recv_open_syn((&mut state.ext_auth, open_syn.ext_auth)) + .recv_open_syn((&mut state.link.ext_auth, open_syn.ext_auth)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; // Extension MultiLink #[cfg(feature = "transport_multilink")] self.ext_mlink - .recv_open_syn((&mut 
state.ext_mlink, open_syn.ext_mlink)) + .recv_open_syn((&mut state.transport.ext_mlink, open_syn.ext_mlink)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + + // Extension LowLatency + self.ext_lowlatency + .recv_open_syn((&mut state.transport.ext_lowlatency, open_syn.ext_lowlatency)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + + // Extension Compression + #[cfg(feature = "transport_compression")] + self.ext_compression + .recv_open_syn((&mut state.link.ext_compression, open_syn.ext_compression)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -467,14 +509,14 @@ impl<'a> AcceptFsm for AcceptLink<'a> { // Extension QoS let ext_qos = self .ext_qos - .send_open_ack(&state.ext_qos) + .send_open_ack(&state.transport.ext_qos) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; // Extension LowLatency let ext_lowlatency = self .ext_lowlatency - .send_open_ack(&state.ext_lowlatency) + .send_open_ack(&state.transport.ext_lowlatency) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -492,7 +534,7 @@ impl<'a> AcceptFsm for AcceptLink<'a> { let ext_auth = zcondfeat!( "transport_auth", self.ext_auth - .send_open_ack(&state.ext_auth) + .send_open_ack(&state.link.ext_auth) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?, None @@ -502,14 +544,25 @@ impl<'a> AcceptFsm for AcceptLink<'a> { let ext_mlink = zcondfeat!( "transport_multilink", self.ext_mlink - .send_open_ack(&state.ext_mlink) + .send_open_ack(&state.transport.ext_mlink) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?, + None + ); + + // Extension Compression + let ext_compression = zcondfeat!( + "transport_compression", + self.ext_compression + .send_open_ack(&state.link.ext_compression) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?, None ); // Build OpenAck message - let mine_initial_sn = compute_sn(input.mine_zid, input.other_zid, state.zenoh.resolution); + let mine_initial_sn = + compute_sn(input.mine_zid, input.other_zid, state.transport.resolution); let open_ack = OpenAck { lease: input.mine_lease, initial_sn: mine_initial_sn, @@ -518,6 +571,7 @@ impl<'a> AcceptFsm for AcceptLink<'a> { ext_auth, ext_mlink, ext_lowlatency, + ext_compression, }; // Do not send the OpenAck right now since we might still incur in MAX_LINKS error @@ -540,6 +594,8 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) #[cfg(feature = "transport_auth")] ext_auth: manager.state.unicast.authenticator.fsm(&manager.prng), ext_lowlatency: ext::lowlatency::LowLatencyFsm::new(), + #[cfg(feature = "transport_compression")] + ext_compression: ext::compression::CompressionFsm::new(), }; // Init handshake @@ -558,26 +614,34 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) let iack_out = { let mut state = State { - zenoh: StateZenoh { + transport: StateTransport { batch_size: manager.config.batch_size, resolution: manager.config.resolution, + ext_qos: ext::qos::StateAccept::new(manager.config.unicast.is_qos), + #[cfg(feature = "transport_multilink")] + ext_mlink: manager + .state + .unicast + .multilink + .accept(manager.config.unicast.max_links > 1), + #[cfg(feature = "shared-memory")] + ext_shm: ext::shm::StateAccept::new(manager.config.unicast.is_shm), + ext_lowlatency: ext::lowlatency::StateAccept::new( + manager.config.unicast.is_lowlatency, + ), + }, + link: StateLink { + #[cfg(feature = "transport_auth")] + ext_auth: manager + .state + .unicast + .authenticator + .accept(&mut *zasynclock!(manager.prng)), + 
#[cfg(feature = "transport_compression")] + ext_compression: ext::compression::StateAccept::new( + manager.config.unicast.is_compression, + ), }, - ext_qos: ext::qos::StateAccept::new(manager.config.unicast.is_qos), - ext_lowlatency: ext::lowlatency::StateAccept::new(manager.config.unicast.is_lowlatency), - #[cfg(feature = "transport_multilink")] - ext_mlink: manager - .state - .unicast - .multilink - .accept(manager.config.unicast.max_links > 1), - #[cfg(feature = "shared-memory")] - ext_shm: ext::shm::StateAccept::new(manager.config.unicast.is_shm), - #[cfg(feature = "transport_auth")] - ext_auth: manager - .state - .unicast - .authenticator - .accept(&mut *zasynclock!(manager.prng)), }; // Let's scope the Init phase in such a way memory is freed by Rust @@ -618,19 +682,24 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) let config = TransportConfigUnicast { zid: osyn_out.other_zid, whatami: osyn_out.other_whatami, - sn_resolution: state.zenoh.resolution.get(Field::FrameSN), + sn_resolution: state.transport.resolution.get(Field::FrameSN), tx_initial_sn: oack_out.open_ack.initial_sn, - is_qos: state.ext_qos.is_qos(), + is_qos: state.transport.ext_qos.is_qos(), #[cfg(feature = "transport_multilink")] - multilink: state.ext_mlink.multilink(), + multilink: state.transport.ext_mlink.multilink(), #[cfg(feature = "shared-memory")] is_shm: state.ext_shm.is_shm(), - is_lowlatency: state.ext_lowlatency.is_lowlatency(), + is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(), }; + let link_config = TransportLinkUnicastConfig { + direction: TransportLinkUnicastDirection::Inbound, + #[cfg(feature = "transport_compression")] + is_compression: state.link.ext_compression.is_compression(), + }; let transport = step!( manager - .init_transport_unicast(config, link.clone(), LinkUnicastDirection::Inbound) + .init_transport_unicast(config, link.clone(), link_config) .await ); @@ -651,7 +720,7 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) let input = InputFinalize { transport: transport.clone(), other_lease: osyn_out.other_lease, - agreed_batch_size: state.zenoh.batch_size, + agreed_batch_size: state.transport.batch_size, }; step!(finalize_transport(link, manager, input) .await diff --git a/io/zenoh-transport/src/unicast/establishment/cookie.rs b/io/zenoh-transport/src/unicast/establishment/cookie.rs index 0c6b5519e8..e9916be7e6 100644 --- a/io/zenoh-transport/src/unicast/establishment/cookie.rs +++ b/io/zenoh-transport/src/unicast/establishment/cookie.rs @@ -38,6 +38,8 @@ pub(crate) struct Cookie { #[cfg(feature = "transport_auth")] pub(crate) ext_auth: ext::auth::StateAccept, pub(crate) ext_lowlatency: ext::lowlatency::StateAccept, + #[cfg(feature = "transport_compression")] + pub(crate) ext_compression: ext::compression::StateAccept, } impl WCodec<&Cookie, &mut W> for Zenoh080 @@ -62,6 +64,8 @@ where #[cfg(feature = "transport_auth")] self.write(&mut *writer, &x.ext_auth)?; self.write(&mut *writer, &x.ext_lowlatency)?; + #[cfg(feature = "transport_compression")] + self.write(&mut *writer, &x.ext_compression)?; Ok(()) } @@ -90,6 +94,8 @@ where #[cfg(feature = "transport_auth")] let ext_auth: ext::auth::StateAccept = self.read(&mut *reader)?; let ext_lowlatency: ext::lowlatency::StateAccept = self.read(&mut *reader)?; + #[cfg(feature = "transport_compression")] + let ext_compression: ext::compression::StateAccept = self.read(&mut *reader)?; let cookie = Cookie { zid, @@ -105,6 +111,8 @@ where #[cfg(feature = "transport_auth")] 
ext_auth, ext_lowlatency, + #[cfg(feature = "transport_compression")] + ext_compression, }; Ok(cookie) @@ -174,6 +182,8 @@ impl Cookie { #[cfg(feature = "transport_auth")] ext_auth: ext::auth::StateAccept::rand(), ext_lowlatency: ext::lowlatency::StateAccept::rand(), + #[cfg(feature = "transport_compression")] + ext_compression: ext::compression::StateAccept::rand(), } } } diff --git a/io/zenoh-transport/src/unicast/establishment/ext/mod.rs b/io/zenoh-transport/src/unicast/establishment/ext/mod.rs index 956a8c5112..f4aafa832c 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/mod.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/mod.rs @@ -13,6 +13,8 @@ // #[cfg(feature = "transport_auth")] pub mod auth; +#[cfg(feature = "transport_compression")] +pub(crate) mod compression; pub(crate) mod lowlatency; #[cfg(feature = "transport_multilink")] pub(crate) mod multilink; diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index 19f94cf26e..a87ae69c6b 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -14,8 +14,9 @@ #[cfg(feature = "shared-memory")] use crate::unicast::shared_memory_unicast::Challenge; use crate::{ - unicast::establishment::{ - close_link, compute_sn, ext, finalize_transport, InputFinalize, OpenFsm, + unicast::{ + establishment::{close_link, compute_sn, ext, finalize_transport, InputFinalize, OpenFsm}, + TransportLinkUnicastConfig, TransportLinkUnicastDirection, }, TransportConfigUnicast, TransportManager, TransportUnicast, }; @@ -25,7 +26,7 @@ use zenoh_buffers::ZSlice; #[cfg(feature = "transport_auth")] use zenoh_core::zasynclock; use zenoh_core::{zcondfeat, zerror}; -use zenoh_link::{LinkUnicast, LinkUnicastDirection}; +use zenoh_link::LinkUnicast; use zenoh_protocol::{ core::{Field, Resolution, WhatAmI, ZenohId}, transport::{ @@ -37,21 +38,27 @@ use zenoh_result::ZResult; type OpenError = (zenoh_result::Error, Option); -struct StateZenoh { +struct StateTransport { batch_size: BatchSize, resolution: Resolution, -} - -struct State { - zenoh: StateZenoh, ext_qos: ext::qos::StateOpen, #[cfg(feature = "transport_multilink")] ext_mlink: ext::multilink::StateOpen, #[cfg(feature = "shared-memory")] ext_shm: ext::shm::StateOpen, + ext_lowlatency: ext::lowlatency::StateOpen, +} + +struct StateLink { #[cfg(feature = "transport_auth")] ext_auth: ext::auth::StateOpen, - ext_lowlatency: ext::lowlatency::StateOpen, + #[cfg(feature = "transport_compression")] + ext_compression: ext::compression::StateOpen, +} + +struct State { + transport: StateTransport, + link: StateLink, } // InitSyn @@ -101,6 +108,8 @@ struct OpenLink<'a> { #[cfg(feature = "transport_auth")] ext_auth: ext::auth::AuthFsm<'a>, ext_lowlatency: ext::lowlatency::LowLatencyFsm<'a>, + #[cfg(feature = "transport_compression")] + ext_compression: ext::compression::CompressionFsm<'a>, } #[async_trait] @@ -118,14 +127,7 @@ impl<'a> OpenFsm for OpenLink<'a> { // Extension QoS let ext_qos = self .ext_qos - .send_init_syn(&state.ext_qos) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))?; - - // Extension LowLatency - let ext_lowlatency = self - .ext_lowlatency - .send_init_syn(&state.ext_lowlatency) + .send_init_syn(&state.transport.ext_qos) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -133,7 +135,7 @@ impl<'a> OpenFsm for OpenLink<'a> { let ext_shm = zcondfeat!( "shared-memory", self.ext_shm - .send_init_syn(&state.ext_shm) + 
.send_init_syn(&state.transport.ext_shm) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?, None @@ -143,7 +145,7 @@ impl<'a> OpenFsm for OpenLink<'a> { let ext_auth = zcondfeat!( "transport_auth", self.ext_auth - .send_init_syn(&state.ext_auth) + .send_init_syn(&state.link.ext_auth) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?, None @@ -153,7 +155,24 @@ impl<'a> OpenFsm for OpenLink<'a> { let ext_mlink = zcondfeat!( "transport_multilink", self.ext_mlink - .send_init_syn(&state.ext_mlink) + .send_init_syn(&state.transport.ext_mlink) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?, + None + ); + + // Extension LowLatency + let ext_lowlatency = self + .ext_lowlatency + .send_init_syn(&state.transport.ext_lowlatency) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + + // Extension Compression + let ext_compression = zcondfeat!( + "transport_compression", + self.ext_compression + .send_init_syn(&state.link.ext_compression) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?, None @@ -163,13 +182,14 @@ impl<'a> OpenFsm for OpenLink<'a> { version: input.mine_version, whatami: input.mine_whatami, zid: input.mine_zid, - batch_size: state.zenoh.batch_size, - resolution: state.zenoh.resolution, + batch_size: state.transport.batch_size, + resolution: state.transport.resolution, ext_qos, ext_shm, ext_auth, ext_mlink, ext_lowlatency, + ext_compression, } .into(); @@ -220,12 +240,12 @@ impl<'a> OpenFsm for OpenLink<'a> { }; // Compute the minimum SN resolution - state.zenoh.resolution = { + state.transport.resolution = { let mut res = Resolution::default(); // Frame SN let i_fsn_res = init_ack.resolution.get(Field::FrameSN); - let m_fsn_res = state.zenoh.resolution.get(Field::FrameSN); + let m_fsn_res = state.transport.resolution.get(Field::FrameSN); if i_fsn_res > m_fsn_res { let e = zerror!( @@ -241,7 +261,7 @@ impl<'a> OpenFsm for OpenLink<'a> { // Request ID let i_rid_res = init_ack.resolution.get(Field::RequestID); - let m_rid_res = state.zenoh.resolution.get(Field::RequestID); + let m_rid_res = state.transport.resolution.get(Field::RequestID); if i_rid_res > m_rid_res { let e = zerror!( @@ -259,17 +279,11 @@ impl<'a> OpenFsm for OpenLink<'a> { }; // Compute the minimum batch size - state.zenoh.batch_size = state.zenoh.batch_size.min(init_ack.batch_size); + state.transport.batch_size = state.transport.batch_size.min(init_ack.batch_size); // Extension QoS self.ext_qos - .recv_init_ack((&mut state.ext_qos, init_ack.ext_qos)) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))?; - - // Extension LowLatency - self.ext_lowlatency - .recv_init_ack((&mut state.ext_lowlatency, init_ack.ext_lowlatency)) + .recv_init_ack((&mut state.transport.ext_qos, init_ack.ext_qos)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -284,14 +298,27 @@ impl<'a> OpenFsm for OpenLink<'a> { // Extension Auth #[cfg(feature = "transport_auth")] self.ext_auth - .recv_init_ack((&mut state.ext_auth, init_ack.ext_auth)) + .recv_init_ack((&mut state.link.ext_auth, init_ack.ext_auth)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; // Extension MultiLink #[cfg(feature = "transport_multilink")] self.ext_mlink - .recv_init_ack((&mut state.ext_mlink, init_ack.ext_mlink)) + .recv_init_ack((&mut state.transport.ext_mlink, init_ack.ext_mlink)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + + // Extension LowLatency + self.ext_lowlatency + .recv_init_ack((&mut state.transport.ext_lowlatency, init_ack.ext_lowlatency)) + .await + .map_err(|e| (e, 
Some(close::reason::GENERIC)))?; + + // Extension Compression + #[cfg(feature = "transport_compression")] + self.ext_compression + .recv_init_ack((&mut state.link.ext_compression, init_ack.ext_compression)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -316,14 +343,7 @@ impl<'a> OpenFsm for OpenLink<'a> { // Extension QoS let ext_qos = self .ext_qos - .send_open_syn(&state.ext_qos) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))?; - - // Extension LowLatency - let ext_lowlatency = self - .ext_lowlatency - .send_open_syn(&state.ext_lowlatency) + .send_open_syn(&state.transport.ext_qos) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -331,7 +351,7 @@ impl<'a> OpenFsm for OpenLink<'a> { let ext_shm = zcondfeat!( "shared-memory", self.ext_shm - .send_open_syn((&state.ext_shm, input.ext_shm)) + .send_open_syn((&state.transport.ext_shm, input.ext_shm)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?, None @@ -341,7 +361,7 @@ impl<'a> OpenFsm for OpenLink<'a> { let ext_auth = zcondfeat!( "transport_auth", self.ext_auth - .send_open_syn(&state.ext_auth) + .send_open_syn(&state.link.ext_auth) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?, None @@ -351,14 +371,32 @@ impl<'a> OpenFsm for OpenLink<'a> { let ext_mlink = zcondfeat!( "transport_multilink", self.ext_mlink - .send_open_syn(&state.ext_mlink) + .send_open_syn(&state.transport.ext_mlink) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?, + None + ); + + // Extension LowLatency + let ext_lowlatency = self + .ext_lowlatency + .send_open_syn(&state.transport.ext_lowlatency) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + + // Extension Compression + let ext_compression = zcondfeat!( + "transport_compression", + self.ext_compression + .send_open_syn(&state.link.ext_compression) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?, None ); // Build and send an OpenSyn message - let mine_initial_sn = compute_sn(input.mine_zid, input.other_zid, state.zenoh.resolution); + let mine_initial_sn = + compute_sn(input.mine_zid, input.other_zid, state.transport.resolution); let message: TransportMessage = OpenSyn { lease: input.mine_lease, initial_sn: mine_initial_sn, @@ -368,6 +406,7 @@ impl<'a> OpenFsm for OpenLink<'a> { ext_auth, ext_mlink, ext_lowlatency, + ext_compression, } .into(); @@ -420,34 +459,41 @@ impl<'a> OpenFsm for OpenLink<'a> { // Extension QoS self.ext_qos - .recv_open_ack((&mut state.ext_qos, open_ack.ext_qos)) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))?; - - // Extension LowLatency - self.ext_lowlatency - .recv_open_ack((&mut state.ext_lowlatency, open_ack.ext_lowlatency)) + .recv_open_ack((&mut state.transport.ext_qos, open_ack.ext_qos)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; // Extension Shm #[cfg(feature = "shared-memory")] self.ext_shm - .recv_open_ack((&mut state.ext_shm, open_ack.ext_shm)) + .recv_open_ack((&mut state.transport.ext_shm, open_ack.ext_shm)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; // Extension Auth #[cfg(feature = "transport_auth")] self.ext_auth - .recv_open_ack((&mut state.ext_auth, open_ack.ext_auth)) + .recv_open_ack((&mut state.link.ext_auth, open_ack.ext_auth)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; // Extension MultiLink #[cfg(feature = "transport_multilink")] self.ext_mlink - .recv_open_ack((&mut state.ext_mlink, open_ack.ext_mlink)) + .recv_open_ack((&mut state.transport.ext_mlink, open_ack.ext_mlink)) + .await + .map_err(|e| (e, 
Some(close::reason::GENERIC)))?; + + // Extension LowLatency + self.ext_lowlatency + .recv_open_ack((&mut state.transport.ext_lowlatency, open_ack.ext_lowlatency)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + + // Extension Compression + #[cfg(feature = "transport_compression")] + self.ext_compression + .recv_open_ack((&mut state.link.ext_compression, open_ack.ext_compression)) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -473,29 +519,38 @@ pub(crate) async fn open_link( #[cfg(feature = "transport_auth")] ext_auth: manager.state.unicast.authenticator.fsm(&manager.prng), ext_lowlatency: ext::lowlatency::LowLatencyFsm::new(), + #[cfg(feature = "transport_compression")] + ext_compression: ext::compression::CompressionFsm::new(), }; let mut state = State { - zenoh: StateZenoh { + transport: StateTransport { batch_size: manager.config.batch_size.min(batch_size::UNICAST), resolution: manager.config.resolution, + ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos), + #[cfg(feature = "transport_multilink")] + ext_mlink: manager + .state + .unicast + .multilink + .open(manager.config.unicast.max_links > 1), + #[cfg(feature = "shared-memory")] + ext_shm: ext::shm::StateOpen::new(manager.config.unicast.is_shm), + + ext_lowlatency: ext::lowlatency::StateOpen::new(manager.config.unicast.is_lowlatency), + }, + link: StateLink { + #[cfg(feature = "transport_auth")] + ext_auth: manager + .state + .unicast + .authenticator + .open(&mut *zasynclock!(manager.prng)), + #[cfg(feature = "transport_compression")] + ext_compression: ext::compression::StateOpen::new( + manager.config.unicast.is_compression, + ), }, - ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos), - #[cfg(feature = "transport_multilink")] - ext_mlink: manager - .state - .unicast - .multilink - .open(manager.config.unicast.max_links > 1), - #[cfg(feature = "shared-memory")] - ext_shm: ext::shm::StateOpen::new(manager.config.unicast.is_shm), - #[cfg(feature = "transport_auth")] - ext_auth: manager - .state - .unicast - .authenticator - .open(&mut *zasynclock!(manager.prng)), - ext_lowlatency: ext::lowlatency::StateOpen::new(manager.config.unicast.is_lowlatency), }; // Init handshake @@ -537,19 +592,23 @@ pub(crate) async fn open_link( let config = TransportConfigUnicast { zid: iack_out.other_zid, whatami: iack_out.other_whatami, - sn_resolution: state.zenoh.resolution.get(Field::FrameSN), + sn_resolution: state.transport.resolution.get(Field::FrameSN), tx_initial_sn: osyn_out.mine_initial_sn, - is_qos: state.ext_qos.is_qos(), + is_qos: state.transport.ext_qos.is_qos(), #[cfg(feature = "transport_multilink")] - multilink: state.ext_mlink.multilink(), + multilink: state.transport.ext_mlink.multilink(), #[cfg(feature = "shared-memory")] is_shm: state.ext_shm.is_shm(), - is_lowlatency: state.ext_lowlatency.is_lowlatency(), + is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(), + }; + let link_config = TransportLinkUnicastConfig { + direction: TransportLinkUnicastDirection::Outbound, + #[cfg(feature = "transport_compression")] + is_compression: state.link.ext_compression.is_compression(), }; - let transport = step!( manager - .init_transport_unicast(config, link.clone(), LinkUnicastDirection::Outbound) + .init_transport_unicast(config, link.clone(), link_config) .await ); @@ -563,7 +622,7 @@ pub(crate) async fn open_link( let output = InputFinalize { transport, other_lease: oack_out.other_lease, - agreed_batch_size: state.zenoh.batch_size, + agreed_batch_size: 
state.transport.batch_size, }; let transport = output.transport.clone(); let res = finalize_transport(link, manager, output).await; diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index ea97aa143b..e20b42e899 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -17,6 +17,7 @@ use super::link::send_with_link; use crate::stats::TransportStats; use crate::transport_unicast_inner::TransportUnicastTrait; use crate::TransportConfigUnicast; +use crate::TransportLinkUnicastConfig; use crate::TransportManager; use crate::{TransportExecutor, TransportPeerEventHandler}; use async_executor::Task; @@ -34,7 +35,7 @@ use zenoh_core::{zasynclock, zasyncread, zread, zwrite}; use zenoh_link::unixpipe::UNIXPIPE_LOCATOR_PREFIX; #[cfg(feature = "transport_unixpipe")] use zenoh_link::Link; -use zenoh_link::{LinkUnicast, LinkUnicastDirection}; +use zenoh_link::LinkUnicast; use zenoh_protocol::core::{WhatAmI, ZenohId}; use zenoh_protocol::network::NetworkMessage; use zenoh_protocol::transport::TransportBodyLowLatency; @@ -223,7 +224,11 @@ impl TransportUnicastTrait for TransportUnicastLowlatency { /*************************************/ /* LINK */ /*************************************/ - async fn add_link(&self, link: LinkUnicast, _direction: LinkUnicastDirection) -> ZResult<()> { + async fn add_link( + &self, + link: LinkUnicast, + _config: TransportLinkUnicastConfig, + ) -> ZResult<()> { log::trace!("Adding link: {}", link); #[cfg(not(feature = "transport_unixpipe"))] diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index 384b992401..0bd2b9f64a 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -20,7 +20,7 @@ use crate::unicast::establishment::ext::multilink::MultiLink; use crate::{ lowlatency::transport::TransportUnicastLowlatency, transport_unicast_inner::TransportUnicastTrait, - unicast::{TransportConfigUnicast, TransportUnicast}, + unicast::{TransportConfigUnicast, TransportLinkUnicastConfig, TransportUnicast}, universal::transport::TransportUnicastUniversal, TransportManager, }; @@ -28,7 +28,7 @@ use async_std::{prelude::FutureExt, sync::Mutex, task}; use std::{collections::HashMap, sync::Arc, time::Duration}; #[cfg(feature = "shared-memory")] use zenoh_config::SharedMemoryConf; -use zenoh_config::{Config, LinkTxConf, QoSConf, TransportUnicastConf}; +use zenoh_config::{CompressionConf, Config, LinkTxConf, QoSConf, TransportUnicastConf}; use zenoh_core::{zasynclock, zcondfeat}; use zenoh_crypto::PseudoRng; use zenoh_link::*; @@ -53,8 +53,8 @@ pub struct TransportManagerConfigUnicast { pub max_links: usize, #[cfg(feature = "shared-memory")] pub is_shm: bool, - #[cfg(all(feature = "unstable", feature = "transport_compression"))] - pub is_compressed: bool, + #[cfg(feature = "transport_compression")] + pub is_compression: bool, } pub struct TransportManagerStateUnicast { @@ -99,6 +99,8 @@ pub struct TransportManagerBuilderUnicast { #[cfg(feature = "transport_auth")] pub(super) authenticator: Auth, pub(super) is_lowlatency: bool, + #[cfg(feature = "transport_compression")] + pub(super) is_compress: bool, } impl TransportManagerBuilderUnicast { @@ -155,9 +157,9 @@ impl TransportManagerBuilderUnicast { self } - #[cfg(all(feature = "unstable", feature = "transport_compression"))] - pub fn compression(mut self, is_compressed: bool) -> Self { - 
self.is_compressed = is_compressed; + #[cfg(feature = "transport_compression")] + pub fn compression(mut self, is_compress: bool) -> Self { + self.is_compress = is_compress; self } @@ -212,6 +214,8 @@ impl TransportManagerBuilderUnicast { #[cfg(all(feature = "unstable", feature = "transport_compression"))] is_compressed: self.is_compressed, is_lowlatency: self.is_lowlatency, + #[cfg(feature = "transport_compression")] + is_compression: self.is_compress, }; let state = TransportManagerStateUnicast { @@ -239,6 +243,8 @@ impl Default for TransportManagerBuilderUnicast { let qos = QoSConf::default(); #[cfg(feature = "shared-memory")] let shm = SharedMemoryConf::default(); + #[cfg(feature = "transport_compression")] + let compression = CompressionConf::default(); Self { lease: Duration::from_millis(*link_tx.lease()), @@ -254,6 +260,8 @@ impl Default for TransportManagerBuilderUnicast { #[cfg(feature = "transport_auth")] authenticator: Auth::default(), is_lowlatency: *transport.lowlatency(), + #[cfg(feature = "transport_compression")] + is_compress: *compression.enabled(), } } } @@ -399,7 +407,7 @@ impl TransportManager { &self, config: TransportConfigUnicast, link: LinkUnicast, - direction: LinkUnicastDirection, + link_config: TransportLinkUnicastConfig, ) -> Result)> { let mut guard = zasynclock!(self.state.unicast.transports); @@ -422,7 +430,7 @@ impl TransportManager { // Add the link to the transport transport - .add_link(link, direction) + .add_link(link, link_config) .await .map_err(|e| (e, Some(close::reason::MAX_LINKS)))?; @@ -458,7 +466,7 @@ impl TransportManager { .map_err(|e| (e, Some(close::reason::INVALID))) .map(|v| Arc::new(v) as Arc)?; // Add the link to the transport - t.add_link(link, direction) + t.add_link(link, link_config) .await .map_err(|e| (e, Some(close::reason::MAX_LINKS)))?; t diff --git a/io/zenoh-transport/src/unicast/mod.rs b/io/zenoh-transport/src/unicast/mod.rs index d2a14a0276..d96afeca76 100644 --- a/io/zenoh-transport/src/unicast/mod.rs +++ b/io/zenoh-transport/src/unicast/mod.rs @@ -40,6 +40,24 @@ use zenoh_protocol::{ }; use zenoh_result::{zerror, ZResult}; +/*************************************/ +/* TRANSPORT UNICAST LINK */ +/*************************************/ +#[derive(Clone, Copy, PartialEq, Eq)] +pub(crate) enum TransportLinkUnicastDirection { + Inbound, + Outbound, +} + +#[derive(Clone, Copy)] +pub(crate) struct TransportLinkUnicastConfig { + // Inbound / outbound + pub(super) direction: TransportLinkUnicastDirection, + // Compression is active on the link + #[cfg(feature = "transport_compression")] + is_compression: bool, +} + /*************************************/ /* TRANSPORT UNICAST */ /*************************************/ diff --git a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs index acb6503c30..bdefe69e89 100644 --- a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs +++ b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs @@ -12,11 +12,14 @@ // ZettaScale Zenoh Team, // -use std::{fmt::DebugStruct, sync::Arc, time::Duration}; - +use crate::{ + TransportConfigUnicast, TransportExecutor, TransportLinkUnicastConfig, + TransportPeerEventHandler, +}; use async_std::sync::MutexGuard as AsyncMutexGuard; use async_trait::async_trait; -use zenoh_link::{LinkUnicast, LinkUnicastDirection}; +use std::{fmt::DebugStruct, sync::Arc, time::Duration}; +use zenoh_link::LinkUnicast; use zenoh_protocol::{ core::{WhatAmI, ZenohId}, network::NetworkMessage, @@ -24,8 
+27,6 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; -use crate::{TransportConfigUnicast, TransportExecutor, TransportPeerEventHandler}; - /*************************************/ /* UNICAST TRANSPORT TRAIT */ /*************************************/ @@ -50,7 +51,7 @@ pub(crate) trait TransportUnicastTrait: Send + Sync { /*************************************/ /* LINK */ /*************************************/ - async fn add_link(&self, link: LinkUnicast, direction: LinkUnicastDirection) -> ZResult<()>; + async fn add_link(&self, link: LinkUnicast, config: TransportLinkUnicastConfig) -> ZResult<()>; /*************************************/ /* TX */ diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 1128e8c2f9..e7150cff4b 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -12,14 +12,19 @@ // ZettaScale Zenoh Team, // use super::transport::TransportUnicastUniversal; -use crate::common::pipeline::{ - TransmissionPipeline, TransmissionPipelineConf, TransmissionPipelineConsumer, - TransmissionPipelineProducer, -}; -use crate::common::priority::TransportPriorityTx; #[cfg(feature = "stats")] use crate::common::stats::TransportStats; -use crate::TransportExecutor; +use crate::{ + common::{ + pipeline::{ + TransmissionPipeline, TransmissionPipelineConf, TransmissionPipelineConsumer, + TransmissionPipelineProducer, + }, + priority::TransportPriorityTx, + }, + unicast::TransportLinkUnicastConfig, + TransportExecutor, +}; use async_std::prelude::FutureExt; use async_std::task; use async_std::task::JoinHandle; @@ -29,7 +34,7 @@ use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; use zenoh_buffers::ZSlice; -use zenoh_link::{LinkUnicast, LinkUnicastDirection}; +use zenoh_link::LinkUnicast; use zenoh_protocol::transport::{BatchSize, KeepAlive, TransportMessage}; use zenoh_result::{bail, zerror, ZResult}; use zenoh_sync::{RecyclingObjectPool, Signal}; @@ -57,12 +62,12 @@ const MAX_BATCH_SIZE: usize = u16::MAX as usize; #[derive(Clone)] pub(super) struct TransportLinkUnicast { - // Inbound / outbound - pub(super) direction: LinkUnicastDirection, // The underlying link pub(super) link: LinkUnicast, // The transmission pipeline pub(super) pipeline: Option, + // The config + pub(super) config: TransportLinkUnicastConfig, // The transport this link is associated to transport: TransportUnicastUniversal, // The signals to stop TX/RX tasks @@ -75,13 +80,13 @@ impl TransportLinkUnicast { pub(super) fn new( transport: TransportUnicastUniversal, link: LinkUnicast, - direction: LinkUnicastDirection, + config: TransportLinkUnicastConfig, ) -> TransportLinkUnicast { TransportLinkUnicast { - direction, - transport, link, pipeline: None, + config, + transport, handle_tx: None, signal_rx: Signal::new(), handle_rx: None, @@ -94,12 +99,14 @@ impl TransportLinkUnicast { &mut self, executor: &TransportExecutor, keep_alive: Duration, - batch_size: u16, + batch_size: BatchSize, priority_tx: &[TransportPriorityTx], ) { if self.handle_tx.is_none() { let config = TransmissionPipelineConf { is_streamed: self.link.is_streamed(), + #[cfg(feature = "transport_compression")] + is_compression: self.config.is_compression, batch_size: batch_size.min(self.link.get_mtu()), queue_size: self.transport.manager.config.queue_size, backoff: self.transport.manager.config.queue_backoff, @@ -218,7 +225,7 @@ async fn tx_task( Some((batch, priority)) => { // Send the buffer on the link 
#[allow(unused_mut)] - let mut bytes = batch.as_bytes(); + let mut bytes = batch.as_slice(); #[cfg(all(feature = "unstable", feature = "transport_compression"))] { @@ -261,7 +268,7 @@ async fn tx_task( // Drain the transmission pipeline and write remaining bytes on the wire let mut batches = pipeline.drain(); for (b, _) in batches.drain(..) { - link.write_all(b.as_bytes()) + link.write_all(b.as_slice()) .timeout(keep_alive) .await .map_err(|_| zerror!("{}: flush failed after {} ms", link, keep_alive.as_millis()))??; diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index 5c17b36827..8f8fc3f84b 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -11,21 +11,26 @@ // Contributors: // ZettaScale Zenoh Team, // -use crate::common::priority::{TransportPriorityRx, TransportPriorityTx}; #[cfg(feature = "stats")] use crate::stats::TransportStats; -use crate::transport_unicast_inner::TransportUnicastTrait; -use crate::unicast::universal::link::TransportLinkUnicast; -use crate::TransportConfigUnicast; -use crate::{TransportExecutor, TransportManager, TransportPeerEventHandler}; +use crate::{ + common::priority::{TransportPriorityRx, TransportPriorityTx}, + transport_unicast_inner::TransportUnicastTrait, + unicast::{ + universal::link::TransportLinkUnicast, TransportLinkUnicastConfig, + TransportLinkUnicastDirection, + }, + TransportConfigUnicast, TransportExecutor, TransportManager, TransportPeerEventHandler, +}; use async_std::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}; use async_trait::async_trait; use std::fmt::DebugStruct; use std::sync::{Arc, RwLock}; use std::time::Duration; use zenoh_core::{zasynclock, zcondfeat, zread, zwrite}; -use zenoh_link::{Link, LinkUnicast, LinkUnicastDirection}; +use zenoh_link::{Link, LinkUnicast}; use zenoh_protocol::network::NetworkMessage; +use zenoh_protocol::transport::BatchSize; use zenoh_protocol::{ core::{Priority, WhatAmI, ZenohId}, transport::{Close, PrioritySn, TransportMessage, TransportSn}, @@ -246,13 +251,16 @@ impl TransportUnicastTrait for TransportUnicastUniversal { /*************************************/ /* LINK */ /*************************************/ - async fn add_link(&self, link: LinkUnicast, direction: LinkUnicastDirection) -> ZResult<()> { + async fn add_link(&self, link: LinkUnicast, config: TransportLinkUnicastConfig) -> ZResult<()> { // Add the link to the channel let mut guard = zwrite!(self.links); // Check if we can add more inbound links - if let LinkUnicastDirection::Inbound = direction { - let count = guard.iter().filter(|l| l.direction == direction).count(); + if let TransportLinkUnicastDirection::Inbound = config.direction { + let count = guard + .iter() + .filter(|l| l.config.direction == config.direction) + .count(); let limit = zcondfeat!( "transport_multilink", @@ -276,7 +284,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal { } // Create a channel link from a link - let link = TransportLinkUnicast::new(self.clone(), link, direction); + let link = TransportLinkUnicast::new(self.clone(), link, config); let mut links = Vec::with_capacity(guard.len() + 1); links.extend_from_slice(&guard); @@ -422,7 +430,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal { link: &LinkUnicast, executor: &TransportExecutor, keep_alive: Duration, - batch_size: u16, + batch_size: BatchSize, ) -> ZResult<()> { let mut guard = zwrite!(self.links); match 
zlinkgetmut!(guard, link) { diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 1fa34eab32..457cebf971 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -39,6 +39,7 @@ shared-memory = [ ] stats = ["zenoh-transport/stats", "zenoh-protocol/stats"] transport_multilink = ["zenoh-transport/transport_multilink"] +transport_compression = ["zenoh-transport/transport_compression"] transport_quic = ["zenoh-transport/transport_quic"] transport_serial = ["zenoh-transport/transport_serial"] transport_unixpipe = ["zenoh-transport/transport_unixpipe"] @@ -52,6 +53,7 @@ default = [ "auth_pubkey", "auth_usrpwd", "transport_multilink", + "transport_compression", "transport_quic", "transport_tcp", "transport_tls",
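
// ---------------------------------------------------------------------------
// Editor's note: the block below is an illustrative sketch only; it is NOT
// part of the patch above and uses simplified stand-in types. It shows the
// shape of the negotiation this diff introduces: with the
// `transport_compression` feature enabled, the local `is_compression` wish
// from the unicast config is carried in the per-link open/accept state, is
// kept only if the peer also advertised the compression extension during the
// INIT/OPEN exchange, and the agreed value ends up in
// `TransportLinkUnicastConfig`, which the TX pipeline consults per link.
// Apart from `is_compression`, `TransportLinkUnicastConfig` and
// `TransportLinkUnicastDirection`, every name here is a hypothetical
// simplification, not the real zenoh-transport API.

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum TransportLinkUnicastDirection {
    Inbound,
    Outbound,
}

#[derive(Clone, Copy, Debug)]
struct TransportLinkUnicastConfig {
    // Inbound / outbound, as in the patch.
    direction: TransportLinkUnicastDirection,
    // Outcome of the per-link negotiation: true only if both sides
    // advertised the compression extension.
    is_compression: bool,
}

// Stand-in for the per-link negotiation state kept in `StateLink`.
#[derive(Clone, Copy, Debug)]
struct CompressionState {
    is_compression: bool,
}

impl CompressionState {
    fn new(local_wish: bool) -> Self {
        Self { is_compression: local_wish }
    }

    // Mirrors the recv_init_ack / recv_open_syn handling sketched here:
    // the flag survives only if the peer also sent the extension.
    fn recv_peer(&mut self, peer_advertised: bool) {
        self.is_compression &= peer_advertised;
    }
}

fn main() {
    // Opener wants compression, but the peer never sent the extension:
    // the outbound link is created uncompressed.
    let mut open_state = CompressionState::new(true);
    open_state.recv_peer(false);
    let outbound = TransportLinkUnicastConfig {
        direction: TransportLinkUnicastDirection::Outbound,
        is_compression: open_state.is_compression,
    };
    assert!(!outbound.is_compression);

    // Acceptor wants compression and the peer advertised it too:
    // the inbound link is created with compression enabled.
    let mut accept_state = CompressionState::new(true);
    accept_state.recv_peer(true);
    let inbound = TransportLinkUnicastConfig {
        direction: TransportLinkUnicastDirection::Inbound,
        is_compression: accept_state.is_compression,
    };
    assert!(inbound.is_compression);

    println!("outbound: {:?}\ninbound: {:?}", outbound, inbound);
}

// Design note: compression is agreed upon independently on every link during
// that link's INIT/OPEN handshake, which is why the patch splits the old flat
// `State` into `StateTransport` (shared settings such as resolution and batch
// size) and `StateLink` (auth and compression), and threads a
// `TransportLinkUnicastConfig` into `add_link` instead of keeping a single
// transport-wide `is_compressed` flag.
// ---------------------------------------------------------------------------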