diff --git a/Cargo.lock b/Cargo.lock
index a18f116f79..da2c495514 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5000,7 +5000,6 @@ dependencies = [
  "zenoh-core",
  "zenoh-crypto",
  "zenoh-link",
- "zenoh-link-commons",
  "zenoh-protocol",
  "zenoh-result",
  "zenoh-shm",
diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs
index e87d1b5c61..114990726a 100644
--- a/io/zenoh-link-commons/src/lib.rs
+++ b/io/zenoh-link-commons/src/lib.rs
@@ -17,10 +17,13 @@
 //! This crate is intended for Zenoh's internal use.
 //!
 //! [Click here for Zenoh's documentation](../zenoh/index.html)
-pub(crate) mod batch;
+#![no_std]
+extern crate alloc;
+
 mod multicast;
 mod unicast;
 
+use alloc::{borrow::ToOwned, boxed::Box, string::String};
 use async_trait::async_trait;
 use core::{cmp::PartialEq, fmt, hash::Hash};
 pub use multicast::*;
diff --git a/io/zenoh-link-commons/src/multicast.rs b/io/zenoh-link-commons/src/multicast.rs
index 904d53361d..65bc7195b6 100644
--- a/io/zenoh-link-commons/src/multicast.rs
+++ b/io/zenoh-link-commons/src/multicast.rs
@@ -11,13 +11,13 @@
 // Contributors:
 //   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
 //
+use alloc::{borrow::Cow, boxed::Box, sync::Arc, vec::Vec};
 use async_trait::async_trait;
 use core::{
     fmt,
     hash::{Hash, Hasher},
     ops::Deref,
 };
-use std::{borrow::Cow, sync::Arc};
 use zenoh_buffers::{reader::HasReader, writer::HasWriter};
 use zenoh_codec::{RCodec, WCodec, Zenoh080};
 use zenoh_protocol::{
diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs
index 774abd8fab..d44686ff50 100644
--- a/io/zenoh-link-commons/src/unicast.rs
+++ b/io/zenoh-link-commons/src/unicast.rs
@@ -11,22 +11,15 @@
 // Contributors:
 //   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
 //
+use alloc::{boxed::Box, sync::Arc, vec::Vec};
 use async_trait::async_trait;
-use std::{
+use core::{
     fmt,
     hash::{Hash, Hasher},
     ops::Deref,
-    sync::Arc,
 };
-use zenoh_buffers::{reader::HasReader, ZSlice};
-use zenoh_codec::{RCodec, Zenoh080};
-use zenoh_protocol::{
-    core::{EndPoint, Locator},
-    transport::TransportMessage,
-};
-use zenoh_result::{zerror, ZResult};
-
-use crate::batch::{Encode, RBatch, WBatch};
+use zenoh_protocol::core::{EndPoint, Locator};
+use zenoh_result::ZResult;
 
 pub type LinkManagerUnicast = Arc<dyn LinkManagerUnicastTrait>;
 #[async_trait]
@@ -59,46 +52,6 @@ pub trait LinkUnicastTrait: Send + Sync {
     async fn close(&self) -> ZResult<()>;
 }
 
-impl LinkUnicast {
-    pub async fn send_batch(&self, batch: WBatch) -> ZResult<usize> {
-        const ERR: &str = "Write error on link: ";
-        let buff = batch.finalize().map_err(|_| zerror!("{ERR}{self}"))?;
-        // Send the message on the link
-        self.0.write_all(buff.as_slice()).await?;
-        Ok(buff.len())
-    }
-
-    pub async fn send(&self, msg: &TransportMessage) -> ZResult<usize> {
-        const ERR: &str = "Write error on link: ";
-        // Create the batch for serializing the message
-        let mut batch = WBatch::new(self.get_mtu()).set_streamed(self.is_streamed());
-        batch.encode(msg).map_err(|_| zerror!("{ERR}{self}"))?;
-        self.send_batch(batch).await
-    }
-
-    pub async fn recv_batch(&self, mut batch: RBatch) -> ZResult<ZSlice> {
-        use crate::batch::ReadFrom;
-        const ERR: &str = "Read error from link: ";
-        batch.read_from(self).await?;
-        let zslice = batch.finalize().map_err(|_| zerror!("{ERR}{self}"))?;
-        Ok(zslice)
-    }
-
-    pub async fn recv(&self) -> ZResult<TransportMessage> {
-        let batch = RBatch::new(self.get_mtu()).set_streamed(self.is_streamed());
-        let mut zslice = self.recv_batch(batch).await?;
-
-        let mut reader = zslice.reader();
-        let codec = Zenoh080::new();
-
-        let msg: TransportMessage = codec
-            .read(&mut reader)
zerror!("Read error on link: {}", self))?; - - Ok(msg) - } -} - impl Deref for LinkUnicast { type Target = Arc; diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index 745e20d15e..77f6b18db3 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -41,7 +41,7 @@ transport_udp = ["zenoh-link/transport_udp"] transport_unixsock-stream = ["zenoh-link/transport_unixsock-stream"] transport_ws = ["zenoh-link/transport_ws"] transport_serial = ["zenoh-link/transport_serial"] -transport_compression = ["zenoh-link-commons/compression"] +transport_compression = [] transport_unixpipe = ["zenoh-link/transport_unixpipe"] stats = ["zenoh-protocol/stats"] test = [] @@ -68,7 +68,6 @@ zenoh-config = { workspace = true } zenoh-core = { workspace = true } zenoh-crypto = { workspace = true } zenoh-link = { workspace = true } -zenoh-link-commons = { workspace = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } zenoh-shm = { workspace = true, optional = true } diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 49edac38ec..332ea5822e 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -11,21 +11,21 @@ // Contributors: // ZettaScale Zenoh Team, // -use core::future::Future; use std::{ num::{NonZeroU8, NonZeroUsize}, - process::Output, + sync::Arc, }; use zenoh_buffers::{ reader::{DidntRead, Reader, SiphonableReader}, writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer}, - BBuf, ZBufReader, ZSlice, + BBuf, ZBufReader, ZSlice, ZSliceBuffer, }; use zenoh_codec::{WCodec, Zenoh080}; -use zenoh_link::LinkUnicast; +use zenoh_core::zcondfeat; +use zenoh_link::{LinkMulticast, LinkUnicast}; use zenoh_protocol::{ common::imsg, - core::Reliability, + core::{Locator, Reliability}, network::NetworkMessage, transport::{ fragment::FragmentHeader, frame::FrameHeader, BatchSize, TransportMessage, TransportSn, @@ -43,20 +43,19 @@ mod header { // Split the inner buffer into (length, header, payload) inmutable slices macro_rules! zsplit { - ($batch:expr) => {{ - let slice = $batch.buffer.as_slice(); - match ($batch.has_length(), $batch.has_header()) { - (false, false) => (&[], &[], slice), + ($slice:expr, $has_length:expr, $has_header:expr) => {{ + match ($has_length, $has_header) { + (false, false) => (&[], &[], $slice), (true, false) => { - let (length, payload) = slice.split_at(LENGTH_BYTES.len()); + let (length, payload) = $slice.split_at(LENGTH_BYTES.len()); (length, &[], payload) } (false, true) => { - let (header, payload) = slice.split_at(HEADER_BYTES.len()); + let (header, payload) = $slice.split_at(HEADER_BYTES.len()); (&[], header, payload) } (true, true) => { - let (length, tmp) = slice.split_at(LENGTH_BYTES.len()); + let (length, tmp) = $slice.split_at(LENGTH_BYTES.len()); let (header, payload) = tmp.split_at(HEADER_BYTES.len()); (length, header, payload) } @@ -66,21 +65,19 @@ macro_rules! zsplit { // Split the inner buffer into (length, header, payload) mutable slices macro_rules! 
macro_rules! zsplitmut {
-    ($batch:expr) => {{
-        let (has_length, has_header) = ($batch.has_length(), $batch.has_header());
-        let slice = $batch.buffer.as_mut_slice();
-        match (has_length, has_header) {
-            (false, false) => (&mut [], &mut [], slice),
+    ($slice:expr, $has_length:expr, $has_header:expr) => {{
+        match ($has_length, $has_header) {
+            (false, false) => (&mut [], &mut [], $slice),
             (true, false) => {
-                let (length, payload) = slice.split_at_mut(LENGTH_BYTES.len());
+                let (length, payload) = $slice.split_at_mut(LENGTH_BYTES.len());
                 (length, &mut [], payload)
             }
             (false, true) => {
-                let (header, payload) = slice.split_at_mut(HEADER_BYTES.len());
+                let (header, payload) = $slice.split_at_mut(HEADER_BYTES.len());
                 (&mut [], header, payload)
             }
             (true, true) => {
-                let (length, tmp) = slice.split_at_mut(LENGTH_BYTES.len());
+                let (length, tmp) = $slice.split_at_mut(LENGTH_BYTES.len());
                 let (header, payload) = tmp.split_at_mut(HEADER_BYTES.len());
                 (length, header, payload)
             }
@@ -89,23 +86,23 @@ macro_rules! zsplitmut {
 }
 
 // WRITE BATCH
-pub(crate) trait Encode<Message> {
+pub trait Encode<Message> {
     type Output;
     fn encode(self, message: Message) -> Self::Output;
 }
 
 #[derive(Clone, Copy, Debug)]
 #[repr(u8)]
-pub(crate) enum CurrentFrame {
+pub enum CurrentFrame {
     Reliable,
     BestEffort,
     None,
 }
 
 #[derive(Clone, Copy, Debug)]
-pub(crate) struct LatestSn {
-    pub(crate) reliable: Option<TransportSn>,
-    pub(crate) best_effort: Option<TransportSn>,
+pub struct LatestSn {
+    pub reliable: Option<TransportSn>,
+    pub best_effort: Option<TransportSn>,
 }
 
 impl LatestSn {
@@ -117,12 +114,12 @@
 
 #[cfg(feature = "stats")]
 #[derive(Clone, Copy, Debug, Default)]
-pub(crate) struct SerializationBatchStats {
-    pub(crate) t_msgs: usize,
+pub struct WBatchStats {
+    pub t_msgs: usize,
 }
 
 #[cfg(feature = "stats")]
-impl SerializationBatchStats {
+impl WBatchStats {
     fn clear(&mut self) {
         self.t_msgs = 0;
     }
@@ -136,19 +133,19 @@
 /// [`TransportMessage`][TransportMessage] are always serialized on the batch as they are, while
 /// [`ZenohMessage`][ZenohMessage] are always serializaed on the batch as part of a [`TransportMessage`]
 /// [TransportMessage] Frame. Reliable and Best Effort Frames can be interleaved on the same
-/// [`SerializationBatch`][SerializationBatch] as long as they fit in the remaining buffer capacity.
+/// [`WBatch`][WBatch] as long as they fit in the remaining buffer capacity.
 ///
-/// In the serialized form, the [`SerializationBatch`][SerializationBatch] always contains one or more
+/// In the serialized form, the [`WBatch`][WBatch] always contains one or more
 /// [`TransportMessage`][TransportMessage]. In the particular case of [`TransportMessage`][TransportMessage] Frame,
 /// its payload is either (i) one or more complete [`ZenohMessage`][ZenohMessage] or (ii) a fragment of a
 /// a [`ZenohMessage`][ZenohMessage].
 ///
-/// As an example, the content of the [`SerializationBatch`][SerializationBatch] in memory could be:
+/// As an example, the content of the [`WBatch`][WBatch] in memory could be:
 ///
 /// | Keep Alive | Frame Reliable<Zenoh Message, Zenoh Message> | Frame Best Effort<Zenoh Message Fragment> |
 ///
-#[derive(Debug)]
-pub(crate) struct WBatch {
+#[derive(Clone, Debug)]
+pub struct WBatch {
     // The buffer to perform the batching on
     buffer: BBuf,
     // It contains 2 bytes indicating how many bytes are in the batch
@@ -158,27 +155,35 @@ pub struct WBatch {
     // The current frame being serialized: BestEffort/Reliable
     current_frame: CurrentFrame,
     // The latest SN
-    pub(crate) latest_sn: LatestSn,
+    pub latest_sn: LatestSn,
     // Statistics related to this batch
     #[cfg(feature = "stats")]
-    pub(crate) stats: SerializationBatchStats,
+    pub stats: WBatchStats,
 }
 
 impl WBatch {
-    pub(crate) fn new(size: BatchSize) -> Self {
+    pub fn new(
+        size: BatchSize,
+        is_streamed: bool,
+        #[cfg(feature = "transport_compression")] is_compression: bool,
+    ) -> Self {
+        let mut h = 0;
+        #[cfg(feature = "transport_compression")]
+        if is_compression {
+            h |= header::COMPRESSION;
+        }
         let mut batch = Self {
             buffer: BBuf::with_capacity(size as usize),
-            has_length: false,
-            header: None,
+            has_length: is_streamed,
+            header: NonZeroU8::new(h),
             current_frame: CurrentFrame::None,
             latest_sn: LatestSn {
                 reliable: None,
                 best_effort: None,
            },
             #[cfg(feature = "stats")]
-            stats: SerializationBatchStats::default(),
+            stats: WBatchStats::default(),
         };
 
         // Bring the batch in a clear state
@@ -187,53 +192,15 @@ impl WBatch {
         batch
     }
 
-    /// Verify that the [`SerializationBatch`][SerializationBatch] is for a compression-enabled link,
-    /// i.e., the third byte is used to signa encode the total amount of serialized bytes as 16-bits little endian.
-    #[inline(always)]
-    pub(crate) fn set_streamed(mut self, v: bool) -> Self {
-        self.has_length = v;
-        self
-    }
-
+    /// Verify that the [`WBatch`][WBatch] has no serialized bytes.
     #[inline(always)]
-    pub(crate) const fn get_streamed(&self) -> bool {
-        self.has_length
-    }
-
-    /// Verify that the [`SerializationBatch`][SerializationBatch] is for a compression-enabled link,
-    /// i.e., the third byte is used to signa encode the total amount of serialized bytes as 16-bits little endian.
-    #[cfg(feature = "transport_compression")]
-    #[inline(always)]
-    pub(crate) fn set_compression(mut self, v: bool) -> Self {
-        if v {
-            self.header = match self.header.as_ref() {
-                Some(h) => NonZeroU8::new(h.get() | header::COMPRESSION),
-                None => NonZeroU8::new(header::COMPRESSION),
-            };
-        } else {
-            self.header = self
-                .header
-                .and_then(|h| NonZeroU8::new(h.get() & !header::COMPRESSION))
-        }
-        self
-    }
-
-    #[cfg(feature = "transport_compression")]
-    #[inline(always)]
-    pub(crate) fn get_compression(&self) -> bool {
-        self.header
-            .is_some_and(|h| imsg::has_flag(h.get(), header::COMPRESSION))
-    }
-
-    /// Verify that the [`SerializationBatch`][SerializationBatch] has no serialized bytes.
-    #[inline(always)]
-    pub(crate) fn is_empty(&self) -> bool {
+    pub fn is_empty(&self) -> bool {
         self.len() == 0
     }
 
-    /// Get the total number of bytes that have been serialized on the [`SerializationBatch`][SerializationBatch].
+    /// Get the total number of bytes that have been serialized on the [`WBatch`][WBatch].
     #[inline(always)]
-    pub(crate) fn len(&self) -> BatchSize {
+    pub fn len(&self) -> BatchSize {
         let mut len = self.buffer.len() as BatchSize;
         if self.has_length() {
             len -= LENGTH_BYTES.len() as BatchSize;
@@ -241,23 +208,31 @@ impl WBatch {
         len
     }
 
-    /// Verify that the [`SerializationBatch`][SerializationBatch] is for a stream-based protocol, i.e., the first
+    /// Verify that the [`WBatch`][WBatch] is for a compression-enabled link, i.e., the header byte signals whether the batch is compressed.
+    #[inline(always)]
+    pub fn is_compression(&self) -> bool {
+        self.header
+            .is_some_and(|h| imsg::has_flag(h.get(), header::COMPRESSION))
+    }
+
+    /// Verify that the [`WBatch`][WBatch] is for a stream-based protocol, i.e., the first
     /// 2 bytes are reserved to encode the total amount of serialized bytes as 16-bits little endian.
     #[inline(always)]
-    pub(crate) const fn has_length(&self) -> bool {
+    pub const fn has_length(&self) -> bool {
         self.has_length
     }
 
-    /// Verify that the [`SerializationBatch`][SerializationBatch] is for a stream-based protocol, i.e., the first
+    /// Verify that the [`WBatch`][WBatch] is for a stream-based protocol, i.e., the first
     /// 2 bytes are reserved to encode the total amount of serialized bytes as 16-bits little endian.
     #[inline(always)]
-    pub(crate) const fn has_header(&self) -> bool {
+    pub const fn has_header(&self) -> bool {
         self.header.is_some()
     }
 
-    /// Clear the [`SerializationBatch`][SerializationBatch] memory buffer and related internal state.
+    /// Clear the [`WBatch`][WBatch] memory buffer and related internal state.
     #[inline(always)]
-    pub(crate) fn clear(&mut self) {
+    pub fn clear(&mut self) {
         self.buffer.clear();
         self.current_frame = CurrentFrame::None;
         self.latest_sn.clear();
@@ -267,7 +242,7 @@ impl WBatch {
         }
         if self.has_length() {
             let mut writer = self.buffer.writer();
-            let _ = writer.write_exact(&LENGTH_BYTES[..]);
+            let _ = writer.write_exact(&LENGTH_BYTES);
         }
         if let Some(h) = self.header {
             let mut writer = self.buffer.writer();
@@ -275,57 +250,74 @@ impl WBatch {
         }
     }
 
-    /// In case the [`SerializationBatch`][SerializationBatch] is for a stream-based protocol, use the first 2 bytes
+    /// In case the [`WBatch`][WBatch] is for a stream-based protocol, use the first 2 bytes
     /// to encode the total amount of serialized bytes as 16-bits little endian.
-    pub(crate) fn finalize(mut self) -> ZResult<BBuf> {
+    pub fn finalize(&mut self) -> Result<(), DidntWrite> {
         if self.has_length() {
-            let (length, _h, _p) = self.split_mut();
-            length.copy_from_slice(&self.len().to_le_bytes());
+            let len = self.len();
+            let (l, _h, _p) = self.split_mut();
+            l.copy_from_slice(&len.to_le_bytes());
         }
 
-        if let Some(header) = self.header {
-            #[cfg(feature = "transport_compression")]
-            if self.get_compression() {
-                self.compress();
-            }
+        #[cfg(feature = "transport_compression")]
+        if self.is_compression() {
+            self.compress()?;
         }
 
-        Ok(self.buffer)
+        Ok(())
     }
 
     /// Get a `&[u8]` to access the internal memory buffer, usually for transmitting it on the network.
     #[inline(always)]
-    pub(crate) fn as_slice(&self) -> &[u8] {
+    pub fn as_slice(&self) -> &[u8] {
         self.buffer.as_slice()
     }
 
     // Split (length, header, payload) internal buffer slice
     #[inline(always)]
     fn split(&self) -> (&[u8], &[u8], &[u8]) {
-        zsplit!(self)
+        zsplit!(self.buffer.as_slice(), self.has_length(), self.has_header())
     }
 
     // Split (length, header, payload) internal buffer slice
     #[inline(always)]
     fn split_mut(&mut self) -> (&mut [u8], &mut [u8], &mut [u8]) {
-        zsplitmut!(self)
+        zsplitmut!(
+            self.buffer.as_mut_slice(),
+            self.has_length(),
+            self.has_header()
+        )
     }
 
     #[cfg(feature = "transport_compression")]
     fn compress(&mut self) -> Result<(), DidntWrite> {
         let (_length, _header, payload) = self.split();
 
+        // Create a new empty buffer
         let mut buffer = BBuf::with_capacity(self.buffer.capacity());
+
+        // Write the initial bytes for the batch
         let mut writer = buffer.writer();
+        if self.has_length() {
+            let _ = writer.write_exact(&LENGTH_BYTES);
+        }
+        if let Some(h) = self.header {
+            let _ = writer.write_u8(h.get());
+        }
+
+        // Compress the actual content
         writer.with_slot(writer.remaining(), |b| {
             lz4_flex::block::compress_into(payload, b).unwrap_or(0)
         })?;
 
+        // Verify whether the resulting compressed data is smaller than the initial input
         if buffer.len() < self.buffer.len() {
+            // Replace the buffer in this batch
            self.buffer = buffer;
         } else {
-            let (_length, header, _payload) = self.split_mut();
-            header[0] &= !header::COMPRESSION;
+            // Keep the original uncompressed buffer and unset the compression flag from the header
+            let (_l, h, _p) = self.split_mut();
+            h[0] &= !header::COMPRESSION;
         }
 
         Ok(())
@@ -335,7 +327,7 @@ impl WBatch {
 }
 
 impl Encode<&TransportMessage> for &mut WBatch {
     type Output = Result<(), DidntWrite>;
 
-    /// Try to serialize a [`TransportMessage`][TransportMessage] on the [`SerializationBatch`][SerializationBatch].
+    /// Try to serialize a [`TransportMessage`][TransportMessage] on the [`WBatch`][WBatch].
     ///
     /// # Arguments
     /// * `message` - The [`TransportMessage`][TransportMessage] to serialize.
@@ -363,7 +355,7 @@ impl Encode<&TransportMessage> for &mut WBatch {
 }
 
 #[repr(u8)]
-pub(crate) enum WError {
+pub enum WError {
     NewFrame,
     DidntWrite,
 }
@@ -371,7 +363,7 @@ impl Encode<&NetworkMessage> for &mut WBatch {
     type Output = Result<(), WError>;
 
-    /// Try to serialize a [`NetworkMessage`][NetworkMessage] on the [`SerializationBatch`][SerializationBatch].
+    /// Try to serialize a [`NetworkMessage`][NetworkMessage] on the [`WBatch`][WBatch].
     ///
     /// # Arguments
     /// * `message` - The [`NetworkMessage`][NetworkMessage] to serialize.
@@ -402,7 +394,7 @@
 impl Encode<(&NetworkMessage, FrameHeader)> for &mut WBatch {
     type Output = Result<(), DidntWrite>;
 
-    /// Try to serialize a [`NetworkMessage`][NetworkMessage] on the [`SerializationBatch`][SerializationBatch].
+    /// Try to serialize a [`NetworkMessage`][NetworkMessage] on the [`WBatch`][WBatch].
     ///
     /// # Arguments
     /// * `message` - The [`NetworkMessage`][NetworkMessage] to serialize.
@@ -445,7 +437,7 @@
 impl Encode<(&mut ZBufReader<'_>, FragmentHeader)> for &mut WBatch {
     type Output = Result<NonZeroUsize, DidntWrite>;
 
-    /// Try to serialize a [`ZenohMessage`][ZenohMessage] on the [`SerializationBatch`][SerializationBatch].
+    /// Try to serialize a [`ZenohMessage`][ZenohMessage] on the [`WBatch`][WBatch].
     ///
     /// # Arguments
     /// * `message` - The [`ZenohMessage`][ZenohMessage] to serialize.
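For illustration, a minimal sketch of the reworked write path — assuming the `WBatch` API introduced in this hunk (`WBatch::new(size, is_streamed, is_compression)`, the `Encode` impls, `finalize()`, `as_slice()`) and the surrounding module's imports; it is not part of the patch:

    use zenoh_buffers::writer::DidntWrite;
    use zenoh_protocol::transport::{BatchSize, KeepAlive, TransportMessage};

    fn wbatch_write_sketch() -> Result<(), DidntWrite> {
        // Datagram-oriented link: no 2-byte length prefix, compression disabled.
        let mut batch = WBatch::new(
            BatchSize::MAX,
            false,
            #[cfg(feature = "transport_compression")]
            false,
        );
        // Serialize a message through the `Encode` impl for `&mut WBatch`.
        let msg: TransportMessage = KeepAlive.into();
        batch.encode(&msg)?;
        // `finalize()` back-patches the 16-bit LE length prefix (if streamed) and
        // compresses the payload in place (if the compression header flag is set).
        batch.finalize()?;
        // The resulting bytes are ready to be written on the link.
        let _bytes: &[u8] = batch.as_slice();
        Ok(())
    }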
@@ -489,73 +481,173 @@ impl Encode<(&mut ZBufReader<'_>, FragmentHeader)> for &mut WBatch {
     }
 }
 
-// // READ BATCH
-// #[derive(Debug)]
-// pub(crate) struct RBatch {
-//     // The buffer to perform deserializationn from
-//     buffer: Box<[u8]>,
-//     // It contains 2 bytes indicating how many bytes are in the batch
-//     has_length: bool,
-//     // It contains 1 byte as additional header, e.g. to signal the batch is compressed
-//     has_header: bool,
-// }
-
-// impl RBatch {
-//     /// Verify that the [`SerializationBatch`][SerializationBatch] is for a stream-based protocol, i.e., the first
-//     /// 2 bytes are reserved to encode the total amount of serialized bytes as 16-bits little endian.
-//     #[inline(always)]
-//     pub(crate) const fn has_length(&self) -> bool {
-//         self.has_length
-//     }
-
-//     /// Verify that the [`SerializationBatch`][SerializationBatch] is for a compression-enabled link,
-//     /// i.e., the third byte is used to signa encode the total amount of serialized bytes as 16-bits little endian.
-
-//     #[inline(always)]
-//     pub(crate) const fn has_header(&self) -> bool {
-//         self.has_header
-//     }
-
-//     // Split (length, header, payload) internal buffer slice
-//     #[inline(always)]
-//     fn split(&self) -> (&[u8], &[u8], &[u8]) {
-//         zsplit!(self)
-//     }
-
-//     // Split (length, header, payload) internal buffer slice
-//     #[inline(always)]
-//     fn split_mut(&mut self) -> (&mut [u8], &mut [u8], &mut [u8]) {
-//         zsplitmut!(self)
-//     }
-
-//     pub(crate) async fn read_unicast(&mut self, link: &LinkUnicast) -> ZResult<usize> {
-//         let n = if self.has_length() {
-//             let mut length = [0_u8, 0_u8];
-//             link.read_exact(&mut length).await?;
-//             let n = BatchSize::from_le_bytes(length) as usize;
-//             link.read_exact(&mut self.buffer[0..n]).await?;
-//             n
-//         } else {
-//             link.read(&mut self.buffer).await?
-//         };
-
-//         Ok(n)
-//     }
-
-//     #[cfg(feature = "transport_compression")]
-//     pub(crate) fn uncompress_into(&mut self, batch: &mut WBatch) -> Result<(), DidntRead> {
-//         use zenoh_protocol::common::imsg;
-
-//         self.clear();
-//         let mut writer = self.buffer.writer();
-//         // let (_length, header, payload) = self.split();
-//         // if !header.is_empty() && imsg::has_flag(header[0], header::COMPRESSED) {
-//         // } else {
-//         // }
-
-//         Ok(())
-//     }
-// }
+// READ BATCH
+pub trait Decode<Message> {
+    type Error;
+    fn decode(self, message: Message) -> Result<(), Self::Error>;
+}
+
+#[derive(Debug)]
+pub struct RBatch {
+    // The buffer to perform deserialization from
+    buffer: Box<[u8]>,
+    start: usize,
+    end: usize,
+    // It contains 2 bytes indicating how many bytes are in the batch
+    has_length: bool,
+    // It contains 1 byte as additional header, e.g. to signal the batch is compressed
+    has_header: bool,
+}
+
+use std::fmt;
+impl fmt::Display for RBatch {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("RBatch")
+            .field("has_length", &self.has_length)
+            .field("has_header", &self.has_header)
+            .field("capacity", &self.buffer.len())
+            .field("start", &self.start)
+            .field("end", &self.end)
+            .finish()
+    }
+}
+
+impl RBatch {
+    pub fn new(
+        size: BatchSize,
+        is_streamed: bool,
+        #[cfg(feature = "transport_compression")] is_compression: bool,
+    ) -> Self {
+        let has_header = zcondfeat!("transport_compression", is_compression, false);
+        Self {
+            buffer: zenoh_buffers::vec::uninit(size as usize).into_boxed_slice(),
+            start: 0,
+            end: 0,
+            has_length: is_streamed,
+            has_header,
+        }
+    }
+
+    /// Verify that the [`RBatch`][RBatch] is for a stream-based protocol, i.e., the first
+    /// 2 bytes are reserved to encode the total amount of serialized bytes as 16-bits little endian.
+    #[inline(always)]
+    pub fn set_streamed(mut self, v: bool) -> Self {
+        self.has_length = v;
+        self
+    }
+
+    #[inline(always)]
+    pub const fn get_streamed(&self) -> bool {
+        self.has_length
+    }
+
+    /// Verify that the [`RBatch`][RBatch] is for a compression-enabled link,
+    /// i.e., the additional header byte signals whether the batch is compressed.
+    #[cfg(feature = "transport_compression")]
+    #[inline(always)]
+    pub fn set_compression(mut self, v: bool) -> Self {
+        self.has_header = v;
+        self
+    }
+
+    #[cfg(feature = "transport_compression")]
+    #[inline(always)]
+    pub fn get_compression(&self) -> bool {
+        let (_l, h, _p) = self.split();
+        !h.is_empty() && imsg::has_flag(h[0], header::COMPRESSION)
+    }
+
+    /// Verify that the [`RBatch`][RBatch] is for a stream-based protocol, i.e., the first
+    /// 2 bytes are reserved to encode the total amount of serialized bytes as 16-bits little endian.
+    #[inline(always)]
+    pub const fn has_length(&self) -> bool {
+        self.has_length
+    }
+
+    /// Verify that the [`RBatch`][RBatch] is for a compression-enabled link,
+    /// i.e., the additional header byte signals whether the batch is compressed.
+    #[inline(always)]
+    pub const fn has_header(&self) -> bool {
+        self.has_header
+    }
+
+    pub fn finalize(&mut self) -> Result<(), DidntRead> {
+        #[cfg(feature = "transport_compression")]
+        if self.get_compression() {
+            self.uncompress()?;
+        }
+        Ok(())
+    }
+
+    pub(crate) async fn read_unicast(&mut self, link: &LinkUnicast) -> ZResult<()> {
+        if self.has_length() {
+            self.start = LENGTH_BYTES.len();
+            // Read and decode the message length
+            let mut length_bytes = LENGTH_BYTES;
+            link.read_exact(&mut length_bytes).await?;
+            let n = BatchSize::from_le_bytes(length_bytes) as usize;
+            // Read the bytes
+            self.end = self.start + n;
+            link.read_exact(&mut self.buffer[self.start..self.end])
+                .await?;
+        } else {
+            self.start = 0;
+            // Read the bytes
+            let n = link.read(&mut self.buffer).await?;
+            self.end = n;
+        }
+
+        // Advance the start index of the payload in case of header
+        if self.has_header() {
+            self.start += HEADER_BYTES.len();
+        }
+
+        Ok(())
+    }
+
+    pub(crate) async fn read_multicast(&mut self, link: &LinkMulticast) -> ZResult<Locator> {
+        self.start = 0;
+        // Read the bytes
+        let (n, locator) = link.read(&mut self.buffer).await?;
+        self.end = n;
+
+        // Advance the start index of the payload in case of header
+        if self.has_header() {
+            self.start += HEADER_BYTES.len();
+        }
+
+        Ok(locator.into_owned())
+    }
+
+    // Split (length, header, payload) internal buffer slice
+    #[inline(always)]
+    fn split(&self) -> (&[u8], &[u8], &[u8]) {
+        zsplit!(self.buffer.as_slice(), self.has_length(), self.has_header())
+    }
+
+    #[cfg(feature = "transport_compression")]
+    fn uncompress(&mut self) -> Result<(), DidntRead> {
+        // Create a new empty buffer
+        let mut buffer = zenoh_buffers::vec::uninit(self.buffer.len()).into_boxed_slice();
+
+        let (_l, _h, p) = self.split();
+        let n = lz4_flex::block::decompress_into(p, &mut buffer).map_err(|_| DidntRead)?;
+
+        self.buffer = buffer;
+        self.start = 0;
+        self.end = n;
+
+        Ok(())
+    }
+}
+
+impl From<RBatch> for ZSlice {
+    fn from(batch: RBatch) -> Self {
+        // SAFETY: `ZSlice::make` could fail if `start` and `end` indexes are out-of-bound.
+        //         In this case, we are passing the `buffer`, `start`, and `end` values as
+        //         being populated by `read_unicast()` and `read_multicast`.
+        unsafe { ZSlice::make(Arc::new(batch.buffer), batch.start, batch.end).unwrap_unchecked() }
+    }
+}
 
 #[cfg(test)]
 mod tests {
@@ -573,7 +665,12 @@ mod tests {
 
     #[test]
     fn serialization_batch() {
-        let mut batch = WBatch::new(BatchSize::MAX);
+        let mut batch = WBatch::new(
+            BatchSize::MAX,
+            false,
+            #[cfg(feature = "transport_compression")]
+            false,
+        );
 
         let tmsg: TransportMessage = KeepAlive.into();
         let nmsg: NetworkMessage = Push {
@@ -632,3 +729,104 @@ fn serialization_batch() {
         nmsgs_in.push(nmsg.clone());
     }
 }
+
+#[cfg(all(feature = "transport_compression", feature = "unstable"))]
+#[test]
+fn tx_compression_test() {
+    const COMPRESSION_BYTE: usize = 1;
+    let payload = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
+    let mut buff: Box<[u8]> =
+        vec![0; lz4_flex::block::get_maximum_output_size(MAX_BATCH_SIZE) + 3].into_boxed_slice();
+
+    // Compression done for the sake of comparing the result.
+    let payload_compression_size = lz4_flex::block::compress_into(&payload, &mut buff).unwrap();
+
+    fn get_header_value(buff: &[u8]) -> u16 {
+        let mut header = [0_u8, 0_u8];
+        header[..HEADER_BYTES_SIZE].copy_from_slice(&buff[..HEADER_BYTES_SIZE]);
+        u16::from_le_bytes(header)
+    }
+
+    // Streamed with compression enabled
+    let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
+    let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap();
+    let header = get_header_value(&buff);
+    assert!(was_compressed);
+    assert_eq!(header as usize, payload_compression_size + COMPRESSION_BYTE);
+    assert!(batch_size < batch.len() + COMPRESSION_BYTE);
+    assert_eq!(batch_size, payload_compression_size + 3);
+
+    // Not streamed with compression enabled
+    let batch = payload;
+    let (batch_size, was_compressed) = tx_compressed(true, false, &batch, &mut buff).unwrap();
+    assert!(was_compressed);
+    assert!(batch_size < batch.len() + COMPRESSION_BYTE);
+    assert_eq!(batch_size, payload_compression_size + COMPRESSION_BYTE);
+
+    // Streamed with compression disabled
+    let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
+    let (batch_size, was_compressed) = tx_compressed(false, true, &batch, &mut buff).unwrap();
+    let header = get_header_value(&buff);
+    assert!(!was_compressed);
+    assert_eq!(header as usize, payload.len() + COMPRESSION_BYTE);
+    assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE);
+
+    // Not streamed and compression disabled
+    let batch = payload;
+    let (batch_size, was_compressed) = tx_compressed(false, false, &batch, &mut buff).unwrap();
+    assert!(!was_compressed);
+    assert_eq!(batch_size, payload.len() + COMPRESSION_BYTE);
+
+    // Verify that if the compression result is bigger than the original payload size, then the non compressed payload is returned.
+    let batch = [16, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; // a non-compressible payload with no repetitions
+    let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap();
+    assert!(!was_compressed);
+    assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE);
+}
+
+#[cfg(all(feature = "transport_compression", feature = "unstable"))]
+#[test]
+fn rx_compression_test() {
+    let pool = RecyclingObjectPool::new(2, || vec![0_u8; MAX_BATCH_SIZE].into_boxed_slice());
+    let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc());
+
+    // Compressed batch
+    let payload: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8];
+    let compression_size = lz4_flex::block::compress_into(&payload, &mut buffer[1..]).unwrap();
+    buffer[0] = 1; // is compressed byte
+
+    let mut start_pos: usize = 0;
+    let mut end_pos: usize = 0;
+
+    rx_decompress(
+        &mut buffer,
+        &pool,
+        compression_size + 1,
+        &mut start_pos,
+        &mut end_pos,
+    )
+    .unwrap();
+
+    assert_eq!(start_pos, 0);
+    assert_eq!(end_pos, payload.len());
+    assert_eq!(buffer[start_pos..end_pos], payload);
+
+    // Non compressed batch
+    let mut start_pos: usize = 0;
+    let mut end_pos: usize = 0;
+
+    buffer[0] = 0;
+    buffer[1..payload.len() + 1].copy_from_slice(&payload[..]);
+    rx_decompress(
+        &mut buffer,
+        &pool,
+        payload.len() + 1,
+        &mut start_pos,
+        &mut end_pos,
+    )
+    .unwrap();
+
+    assert_eq!(start_pos, 1);
+    assert_eq!(end_pos, payload.len() + 1);
+    assert_eq!(buffer[start_pos..end_pos], payload);
+}
diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs
index 29c616abe8..4df22c4c03 100644
--- a/io/zenoh-transport/src/common/pipeline.rs
+++ b/io/zenoh-transport/src/common/pipeline.rs
@@ -11,8 +11,10 @@
 // Contributors:
 //   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
 //
-use super::batch::{Encode, WBatch, WError};
-use super::priority::{TransportChannelTx, TransportPriorityTx};
+use super::{
+    batch::{Encode, WBatch, WError},
+    priority::{TransportChannelTx, TransportPriorityTx},
+};
 use async_std::prelude::FutureExt;
 use flume::{bounded, Receiver, Sender};
 use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter};
@@ -378,8 +380,7 @@ struct StageOutIn {
 impl StageOutIn {
     #[inline]
     fn try_pull(&mut self) -> Pull {
-        if let Some(mut batch) = self.s_out_r.pull() {
-            batch.finalize();
+        if let Some(batch) = self.s_out_r.pull() {
             self.backoff.stop();
             return Pull::Some(batch);
         }
@@ -397,16 +398,14 @@ impl StageOutIn {
             // No new bytes have been written on the batch, try to pull
             if let Ok(mut g) = self.current.try_lock() {
                 // First try to pull from stage OUT
-                if let Some(mut batch) = self.s_out_r.pull() {
-                    batch.finalize();
+                if let Some(batch) = self.s_out_r.pull() {
                     self.backoff.stop();
                     return Pull::Some(batch);
                 }
 
                 // An incomplete (non-empty) batch is available in the state IN pipeline.
                 match g.take() {
-                    Some(mut batch) => {
-                        batch.finalize();
+                    Some(batch) => {
                         self.backoff.stop();
                         return Pull::Some(batch);
                     }
@@ -420,8 +419,7 @@ impl StageOutIn {
             }
             std::cmp::Ordering::Less => {
                 // There should be a new batch in Stage OUT
-                if let Some(mut batch) = self.s_out_r.pull() {
-                    batch.finalize();
+                if let Some(batch) = self.s_out_r.pull() {
                     self.backoff.stop();
                     return Pull::Some(batch);
                 }
@@ -469,8 +467,7 @@ impl StageOut {
     fn drain(&mut self, guard: &mut MutexGuard<'_, Option<WBatch>>) -> Vec<WBatch> {
         let mut batches = vec![];
         // Empty the ring buffer
-        while let Some(mut batch) = self.s_in.s_out_r.pull() {
-            batch.finalize();
+        while let Some(batch) = self.s_in.s_out_r.pull() {
             batches.push(batch);
         }
         // Take the current batch
@@ -534,11 +531,12 @@ impl TransmissionPipeline {
         let (mut s_ref_w, s_ref_r) = RingBuffer::<WBatch>::init();
         // Fill the refill ring buffer with batches
         for _ in 0..*num {
-            let mut batch = WBatch::new(config.batch_size).set_streamed(config.is_streamed);
-            #[cfg(feature = "transport_compression")]
-            {
-                batch = batch.set_compression(config.is_compression);
-            }
+            let batch = WBatch::new(
+                config.batch_size,
+                config.is_streamed,
+                #[cfg(feature = "transport_compression")]
+                false,
+            );
             assert!(s_ref_w.push(batch).is_none());
         }
         // Create the channel for notifying that new batches are in the refill ring buffer
diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs
index 11b86a9e25..f00afa0c1c 100644
--- a/io/zenoh-transport/src/unicast/establishment/accept.rs
+++ b/io/zenoh-transport/src/unicast/establishment/accept.rs
@@ -19,7 +19,7 @@ use crate::{
             close_link, compute_sn, ext, finalize_transport, AcceptFsm, Cookie, InputFinalize,
             Zenoh080Cookie,
         },
-        TransportLinkUnicastConfig, TransportLinkUnicastDirection,
+        link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection},
     },
     TransportConfigUnicast, TransportManager,
 };
@@ -115,7 +115,7 @@ struct SendOpenAckOut {
 
 // Fsm
 struct AcceptLink<'a> {
-    link: &'a LinkUnicast,
+    link: &'a TransportLinkUnicast,
     prng: &'a Mutex<PseudoRng>,
     cipher: &'a BlockCipher,
     ext_qos: ext::qos::QoSFsm<'a>,
@@ -582,8 +582,16 @@ impl<'a> AcceptFsm for AcceptLink<'a> {
 }
 
 pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) -> ZResult<()> {
+    let link = TransportLinkUnicast::new(
+        link.clone(),
+        TransportLinkUnicastConfig {
+            direction: TransportLinkUnicastDirection::Inbound,
+            #[cfg(feature = "transport_compression")]
+            is_compression: false,
+        },
+    );
     let fsm = AcceptLink {
-        link,
+        link: &link,
         prng: &manager.prng,
         cipher: &manager.cipher,
         ext_qos: ext::qos::QoSFsm::new(),
@@ -691,17 +699,12 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager)
         is_shm: state.ext_shm.is_shm(),
         is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(),
     };
-
-    let link_config = TransportLinkUnicastConfig {
-        direction: TransportLinkUnicastDirection::Inbound,
-        #[cfg(feature = "transport_compression")]
-        is_compression: state.link.ext_compression.is_compression(),
-    };
-    let transport = step!(
-        manager
-            .init_transport_unicast(config, link.clone(), link_config)
-            .await
-    );
+    let mut c_link = link.clone();
+    #[cfg(feature = "transport_compression")]
+    {
+        c_link.config.is_compression = state.link.ext_compression.is_compression();
+    }
+    let transport = step!(manager.init_transport_unicast(config, c_link).await);
 
     // Send the open_ack on the link
     step!(link
@@ -722,7 +725,7 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager)
         other_lease: osyn_out.other_lease,
         agreed_batch_size: state.transport.batch_size,
     };
-    step!(finalize_transport(link, manager, input)
+    step!(finalize_transport(&link, manager, input)
         .await
         .map_err(|e| (e, Some(close::reason::INVALID))));
 
diff --git a/io/zenoh-transport/src/unicast/establishment/mod.rs b/io/zenoh-transport/src/unicast/establishment/mod.rs
index 6bc8c898e8..5e39d8c49f 100644
--- a/io/zenoh-transport/src/unicast/establishment/mod.rs
+++ b/io/zenoh-transport/src/unicast/establishment/mod.rs
@@ -17,7 +17,7 @@ pub mod ext;
 pub(crate) mod open;
 
 use super::{TransportPeer, TransportUnicast};
-use crate::{common::seq_num, TransportManager};
+use crate::{common::seq_num, unicast::link::TransportLinkUnicast, TransportManager};
 use async_trait::async_trait;
 use cookie::*;
 use sha3::{
@@ -25,7 +25,7 @@ use sha3::{
     Shake128,
 };
 use std::time::Duration;
-use zenoh_link::{Link, LinkUnicast};
+use zenoh_link::Link;
 use zenoh_protocol::{
     core::{Field, Resolution, ZenohId},
     transport::{BatchSize, Close, TransportMessage, TransportSn},
@@ -116,7 +116,7 @@ pub(super) fn compute_sn(zid1: ZenohId, zid2: ZenohId, resolution: Resolution) -> TransportSn {
     TransportSn::from_le_bytes(array) & seq_num::get_mask(resolution.get(Field::FrameSN))
 }
 
-pub(super) async fn close_link(link: &LinkUnicast, reason: Option<u8>) {
+pub(super) async fn close_link(link: TransportLinkUnicast, reason: Option<u8>) {
     if let Some(reason) = reason {
         // Build the close message
         let message: TransportMessage = Close {
@@ -139,7 +139,7 @@ pub(super) struct InputFinalize {
 }
 // Finalize the transport, notify the callback and start the link tasks
 pub(super) async fn finalize_transport(
-    link: &LinkUnicast,
+    link: &TransportLinkUnicast,
     manager: &TransportManager,
     input: self::InputFinalize,
 ) -> ZResult<()> {
diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs
index a87ae69c6b..cb2b62953c 100644
--- a/io/zenoh-transport/src/unicast/establishment/open.rs
+++ b/io/zenoh-transport/src/unicast/establishment/open.rs
@@ -16,7 +16,7 @@ use crate::unicast::shared_memory_unicast::Challenge;
 use crate::{
     unicast::{
         establishment::{close_link, compute_sn, ext, finalize_transport, InputFinalize, OpenFsm},
-        TransportLinkUnicastConfig, TransportLinkUnicastDirection,
+        link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection},
     },
     TransportConfigUnicast, TransportManager, TransportUnicast,
 };
@@ -99,7 +99,7 @@ struct RecvOpenAckOut {
 
 // FSM
 struct OpenLink<'a> {
-    link: &'a LinkUnicast,
+    link: &'a TransportLinkUnicast,
     ext_qos: ext::qos::QoSFsm<'a>,
     #[cfg(feature = "transport_multilink")]
     ext_mlink: ext::multilink::MultiLinkFsm<'a>,
@@ -506,11 +506,19 @@ impl<'a> OpenFsm for OpenLink<'a> {
 }
 
 pub(crate) async fn open_link(
-    link: &LinkUnicast,
+    link: LinkUnicast,
     manager: &TransportManager,
 ) -> ZResult<TransportUnicast> {
-    let fsm = OpenLink {
+    let mut link = TransportLinkUnicast::new(
         link,
+        TransportLinkUnicastConfig {
+            direction: TransportLinkUnicastDirection::Outbound,
+            #[cfg(feature = "transport_compression")]
+            is_compression: false,
+        },
+    );
+    let fsm = OpenLink {
+        link: &link,
         ext_qos: ext::qos::QoSFsm::new(),
         #[cfg(feature = "transport_multilink")]
         ext_mlink: manager.state.unicast.multilink.fsm(&manager.prng),
@@ -601,16 +609,12 @@ pub(crate) async fn open_link(
         is_shm: state.ext_shm.is_shm(),
         is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(),
     };
-    let link_config = TransportLinkUnicastConfig {
-        direction: TransportLinkUnicastDirection::Outbound,
-        #[cfg(feature = "transport_compression")]
-        is_compression: state.link.ext_compression.is_compression(),
-    };
-    let transport = step!(
-        manager
-            .init_transport_unicast(config, link.clone(), link_config)
-            .await
-    );
+    #[cfg(feature = "transport_compression")]
+    {
+        link.config.is_compression = state.link.ext_compression.is_compression();
+    }
+    let c_link = link.clone();
+    let transport = step!(manager.init_transport_unicast(config, c_link).await);
 
     // Sync the RX sequence number
     let _ = step!(transport
@@ -625,7 +629,7 @@ pub(crate) async fn open_link(
         agreed_batch_size: state.transport.batch_size,
     };
     let transport = output.transport.clone();
-    let res = finalize_transport(link, manager, output).await;
+    let res = finalize_transport(&link, manager, output).await;
     if let Err(e) = res {
         let _ = transport.close().await;
         return Err(e);
diff --git a/io/zenoh-transport/src/unicast/lowlatency/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs
index f9f949bf99..22b2fa8f8a 100644
--- a/io/zenoh-transport/src/unicast/lowlatency/link.rs
+++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs
@@ -14,16 +14,14 @@
 use super::transport::TransportUnicastLowlatency;
 #[cfg(feature = "stats")]
 use crate::stats::TransportStats;
-use crate::TransportExecutor;
+use crate::{unicast::link::TransportLinkUnicast, TransportExecutor};
 use async_std::task;
 use async_std::{prelude::FutureExt, sync::RwLock};
-use zenoh_codec::*;
-use zenoh_core::{zasyncread, zasyncwrite};
-
 use std::sync::Arc;
 use std::time::Duration;
 use zenoh_buffers::{writer::HasWriter, ZSlice};
-use zenoh_link::LinkUnicast;
+use zenoh_codec::*;
+use zenoh_core::{zasyncread, zasyncwrite};
 use zenoh_protocol::transport::{
     BatchSize, KeepAlive, TransportBodyLowLatency, TransportMessageLowLatency,
 };
@@ -31,7 +29,7 @@ use zenoh_result::{zerror, ZResult};
 use zenoh_sync::RecyclingObjectPool;
 
 pub(crate) async fn send_with_link(
-    link: &LinkUnicast,
+    link: &TransportLinkUnicast,
     msg: TransportMessageLowLatency,
     #[cfg(feature = "stats")] stats: &Arc<TransportStats>,
 ) -> ZResult<()> {
@@ -177,7 +175,7 @@ impl TransportUnicastLowlatency {
 /*************************************/
 /*               TASKS               */
 /*************************************/
 async fn keepalive_task(
-    link: Arc<RwLock<LinkUnicast>>,
+    link: Arc<RwLock<TransportLinkUnicast>>,
     keep_alive: Duration,
     #[cfg(feature = "stats")] stats: Arc<TransportStats>,
 ) -> ZResult<()> {
@@ -201,13 +199,13 @@ async fn keepalive_task(
 }
 
 async fn rx_task_stream(
-    link: LinkUnicast,
+    link: TransportLinkUnicast,
     transport: TransportUnicastLowlatency,
     lease: Duration,
     rx_batch_size: BatchSize,
     rx_buffer_size: usize,
 ) -> ZResult<()> {
-    async fn read(link: &LinkUnicast, buffer: &mut [u8]) -> ZResult<usize> {
+    async fn read(link: &TransportLinkUnicast, buffer: &mut [u8]) -> ZResult<usize> {
         // 16 bits for reading the batch length
         let mut length = [0_u8, 0_u8, 0_u8, 0_u8];
         link.read_exact(&mut length).await?;
@@ -244,7 +242,7 @@ async fn rx_task_stream(
 }
 
 async fn rx_task_dgram(
-    link: LinkUnicast,
+    link: TransportLinkUnicast,
     transport: TransportUnicastLowlatency,
     lease: Duration,
     rx_batch_size: BatchSize,
@@ -278,13 +276,13 @@ async fn rx_task_dgram(
 }
 
 async fn rx_task(
-    link: LinkUnicast,
+    link: TransportLinkUnicast,
     transport: TransportUnicastLowlatency,
     lease: Duration,
     rx_batch_size: u16,
     rx_buffer_size: usize,
 ) -> ZResult<()> {
-    if link.is_streamed() {
+    if link.link.is_streamed() {
         rx_task_stream(link, transport, lease, rx_batch_size, rx_buffer_size).await
     } else {
         rx_task_dgram(link, transport, lease, rx_batch_size, rx_buffer_size).await
     }
 }
diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs
index e20b42e899..53ed01c9e8 100644
--- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs
+++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs
@@ -16,8 +16,8 @@ use super::link::send_with_link;
 #[cfg(feature = "stats")]
 use crate::stats::TransportStats;
 use crate::transport_unicast_inner::TransportUnicastTrait;
+use crate::unicast::link::{TransportLinkUnicast, TransportLinkUnicastConfig};
 use crate::TransportConfigUnicast;
-use crate::TransportLinkUnicastConfig;
 use crate::TransportManager;
 use crate::{TransportExecutor, TransportPeerEventHandler};
 use async_executor::Task;
@@ -55,7 +55,7 @@ pub(crate) struct TransportUnicastLowlatency {
     // Transport config
     pub(super) config: TransportConfigUnicast,
     // The link associated to the transport
-    pub(super) link: Arc<RwLock<LinkUnicast>>,
+    pub(super) link: Arc<RwLock<TransportLinkUnicast>>,
     // The callback
     pub(super) callback: Arc<RwLock<Option<Arc<dyn TransportPeerEventHandler>>>>,
     // Mutex for notification
@@ -73,7 +73,7 @@ impl TransportUnicastLowlatency {
     pub fn make(
         manager: TransportManager,
         config: TransportConfigUnicast,
-        link: LinkUnicast,
+        link: TransportLinkUnicast,
     ) -> ZResult<TransportUnicastLowlatency> {
         #[cfg(feature = "stats")]
         let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone())));
@@ -163,7 +163,7 @@ impl TransportUnicastTrait for TransportUnicastLowlatency {
         zasynclock!(self.alive)
     }
 
-    fn get_links(&self) -> Vec<LinkUnicast> {
+    fn get_links(&self) -> Vec<TransportLinkUnicast> {
         let guard = async_std::task::block_on(async { zasyncread!(self.link) });
         [guard.clone()].to_vec()
     }
@@ -207,7 +207,7 @@ impl TransportUnicastTrait for TransportUnicastLowlatency {
 
     fn start_tx(
         &self,
-        _link: &LinkUnicast,
+        _link: &TransportLinkUnicast,
         executor: &TransportExecutor,
         keep_alive: Duration,
         _batch_size: u16,
@@ -216,7 +216,12 @@ impl TransportUnicastTrait for TransportUnicastLowlatency {
         Ok(())
     }
 
-    fn start_rx(&self, _link: &LinkUnicast, lease: Duration, batch_size: u16) -> ZResult<()> {
+    fn start_rx(
+        &self,
+        _link: &TransportLinkUnicast,
+        lease: Duration,
+        batch_size: u16,
+    ) -> ZResult<()> {
         self.internal_start_rx(lease, batch_size);
         Ok(())
     }
@@ -224,11 +229,7 @@ impl TransportUnicastTrait for TransportUnicastLowlatency {
     /*************************************/
     /*               LINK                */
     /*************************************/
-    async fn add_link(
-        &self,
-        link: LinkUnicast,
-        _config: TransportLinkUnicastConfig,
-    ) -> ZResult<()> {
+    async fn add_link(&self, link: TransportLinkUnicast) -> ZResult<()> {
         log::trace!("Adding link: {}", link);
 
         #[cfg(not(feature = "transport_unixpipe"))]
@@ -313,7 +314,7 @@ impl TransportUnicastTrait for TransportUnicastLowlatency {
     /*************************************/
     /*           TERMINATION             */
     /*************************************/
-    async fn close_link(&self, link: &LinkUnicast, reason: u8) -> ZResult<()> {
+    async fn close_link(&self, link: &TransportLinkUnicast, reason: u8) -> ZResult<()> {
         log::trace!("Closing link {} with peer: {}", link, self.config.zid);
         self.finalize(reason).await
     }
diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs
index 0bd2b9f64a..8624c372e9 100644
--- a/io/zenoh-transport/src/unicast/manager.rs
+++ b/io/zenoh-transport/src/unicast/manager.rs
@@ -18,9 +18,9 @@ use crate::unicast::establishment::ext::auth::Auth;
 #[cfg(feature = "transport_multilink")]
 use crate::unicast::establishment::ext::multilink::MultiLink;
 use crate::{
-    lowlatency::transport::TransportUnicastLowlatency,
+    // lowlatency::transport::TransportUnicastLowlatency,
     transport_unicast_inner::TransportUnicastTrait,
-    unicast::{TransportConfigUnicast, TransportLinkUnicastConfig, TransportUnicast},
+    unicast::{link::TransportLinkUnicast, TransportConfigUnicast, TransportUnicast},
     universal::transport::TransportUnicastUniversal,
     TransportManager,
 };
@@ -406,8 +406,7 @@ impl TransportManager {
     pub(super) async fn init_transport_unicast(
         &self,
         config: TransportConfigUnicast,
-        link: LinkUnicast,
-        link_config: TransportLinkUnicastConfig,
+        link: TransportLinkUnicast,
     ) -> Result<TransportUnicast, (zenoh_result::Error, Option<u8>)> {
         let mut guard = zasynclock!(self.state.unicast.transports);
 
@@ -430,7 +429,7 @@ impl TransportManager {
 
                 // Add the link to the transport
                 transport
-                    .add_link(link, link_config)
+                    .add_link(link)
                     .await
                     .map_err(|e| (e, Some(close::reason::MAX_LINKS)))?;
 
@@ -456,9 +455,10 @@ impl TransportManager {
         let a_t = {
             if config.is_lowlatency {
                 log::debug!("Will use LowLatency transport!");
-                TransportUnicastLowlatency::make(self.clone(), config.clone(), link)
-                    .map_err(|e| (e, Some(close::reason::INVALID)))
-                    .map(|v| Arc::new(v) as Arc<dyn TransportUnicastTrait>)?
+                // TransportUnicastLowlatency::make(self.clone(), config.clone(), link)
+                //     .map_err(|e| (e, Some(close::reason::INVALID)))
+                //     .map(|v| Arc::new(v) as Arc<dyn TransportUnicastTrait>)?
+                panic!(); // @TODO
             } else {
                 log::debug!("Will use Universal transport!");
                 let t: Arc<dyn TransportUnicastTrait> =
                     TransportUnicastUniversal::make(self.clone(), config.clone())
                         .map_err(|e| (e, Some(close::reason::INVALID)))
                         .map(|v| Arc::new(v) as Arc<dyn TransportUnicastTrait>)?;
                 // Add the link to the transport
-                t.add_link(link, link_config)
+                t.add_link(link)
                     .await
                     .map_err(|e| (e, Some(close::reason::MAX_LINKS)))?;
                 t
@@ -542,7 +542,7 @@ impl TransportManager {
         // Create a new link associated by calling the Link Manager
         let link = manager.new_link(endpoint).await?;
         // Open the link
-        super::establishment::open::open_link(&link, self).await
+        super::establishment::open::open_link(link, self).await
     }
 
     pub async fn get_transport_unicast(&self, peer: &ZenohId) -> Option<TransportUnicast> {
diff --git a/io/zenoh-transport/src/unicast/mod.rs b/io/zenoh-transport/src/unicast/mod.rs
index d96afeca76..578d4f1e02 100644
--- a/io/zenoh-transport/src/unicast/mod.rs
+++ b/io/zenoh-transport/src/unicast/mod.rs
@@ -12,7 +12,8 @@
 //   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
 //
 pub mod establishment;
-pub(crate) mod lowlatency;
+pub(crate) mod link;
+// pub(crate) mod lowlatency;
 pub(crate) mod manager;
 pub(crate) mod transport_unicast_inner;
 pub(crate) mod universal;
@@ -40,24 +41,6 @@ use zenoh_protocol::{
 };
 use zenoh_result::{zerror, ZResult};
 
-/*************************************/
-/*      TRANSPORT UNICAST LINK       */
-/*************************************/
-#[derive(Clone, Copy, PartialEq, Eq)]
-pub(crate) enum TransportLinkUnicastDirection {
-    Inbound,
-    Outbound,
-}
-
-#[derive(Clone, Copy)]
-pub(crate) struct TransportLinkUnicastConfig {
-    // Inbound / outbound
-    pub(super) direction: TransportLinkUnicastDirection,
-    // Compression is active on the link
-    #[cfg(feature = "transport_compression")]
-    is_compression: bool,
-}
-
 /*************************************/
 /*        TRANSPORT UNICAST          */
 /*************************************/
@@ -152,7 +135,7 @@ impl TransportUnicast {
         let link = transport
             .get_links()
             .into_iter()
-            .find(|l| l.get_src() == &link.src && l.get_dst() == &link.dst)
+            .find(|l| l.link.get_src() == &link.src && l.link.get_dst() == &link.dst)
             .ok_or_else(|| zerror!("Invalid link"))?;
         transport.close_link(&link, close::reason::GENERIC).await?;
         Ok(())
diff --git a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs
index bdefe69e89..7de9983cb4 100644
--- a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs
+++ b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs
@@ -13,13 +13,12 @@
 //
 use crate::{
-    TransportConfigUnicast, TransportExecutor, TransportLinkUnicastConfig,
-    TransportPeerEventHandler,
+    unicast::{link::TransportLinkUnicast, TransportConfigUnicast},
+    TransportExecutor, TransportPeerEventHandler,
 };
 use async_std::sync::MutexGuard as AsyncMutexGuard;
 use async_trait::async_trait;
 use std::{fmt::DebugStruct, sync::Arc, time::Duration};
-use zenoh_link::LinkUnicast;
 use zenoh_protocol::{
     core::{WhatAmI, ZenohId},
     network::NetworkMessage,
@@ -40,7 +39,7 @@ pub(crate) trait TransportUnicastTrait: Send + Sync {
     fn get_zid(&self) -> ZenohId;
     fn get_whatami(&self) -> WhatAmI;
     fn get_callback(&self) -> Option<Arc<dyn TransportPeerEventHandler>>;
-    fn get_links(&self) -> Vec<LinkUnicast>;
+    fn get_links(&self) -> Vec<TransportLinkUnicast>;
     #[cfg(feature = "shared-memory")]
     fn is_shm(&self) -> bool;
     fn is_qos(&self) -> bool;
@@ -51,7 +50,7 @@ pub(crate) trait TransportUnicastTrait: Send + Sync {
     /*************************************/
     /*               LINK                */
     /*************************************/
-    async fn add_link(&self, link: LinkUnicast, config: TransportLinkUnicastConfig) -> ZResult<()>;
+    async fn add_link(&self, link: TransportLinkUnicast) -> ZResult<()>;
 
     /*************************************/
     /*                TX                 */
@@ -59,7 +58,7 @@ pub(crate) trait TransportUnicastTrait: Send + Sync {
     fn schedule(&self, msg: NetworkMessage) -> ZResult<()>;
     fn start_tx(
         &self,
-        link: &LinkUnicast,
+        link: &TransportLinkUnicast,
         executor: &TransportExecutor,
         keep_alive: Duration,
         batch_size: u16,
@@ -68,7 +67,12 @@ pub(crate) trait TransportUnicastTrait: Send + Sync {
     /*************************************/
     /*                RX                 */
     /*************************************/
-    fn start_rx(&self, link: &LinkUnicast, lease: Duration, batch_size: u16) -> ZResult<()>;
+    fn start_rx(
+        &self,
+        link: &TransportLinkUnicast,
+        lease: Duration,
+        batch_size: u16,
+    ) -> ZResult<()>;
 
     /*************************************/
     /*            INITIATION             */
@@ -78,7 +82,7 @@ pub(crate) trait TransportUnicastTrait: Send + Sync {
     /*************************************/
     /*           TERMINATION             */
     /*************************************/
-    async fn close_link(&self, link: &LinkUnicast, reason: u8) -> ZResult<()>;
+    async fn close_link(&self, link: &TransportLinkUnicast, reason: u8) -> ZResult<()>;
     async fn close(&self, reason: u8) -> ZResult<()>;
 
     fn add_debug_fields<'a, 'b: 'a, 'c>(
diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs
index e7150cff4b..8317816767 100644
--- a/io/zenoh-transport/src/unicast/universal/link.rs
+++ b/io/zenoh-transport/src/unicast/universal/link.rs
@@ -16,58 +16,31 @@
 use super::transport::TransportUnicastUniversal;
 use crate::common::stats::TransportStats;
 use crate::{
     common::{
+        batch::RBatch,
         pipeline::{
             TransmissionPipeline, TransmissionPipelineConf, TransmissionPipelineConsumer,
             TransmissionPipelineProducer,
         },
         priority::TransportPriorityTx,
     },
-    unicast::TransportLinkUnicastConfig,
+    unicast::link::TransportLinkUnicast,
     TransportExecutor,
 };
 use async_std::prelude::FutureExt;
 use async_std::task;
 use async_std::task::JoinHandle;
-
-#[cfg(all(feature = "unstable", feature = "transport_compression"))]
-use std::convert::TryInto;
-use std::sync::Arc;
-use std::time::Duration;
+use std::{sync::Arc, time::Duration};
 use zenoh_buffers::ZSlice;
-use zenoh_link::LinkUnicast;
 use zenoh_protocol::transport::{BatchSize, KeepAlive, TransportMessage};
 use zenoh_result::{bail, zerror, ZResult};
 use zenoh_sync::{RecyclingObjectPool, Signal};
 
-#[cfg(all(feature = "unstable", feature = "transport_compression"))]
-const HEADER_BYTES_SIZE: usize = 2;
-
-#[cfg(all(feature = "unstable", feature = "transport_compression"))]
-const COMPRESSION_BYTE_INDEX_STREAMED: usize = 2;
-
-#[cfg(all(feature = "unstable", feature = "transport_compression"))]
-const COMPRESSION_BYTE_INDEX: usize = 0;
-
-#[cfg(all(feature = "unstable", feature = "transport_compression"))]
-const COMPRESSION_ENABLED: u8 = 1_u8;
-
-#[cfg(all(feature = "unstable", feature = "transport_compression"))]
-const COMPRESSION_DISABLED: u8 = 0_u8;
-
-#[cfg(all(feature = "unstable", feature = "transport_compression"))]
-const BATCH_PAYLOAD_START_INDEX: usize = 1;
-
-#[cfg(all(feature = "unstable", feature = "transport_compression"))]
-const MAX_BATCH_SIZE: usize = u16::MAX as usize;
-
 #[derive(Clone)]
-pub(super) struct TransportLinkUnicast {
+pub(super) struct TransportLinkUnicastUniversal {
     // The underlying link
-    pub(super) link: LinkUnicast,
+    pub(super) link: TransportLinkUnicast,
     // The transmission pipeline
     pub(super) pipeline: Option<TransmissionPipelineProducer>,
-    // The config
-    pub(super) config: TransportLinkUnicastConfig,
     // The transport this link is associated to
     transport: TransportUnicastUniversal,
     // The signals to stop TX/RX tasks
@@ -76,16 +49,11 @@
     handle_rx: Option<Arc<JoinHandle<()>>>,
 }
 
-impl TransportLinkUnicast {
-    pub(super) fn new(
-        transport: TransportUnicastUniversal,
-        link: LinkUnicast,
-        config: TransportLinkUnicastConfig,
-    ) -> TransportLinkUnicast {
-        TransportLinkUnicast {
+impl TransportLinkUnicastUniversal {
+    pub(super) fn new(transport: TransportUnicastUniversal, link: TransportLinkUnicast) -> Self {
+        Self {
             link,
             pipeline: None,
-            config,
             transport,
             handle_tx: None,
             signal_rx: Signal::new(),
@@ -94,7 +62,7 @@
     }
 }
 
-impl TransportLinkUnicast {
+impl TransportLinkUnicastUniversal {
     pub(super) fn start_tx(
         &mut self,
         executor: &TransportExecutor,
@@ -104,10 +72,10 @@
     ) {
         if self.handle_tx.is_none() {
             let config = TransmissionPipelineConf {
-                is_streamed: self.link.is_streamed(),
+                is_streamed: self.link.link.is_streamed(),
                 #[cfg(feature = "transport_compression")]
-                is_compression: self.config.is_compression,
-                batch_size: batch_size.min(self.link.get_mtu()),
+                is_compression: self.link.config.is_compression,
+                batch_size: batch_size.min(self.link.link.get_mtu()),
                 queue_size: self.transport.manager.config.queue_size,
                 backoff: self.transport.manager.config.queue_backoff,
             };
@@ -210,7 +178,7 @@ impl TransportLinkUnicast {
 /*************************************/
 async fn tx_task(
     mut pipeline: TransmissionPipelineConsumer,
-    link: LinkUnicast,
+    link: TransportLinkUnicast,
     keep_alive: Duration,
     #[cfg(feature = "stats")] stats: Arc<TransportStats>,
     #[cfg(all(feature = "unstable", feature = "transport_compression"))] is_compressed: bool,
@@ -222,23 +190,8 @@ async fn tx_task(
     loop {
         match pipeline.pull().timeout(keep_alive).await {
             Ok(res) => match res {
-                Some((batch, priority)) => {
-                    // Send the buffer on the link
-                    #[allow(unused_mut)]
-                    let mut bytes = batch.as_slice();
-
-                    #[cfg(all(feature = "unstable", feature = "transport_compression"))]
-                    {
-                        let (batch_size, _) = tx_compressed(
-                            is_compressed,
-                            link.is_streamed(),
-                            bytes,
-                            &mut compression_aux_buff,
-                        )?;
-                        bytes = &compression_aux_buff[..batch_size];
-                    }
-
-                    link.write_all(bytes).await?;
+                Some((mut batch, priority)) => {
+                    link.send_batch(&mut batch).await?;
#[cfg(feature = "stats")] { @@ -267,8 +220,8 @@ async fn tx_task( // Drain the transmission pipeline and write remaining bytes on the wire let mut batches = pipeline.drain(); - for (b, _) in batches.drain(..) { - link.write_all(b.as_slice()) + for (mut b, _) in batches.drain(..) { + link.send_batch(&mut b) .timeout(keep_alive) .await .map_err(|_| zerror!("{}: flush failed after {} ms", link, keep_alive.as_millis()))??; @@ -283,8 +236,8 @@ async fn tx_task( Ok(()) } -async fn rx_task_stream( - link: LinkUnicast, +async fn rx_task( + link: TransportLinkUnicast, transport: TransportUnicastUniversal, lease: Duration, signal: Signal, @@ -292,17 +245,14 @@ async fn rx_task_stream( rx_buffer_size: usize, ) -> ZResult<()> { enum Action { - Read(usize), + Read(ZSlice), Stop, } - async fn read(link: &LinkUnicast, buffer: &mut [u8]) -> ZResult { - // 16 bits for reading the batch length - let mut length = [0_u8, 0_u8]; - link.read_exact(&mut length).await?; - let n = BatchSize::from_le_bytes(length) as usize; - link.read_exact(&mut buffer[0..n]).await?; - Ok(Action::Read(n)) + async fn read(link: &TransportLinkUnicast, mut batch: RBatch) -> ZResult { + link.recv_batch(&mut batch).await?; + let zslice: ZSlice = batch.into(); + Ok(Action::Read(zslice)) } async fn stop(signal: Signal) -> ZResult { @@ -311,401 +261,41 @@ async fn rx_task_stream( } // The pool of buffers - let mtu = link.get_mtu().min(rx_batch_size) as usize; + let mtu = link.link.get_mtu().min(rx_batch_size) as usize; let mut n = rx_buffer_size / mtu; if rx_buffer_size % mtu != 0 { n += 1; } - let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice()); - while !signal.is_triggered() { - // Retrieve one buffer - let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc()); - // Async read from the underlying link - let action = read(&link, &mut buffer) - .race(stop(signal.clone())) - .timeout(lease) - .await - .map_err(|_| zerror!("{}: expired after {} milliseconds", link, lease.as_millis()))??; - match action { - Action::Read(n) => { - #[cfg(feature = "stats")] - { - transport.stats.inc_rx_bytes(2 + n); // Account for the batch len encoding (16 bits) - } - - #[allow(unused_mut)] - let mut end_pos = n; - - #[allow(unused_mut)] - let mut start_pos = 0; - - #[cfg(all(feature = "unstable", feature = "transport_compression"))] - rx_decompress(&mut buffer, &pool, n, &mut start_pos, &mut end_pos)?; - - // Deserialize all the messages from the current ZBuf - let zslice = ZSlice::make(Arc::new(buffer), start_pos, end_pos) - .map_err(|_| zerror!("Read {} bytes but buffer is {} bytes", n, mtu))?; - transport.read_messages(zslice, &link)?; - } - Action::Stop => break, - } - } - Ok(()) -} - -async fn rx_task_dgram( - link: LinkUnicast, - transport: TransportUnicastUniversal, - lease: Duration, - signal: Signal, - rx_batch_size: BatchSize, - rx_buffer_size: usize, -) -> ZResult<()> { - enum Action { - Read(usize), - Stop, - } - - async fn read(link: &LinkUnicast, buffer: &mut [u8]) -> ZResult { - let n = link.read(buffer).await?; - Ok(Action::Read(n)) - } - - async fn stop(signal: Signal) -> ZResult { - signal.wait().await; - Ok(Action::Stop) - } - - // The pool of buffers - let mtu = link.get_mtu().min(rx_batch_size) as usize; - let mut n = rx_buffer_size / mtu; - if rx_buffer_size % mtu != 0 { - n += 1; - } let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice()); while !signal.is_triggered() { // Retrieve one buffer let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc()); + // Retrieve one buffer 
@@ -311,401 +261,41 @@ async fn rx_task_stream(
     }
 
     // The pool of buffers
-    let mtu = link.get_mtu().min(rx_batch_size) as usize;
+    let mtu = link.link.get_mtu().min(rx_batch_size) as usize;
     let mut n = rx_buffer_size / mtu;
     if rx_buffer_size % mtu != 0 {
         n += 1;
     }
-    let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice());
-    while !signal.is_triggered() {
-        // Retrieve one buffer
-        let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc());
-
-        // Async read from the underlying link
-        let action = read(&link, &mut buffer)
-            .race(stop(signal.clone()))
-            .timeout(lease)
-            .await
-            .map_err(|_| zerror!("{}: expired after {} milliseconds", link, lease.as_millis()))??;
-        match action {
-            Action::Read(n) => {
-                #[cfg(feature = "stats")]
-                {
-                    transport.stats.inc_rx_bytes(2 + n); // Account for the batch len encoding (16 bits)
-                }
-
-                #[allow(unused_mut)]
-                let mut end_pos = n;
-
-                #[allow(unused_mut)]
-                let mut start_pos = 0;
-
-                #[cfg(all(feature = "unstable", feature = "transport_compression"))]
-                rx_decompress(&mut buffer, &pool, n, &mut start_pos, &mut end_pos)?;
-
-                // Deserialize all the messages from the current ZBuf
-                let zslice = ZSlice::make(Arc::new(buffer), start_pos, end_pos)
-                    .map_err(|_| zerror!("Read {} bytes but buffer is {} bytes", n, mtu))?;
-                transport.read_messages(zslice, &link)?;
-            }
-            Action::Stop => break,
-        }
-    }
-    Ok(())
-}
-
-async fn rx_task_dgram(
-    link: LinkUnicast,
-    transport: TransportUnicastUniversal,
-    lease: Duration,
-    signal: Signal,
-    rx_batch_size: BatchSize,
-    rx_buffer_size: usize,
-) -> ZResult<()> {
-    enum Action {
-        Read(usize),
-        Stop,
-    }
-
-    async fn read(link: &LinkUnicast, buffer: &mut [u8]) -> ZResult<Action> {
-        let n = link.read(buffer).await?;
-        Ok(Action::Read(n))
-    }
-
-    async fn stop(signal: Signal) -> ZResult<Action> {
-        signal.wait().await;
-        Ok(Action::Stop)
-    }
-
-    // The pool of buffers
-    let mtu = link.get_mtu().min(rx_batch_size) as usize;
-    let mut n = rx_buffer_size / mtu;
-    if rx_buffer_size % mtu != 0 {
-        n += 1;
-    }
     let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice());
     while !signal.is_triggered() {
         // Retrieve one buffer
         let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc());
+        // Create a batch to read from the link
+        let batch = RBatch::new(
+            rx_batch_size,
+            link.link.is_streamed(),
+            #[cfg(feature = "transport_compression")]
+            link.config.is_compression,
+        );
 
         // Async read from the underlying link
-        let action = read(&link, &mut buffer)
+        let action = read(&link, batch)
             .race(stop(signal.clone()))
             .timeout(lease)
             .await
             .map_err(|_| zerror!("{}: expired after {} milliseconds", link, lease.as_millis()))??;
         match action {
-            Action::Read(n) => {
-                if n == 0 {
-                    // Reading 0 bytes means error
-                    bail!("{}: zero bytes reading", link)
-                }
-
+            Action::Read(zslice) => {
                 #[cfg(feature = "stats")]
                 {
-                    transport.stats.inc_rx_bytes(n);
+                    transport.stats.inc_rx_bytes(2 + zslice.len()); // Account for the batch len encoding (16 bits)
                 }
-
-                #[allow(unused_mut)]
-                let mut end_pos = n;
-
-                #[allow(unused_mut)]
-                let mut start_pos = 0;
-
-                #[cfg(all(feature = "unstable", feature = "transport_compression"))]
-                rx_decompress(&mut buffer, &pool, n, &mut start_pos, &mut end_pos)?;
-
-                // Deserialize all the messages from the current ZBuf
-                let zslice = ZSlice::make(Arc::new(buffer), start_pos, end_pos)
-                    .map_err(|_| zerror!("Read {} bytes but buffer is {} bytes", n, mtu))?;
                 transport.read_messages(zslice, &link)?;
             }
             Action::Stop => break,
         }
     }
-    Ok(())
-}
-
-async fn rx_task(
-    link: LinkUnicast,
-    transport: TransportUnicastUniversal,
-    lease: Duration,
-    signal: Signal,
-    rx_batch_size: u16,
-    rx_buffer_size: usize,
-) -> ZResult<()> {
-    if link.is_streamed() {
-        rx_task_stream(
-            link,
-            transport,
-            lease,
-            signal,
-            rx_batch_size,
-            rx_buffer_size,
-        )
-        .await
-    } else {
-        rx_task_dgram(
-            link,
-            transport,
-            lease,
-            signal,
-            rx_batch_size,
-            rx_buffer_size,
-        )
-        .await
-    }
-}
-#[cfg(all(feature = "unstable", feature = "transport_compression"))]
-/// Decompresses the received contents contained in the buffer.
-fn rx_decompress(
-    buffer: &mut zenoh_sync::RecyclingObject<Box<[u8]>>,
-    pool: &RecyclingObjectPool<Box<[u8]>, impl Fn() -> Box<[u8]>>,
-    read_bytes: usize,
-    start_pos: &mut usize,
-    end_pos: &mut usize,
-) -> ZResult<()> {
-    let is_compressed: bool = buffer[COMPRESSION_BYTE_INDEX] == COMPRESSION_ENABLED;
-    if is_compressed {
-        let mut aux_buff = pool.try_take().unwrap_or_else(|| pool.alloc());
-        *end_pos = lz4_flex::block::decompress_into(
-            &buffer[BATCH_PAYLOAD_START_INDEX..read_bytes],
-            &mut aux_buff,
-        )
-        .map_err(|e| zerror!("Decompression error: {:}", e))?;
-        *buffer = aux_buff;
-    } else {
-        *start_pos = BATCH_PAYLOAD_START_INDEX;
-        *end_pos = read_bytes;
-    }
     Ok(())
 }
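
For reference, the removed `rx_decompress` assumed every received batch carried a one-byte compression flag ahead of the payload; that framing now lives inside `RBatch`. A sketch of the layout it expected once any stream length prefix has been consumed, with the constant values assumed here (the real constants lived in the removed code):

    // Flag byte first, payload immediately after: values assumed.
    const COMPRESSION_BYTE_INDEX: usize = 0;
    const BATCH_PAYLOAD_START_INDEX: usize = 1;
    const COMPRESSION_ENABLED: u8 = 1;

    /// Splits a received batch into its compression flag and payload bounds.
    fn split_batch(buffer: &[u8]) -> (bool, &[u8]) {
        let is_compressed = buffer[COMPRESSION_BYTE_INDEX] == COMPRESSION_ENABLED;
        (is_compressed, &buffer[BATCH_PAYLOAD_START_INDEX..])
    }

    fn main() {
        // [flag = 0, i.e. uncompressed][payload ...]
        let batch = [0u8, 0xDE, 0xAD, 0xBE, 0xEF];
        let (compressed, payload) = split_batch(&batch);
        assert!(!compressed);
        assert_eq!(payload, &[0xDE, 0xAD, 0xBE, 0xEF]);
    }
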
-
-#[cfg(all(feature = "unstable", feature = "transport_compression"))]
-/// Compresses the batch into the output buffer.
-///
-/// If the batch is streamed, the output contains a header of two bytes representing the size of
-/// the resulting batch, otherwise it is not included. In any case, an extra byte is added (before
-/// the payload and considered in the header value) representing if the batch is compressed or not.
-/// If the resulting size of the compression no smaller than the original batch size, then
-/// we send the original one.
-///
-/// Returns a tuple containing the size of the resulting batch, along with a boolean representing
-/// if the batch was indeed compressed or not.
-fn tx_compressed(
-    is_compressed: bool,
-    is_streamed: bool,
-    batch: &[u8],
-    output: &mut [u8],
-) -> ZResult<(/*batch_size=*/ usize, /*was_compressed=*/ bool)> {
-    if is_compressed {
-        let s_pos = if is_streamed { 3 } else { 1 };
-        let payload = &batch[s_pos - 1..];
-        let payload_size = payload.len();
-        let compression_size = lz4_flex::block::compress_into(payload, &mut output[s_pos..])
-            .map_err(|e| zerror!("Compression error: {:}", e))?;
-        if compression_size >= payload_size {
-            log::debug!(
-                "Compression discarded due to the original batch size being smaller than the compressed batch."
-            );
-            return Ok((
-                set_uncompressed_batch_header(batch, output, is_streamed)?,
-                false,
-            ));
-        }
-        Ok((
-            set_compressed_batch_header(output, compression_size, is_streamed)?,
-            true,
-        ))
-    } else {
-        Ok((
-            set_uncompressed_batch_header(batch, output, is_streamed)?,
-            false,
-        ))
-    }
-}
-
-#[cfg(all(feature = "unstable", feature = "transport_compression"))]
-/// Inserts the compresion byte for batches WITH compression.
-/// The buffer is expected to contain the compression starting from byte 3 (if streamed) or 1
-/// (if not streamed).
-///
-/// Arguments:
-/// - buff: the buffer with the compression, with 3 or 1 bytes reserved at the beginning in case of
-/// being streamed or not respectively.
-/// - compression_size: the size of the compression
-/// - is_streamed: if the batch is intended to be streamed or not
-///
-/// Returns: the size of the compressed batch considering the header.
-fn set_compressed_batch_header(
-    buff: &mut [u8],
-    compression_size: usize,
-    is_streamed: bool,
-) -> ZResult<usize> {
-    let final_batch_size: usize;
-    let payload_size = 1 + compression_size;
-    if is_streamed {
-        let payload_size_u16: u16 = payload_size.try_into().map_err(|e| {
-            zerror!(
-                "Compression error: unable to convert batch size into u16: {}",
-                e
-            )
-        })?;
-        buff[0..HEADER_BYTES_SIZE].copy_from_slice(&payload_size_u16.to_le_bytes());
-        buff[COMPRESSION_BYTE_INDEX_STREAMED] = COMPRESSION_ENABLED;
-        final_batch_size = payload_size + HEADER_BYTES_SIZE;
-    } else {
-        buff[COMPRESSION_BYTE_INDEX] = COMPRESSION_ENABLED;
-        final_batch_size = payload_size;
-    }
-    if final_batch_size > MAX_BATCH_SIZE {
-        // May happen when the payload size is itself the MTU and adding the header exceeds it.
-        Err(zerror!("Failed to send uncompressed batch, batch size ({}) exceeds the maximum batch size of {}.", final_batch_size, MAX_BATCH_SIZE))?
-    }
-    Ok(final_batch_size)
-}
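
The size accounting in the helper just removed is easy to misread, so here is the streamed case worked through in isolation, assuming `HEADER_BYTES_SIZE` is 2: the little-endian length field counts the compression flag byte plus the payload, but not the two header bytes themselves.

    const HEADER_BYTES_SIZE: usize = 2;

    /// Returns the little-endian length header and the total bytes on the wire.
    fn streamed_header(compression_size: usize) -> ([u8; 2], usize) {
        // The length field covers the flag byte plus the compressed payload.
        let payload_size = 1 + compression_size;
        let header = (payload_size as u16).to_le_bytes();
        (header, payload_size + HEADER_BYTES_SIZE)
    }

    fn main() {
        let (header, final_batch_size) = streamed_header(100);
        assert_eq!(u16::from_le_bytes(header), 101); // 100 payload bytes + 1 flag byte
        assert_eq!(final_batch_size, 103); // + 2 length-header bytes
    }
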
-
-#[cfg(all(feature = "unstable", feature = "transport_compression"))]
-/// Inserts the compression byte for batches without compression, that is inserting a 0 byte on the
-/// third position of the buffer and increasing the batch size from the header by 1.
-///
-/// Arguments:
-/// - bytes: the source slice
-/// - buff: the output slice
-/// - is_streamed: if the batch is meant to be streamed or not, thus considering or not the 2 bytes
-/// header specifying the size of the batch.
-///
-/// Returns: the size of the batch considering the header.
-fn set_uncompressed_batch_header(
-    bytes: &[u8],
-    buff: &mut [u8],
-    is_streamed: bool,
-) -> ZResult<usize> {
-    let final_batch_size: usize;
-    if is_streamed {
-        let mut header = [0_u8, 0_u8];
-        header[..HEADER_BYTES_SIZE].copy_from_slice(&bytes[..HEADER_BYTES_SIZE]);
-        let batch_size = if let Some(size) = u16::from_le_bytes(header).checked_add(1) {
-            size
-        } else {
-            bail!("Compression error: unable to convert compression size into u16",)
-        };
-        buff[0..HEADER_BYTES_SIZE].copy_from_slice(&batch_size.to_le_bytes());
-        buff[COMPRESSION_BYTE_INDEX_STREAMED] = COMPRESSION_DISABLED;
-        let batch_size: usize = batch_size.into();
-        buff[3..batch_size + 2].copy_from_slice(&bytes[2..batch_size + 1]);
-        final_batch_size = batch_size + 2;
-    } else {
-        buff[COMPRESSION_BYTE_INDEX] = COMPRESSION_DISABLED;
-        let len = 1 + bytes.len();
-        buff[1..1 + bytes.len()].copy_from_slice(bytes);
-        final_batch_size = len;
-    }
-    if final_batch_size > MAX_BATCH_SIZE {
-        // May happen when the payload size is itself the MTU and adding the header exceeds it.
-        Err(zerror!("Failed to send uncompressed batch, batch size ({}) exceeds the maximum batch size of {}.", final_batch_size, MAX_BATCH_SIZE))?;
-    }
-    Ok(final_batch_size)
-}
-
-#[cfg(all(feature = "transport_compression", feature = "unstable"))]
-#[test]
-fn tx_compression_test() {
-    const COMPRESSION_BYTE: usize = 1;
-    let payload = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
-    let mut buff: Box<[u8]> =
-        vec![0; lz4_flex::block::get_maximum_output_size(MAX_BATCH_SIZE) + 3].into_boxed_slice();
-
-    // Compression done for the sake of comparing the result.
-    let payload_compression_size = lz4_flex::block::compress_into(&payload, &mut buff).unwrap();
-
-    fn get_header_value(buff: &[u8]) -> u16 {
-        let mut header = [0_u8, 0_u8];
-        header[..HEADER_BYTES_SIZE].copy_from_slice(&buff[..HEADER_BYTES_SIZE]);
-        u16::from_le_bytes(header)
-    }
-
-    // Streamed with compression enabled
-    let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
-    let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap();
-    let header = get_header_value(&buff);
-    assert!(was_compressed);
-    assert_eq!(header as usize, payload_compression_size + COMPRESSION_BYTE);
-    assert!(batch_size < batch.len() + COMPRESSION_BYTE);
-    assert_eq!(batch_size, payload_compression_size + 3);
-
-    // Not streamed with compression enabled
-    let batch = payload;
-    let (batch_size, was_compressed) = tx_compressed(true, false, &batch, &mut buff).unwrap();
-    assert!(was_compressed);
-    assert!(batch_size < batch.len() + COMPRESSION_BYTE);
-    assert_eq!(batch_size, payload_compression_size + COMPRESSION_BYTE);
-
-    // Streamed with compression disabled
-    let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
-    let (batch_size, was_compressed) = tx_compressed(false, true, &batch, &mut buff).unwrap();
-    let header = get_header_value(&buff);
-    assert!(!was_compressed);
-    assert_eq!(header as usize, payload.len() + COMPRESSION_BYTE);
-    assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE);
-
-    // Not streamed and compression disabled
-    let batch = payload;
-    let (batch_size, was_compressed) = tx_compressed(false, false, &batch, &mut buff).unwrap();
-    assert!(!was_compressed);
-    assert_eq!(batch_size, payload.len() + COMPRESSION_BYTE);
-
-    // Verify that if the compression result is bigger than the original payload size, then the non compressed payload is returned.
-    let batch = [16, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; // a non compressable payload with no repetitions
-    let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap();
-    assert!(!was_compressed);
-    assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE);
-}
-
-#[cfg(all(feature = "transport_compression", feature = "unstable"))]
-#[test]
-fn rx_compression_test() {
-    let pool = RecyclingObjectPool::new(2, || vec![0_u8; MAX_BATCH_SIZE].into_boxed_slice());
-    let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc());
-
-    // Compressed batch
-    let payload: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8];
-    let compression_size = lz4_flex::block::compress_into(&payload, &mut buffer[1..]).unwrap();
-    buffer[0] = 1; // is compressed byte
-
-    let mut start_pos: usize = 0;
-    let mut end_pos: usize = 0;
-
-    rx_decompress(
-        &mut buffer,
-        &pool,
-        compression_size + 1,
-        &mut start_pos,
-        &mut end_pos,
-    )
-    .unwrap();
-
-    assert_eq!(start_pos, 0);
-    assert_eq!(end_pos, payload.len());
-    assert_eq!(buffer[start_pos..end_pos], payload);
-
-    // Non compressed batch
-    let mut start_pos: usize = 0;
-    let mut end_pos: usize = 0;
-
-    buffer[0] = 0;
-    buffer[1..payload.len() + 1].copy_from_slice(&payload[..]);
-    rx_decompress(
-        &mut buffer,
-        &pool,
-        payload.len() + 1,
-        &mut start_pos,
-        &mut end_pos,
-    )
-    .unwrap();
-
-    assert_eq!(start_pos, 1);
-    assert_eq!(end_pos, payload.len() + 1);
-    assert_eq!(buffer[start_pos..end_pos], payload);
-}
diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs
index 5822b09931..3f0dd06cae 100644
--- a/io/zenoh-transport/src/unicast/universal/rx.rs
+++ b/io/zenoh-transport/src/unicast/universal/rx.rs
@@ -1,5 +1,3 @@
-use crate::transport_unicast_inner::TransportUnicastTrait;
-
 //
 // Copyright (c) 2023 ZettaScale Technology
 //
@@ -14,7 +12,10 @@ use crate::transport_unicast_inner::TransportUnicastTrait
 // ZettaScale Zenoh Team,
 //
 use super::transport::TransportUnicastUniversal;
-use crate::common::priority::TransportChannelRx;
+use crate::{
+    common::priority::TransportChannelRx,
+    unicast::{link::TransportLinkUnicast, transport_unicast_inner::TransportUnicastTrait},
+};
 use async_std::task;
 use std::sync::MutexGuard;
 use zenoh_buffers::{
@@ -23,7 +24,6 @@ use zenoh_buffers::{
 };
 use zenoh_codec::{RCodec, Zenoh080};
 use zenoh_core::{zlock, zread};
-use zenoh_link::LinkUnicast;
 use zenoh_protocol::{
     core::{Priority, Reliability},
     network::NetworkMessage,
@@ -62,7 +62,7 @@ impl TransportUnicastUniversal {
         }
     }
 
-    fn handle_close(&self, link: &LinkUnicast, _reason: u8, session: bool) -> ZResult<()> {
+    fn handle_close(&self, link: &TransportLinkUnicast, _reason: u8, session: bool) -> ZResult<()> {
         // Stop now rx and tx tasks before doing the proper cleanup
         let _ = self.stop_rx(link);
         let _ = self.stop_tx(link);
@@ -189,7 +189,11 @@ impl TransportUnicastUniversal {
         Ok(())
     }
 
-    pub(super) fn read_messages(&self, mut zslice: ZSlice, link: &LinkUnicast) -> ZResult<()> {
+    pub(super) fn read_messages(
+        &self,
+        mut zslice: ZSlice,
+        link: &TransportLinkUnicast,
+    ) -> ZResult<()> {
         let codec = Zenoh080::new();
         let mut reader = zslice.reader();
         while reader.can_read() {
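
From here on, signatures take `TransportLinkUnicast` instead of the raw `LinkUnicast`. Its definition is not part of this diff; the sketch below is an assumption-labeled reconstruction based only on the fields the diff actually touches (`link`, `config.direction`, `config.is_compression`), with everything else a guess:

    pub struct LinkUnicast; // stand-in for zenoh_link::LinkUnicast

    pub enum TransportLinkUnicastDirection {
        Inbound,
        Outbound,
    }

    pub struct TransportLinkUnicastConfig {
        pub direction: TransportLinkUnicastDirection,
        pub is_compression: bool, // gated behind `transport_compression` in the diff
    }

    pub struct TransportLinkUnicast {
        pub link: LinkUnicast,                  // the raw transport link
        pub config: TransportLinkUnicastConfig, // negotiated per-link settings
    }
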
diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs
index 8f8fc3f84b..43e411fc7f 100644
--- a/io/zenoh-transport/src/unicast/universal/transport.rs
+++ b/io/zenoh-transport/src/unicast/universal/transport.rs
@@ -16,10 +16,8 @@ use crate::stats::TransportStats;
 use crate::{
     common::priority::{TransportPriorityRx, TransportPriorityTx},
     transport_unicast_inner::TransportUnicastTrait,
-    unicast::{
-        universal::link::TransportLinkUnicast, TransportLinkUnicastConfig,
-        TransportLinkUnicastDirection,
-    },
+    unicast::link::{TransportLinkUnicast, TransportLinkUnicastDirection},
+    universal::link::TransportLinkUnicastUniversal,
     TransportConfigUnicast, TransportExecutor, TransportManager, TransportPeerEventHandler,
 };
 use async_std::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
@@ -28,7 +26,7 @@ use std::fmt::DebugStruct;
 use std::sync::{Arc, RwLock};
 use std::time::Duration;
 use zenoh_core::{zasynclock, zcondfeat, zread, zwrite};
-use zenoh_link::{Link, LinkUnicast};
+use zenoh_link::Link;
 use zenoh_protocol::network::NetworkMessage;
 use zenoh_protocol::transport::BatchSize;
 use zenoh_protocol::{
@@ -69,7 +67,7 @@ pub(crate) struct TransportUnicastUniversal {
     // Rx priorities
     pub(super) priority_rx: Arc<[TransportPriorityRx]>,
     // The links associated to the channel
-    pub(super) links: Arc<RwLock<Box<[TransportLinkUnicast]>>>,
+    pub(super) links: Arc<RwLock<Box<[TransportLinkUnicastUniversal]>>>,
     // The callback
     pub(super) callback: Arc<RwLock<Option<Arc<dyn TransportPeerEventHandler>>>>,
     // Mutex for notification
@@ -167,10 +165,10 @@ impl TransportUnicastUniversal {
         Ok(())
     }
 
-    pub(crate) async fn del_link(&self, link: &LinkUnicast) -> ZResult<()> {
+    pub(crate) async fn del_link(&self, link: &TransportLinkUnicast) -> ZResult<()> {
         enum Target {
             Transport,
-            Link(Box<TransportLinkUnicast>),
+            Link(Box<TransportLinkUnicastUniversal>),
         }
 
         // Try to remove the link
@@ -211,7 +209,7 @@ impl TransportUnicastUniversal {
         }
     }
 
-    pub(crate) fn stop_tx(&self, link: &LinkUnicast) -> ZResult<()> {
+    pub(crate) fn stop_tx(&self, link: &TransportLinkUnicast) -> ZResult<()> {
         let mut guard = zwrite!(self.links);
         match zlinkgetmut!(guard, link) {
             Some(l) => {
@@ -228,7 +226,7 @@ impl TransportUnicastUniversal {
         }
     }
 
-    pub(crate) fn stop_rx(&self, link: &LinkUnicast) -> ZResult<()> {
+    pub(crate) fn stop_rx(&self, link: &TransportLinkUnicast) -> ZResult<()> {
         let mut guard = zwrite!(self.links);
         match zlinkgetmut!(guard, link) {
             Some(l) => {
@@ -251,15 +249,15 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
     /*************************************/
     /*               LINK                */
     /*************************************/
-    async fn add_link(&self, link: LinkUnicast, config: TransportLinkUnicastConfig) -> ZResult<()> {
+    async fn add_link(&self, link: TransportLinkUnicast) -> ZResult<()> {
         // Add the link to the channel
         let mut guard = zwrite!(self.links);
 
         // Check if we can add more inbound links
-        if let TransportLinkUnicastDirection::Inbound = config.direction {
+        if let TransportLinkUnicastDirection::Inbound = link.config.direction {
             let count = guard
                 .iter()
-                .filter(|l| l.config.direction == config.direction)
+                .filter(|l| l.link.config.direction == link.config.direction)
                 .count();
 
             let limit = zcondfeat!(
@@ -283,8 +281,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
             }
         }
 
-        // Create a channel link from a link
-        let link = TransportLinkUnicast::new(self.clone(), link, config);
+        let link = TransportLinkUnicastUniversal::new(self.clone(), link);
 
         let mut links = Vec::with_capacity(guard.len() + 1);
         links.extend_from_slice(&guard);
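
`add_link` updates `links`, an `Arc<RwLock<Box<[...]>>>`, by building a fresh `Vec` and swapping in a new boxed slice rather than mutating in place. The idiom in isolation, reduced to `String` entries for illustration:

    use std::sync::{Arc, RwLock};

    fn add(links: &Arc<RwLock<Box<[String]>>>, new: String) {
        let mut guard = links.write().unwrap();
        // Build a fresh Vec with room for one more entry...
        let mut next = Vec::with_capacity(guard.len() + 1);
        next.extend_from_slice(&guard);
        next.push(new);
        // ...and swap it in as the new boxed slice.
        *guard = next.into_boxed_slice();
    }

    fn main() {
        let links = Arc::new(RwLock::new(Vec::new().into_boxed_slice()));
        add(&links, "tcp/127.0.0.1:7447".to_owned());
        assert_eq!(links.read().unwrap().len(), 1);
    }

Replacing the slice wholesale keeps the common read path a plain slice iteration with no spare capacity to manage.
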
peer: {}", link, self.config.zid); let mut pipeline = zlinkget!(zread!(self.links), link) @@ -411,7 +408,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal { self.delete().await } - fn get_links(&self) -> Vec { + fn get_links(&self) -> Vec { zread!(self.links).iter().map(|l| l.link.clone()).collect() } @@ -427,7 +424,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal { fn start_tx( &self, - link: &LinkUnicast, + link: &TransportLinkUnicast, executor: &TransportExecutor, keep_alive: Duration, batch_size: BatchSize, @@ -449,7 +446,12 @@ impl TransportUnicastTrait for TransportUnicastUniversal { } } - fn start_rx(&self, link: &LinkUnicast, lease: Duration, batch_size: u16) -> ZResult<()> { + fn start_rx( + &self, + link: &TransportLinkUnicast, + lease: Duration, + batch_size: u16, + ) -> ZResult<()> { let mut guard = zwrite!(self.links); match zlinkgetmut!(guard, link) { Some(l) => { diff --git a/io/zenoh-transport/src/unicast/universal/tx.rs b/io/zenoh-transport/src/unicast/universal/tx.rs index 7dbc5329e6..bf5be7e702 100644 --- a/io/zenoh-transport/src/unicast/universal/tx.rs +++ b/io/zenoh-transport/src/unicast/universal/tx.rs @@ -34,7 +34,7 @@ impl TransportUnicastUniversal { if let Some(pl) = guard .iter() .filter_map(|tl| { - if msg.is_reliable() == tl.link.is_reliable() { + if msg.is_reliable() == tl.link.link.is_reliable() { tl.pipeline.as_ref() } else { None