Skip to content

Commit

Permalink
Use support buffer for compression Tx
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Nov 7, 2023
1 parent 3d5ab84 commit 05117fe
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 23 deletions.
53 changes: 37 additions & 16 deletions io/zenoh-transport/src/common/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ impl WBatchStats {
}
}

#[repr(u8)]
#[derive(Debug)]
pub enum Finalize {
Batch,
#[cfg(feature = "transport_compression")]
Buffer,
}

/// Write Batch
///
/// A [`WBatch`][WBatch] is a non-expandable and contiguous region of memory
Expand Down Expand Up @@ -191,30 +199,29 @@ impl WBatch {
zsplit!(self.buffer.as_slice(), self.header)
}

pub fn finalize(&mut self) -> ZResult<()> {
pub fn finalize(
&mut self,
#[cfg(feature = "transport_compression")] buffer: Option<&mut BBuf>,
) -> ZResult<Finalize> {
#[cfg(feature = "transport_compression")]
if self.header.is_compression() {
self.compress()?;
let buffer = buffer.ok_or_else(|| zerror!("Support buffer not provided"))?;
return self.compress(buffer);
}

Ok(())
Ok(Finalize::Batch)
}

#[cfg(feature = "transport_compression")]
fn compress(&mut self) -> ZResult<()> {
let (_header, payload) = self.split();

// Create a new empty buffer
let mut buffer =
BBuf::with_capacity(lz4_flex::block::get_maximum_output_size(self.buffer.len()));

fn compress(&mut self, buffer: &mut BBuf) -> ZResult<Finalize> {
// Write the initial bytes for the batch
let mut writer = buffer.writer();
if let Some(h) = self.header.get() {
let _ = writer.write_u8(h.get());
}

// Compress the actual content
let (_header, payload) = self.split();
writer
.with_slot(writer.remaining(), |b| {
lz4_flex::block::compress_into(payload, b).unwrap_or(0)
Expand All @@ -223,8 +230,7 @@ impl WBatch {

// Verify wether the resulting compressed data is smaller than the initial input
if buffer.len() < self.buffer.len() {
// Replace the buffer in this batch
self.buffer = buffer;
Ok(Finalize::Buffer)
} else {
// Keep the original uncompressed buffer and unset the compression flag from the header
let h = self
Expand All @@ -233,9 +239,8 @@ impl WBatch {
.get_mut(BatchHeader::INDEX)
.ok_or_else(|| zerror!("Header not present"))?;
*h &= !BatchHeader::COMPRESSION;
Ok(Finalize::Batch)
}

Ok(())
}
}

Expand Down Expand Up @@ -406,8 +411,24 @@ mod tests {
let mut wbatch = WBatch::new(config);
wbatch.encode(&msg_in).unwrap();
println!("Encoded WBatch: {:?}", wbatch);
wbatch.finalize().unwrap();
println!("Finalized WBatch: {:?}", wbatch);

#[cfg(feature = "transport_compression")]
let mut buffer = config.is_compression.then_some(BBuf::with_capacity(
lz4_flex::block::get_maximum_output_size(wbatch.as_slice().len()),
));

let res = wbatch
.finalize(
#[cfg(feature = "transport_compression")]
buffer.as_mut(),
)
.unwrap();
let bytes = match res {
Finalize::Batch => wbatch.as_slice(),
#[cfg(feature = "transport_compression")]
Finalize::Buffer => buffer.as_mut().unwrap().as_slice(),
};
println!("Finalized WBatch: {:?}", bytes);

let mut rbatch = RBatch::new(
config,
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-transport/src/multicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ impl TransportLinkMulticast {

pub async fn send_batch(&self, batch: &mut WBatch) -> ZResult<()> {
const ERR: &str = "Write error on link: ";
batch.finalize().map_err(|_| zerror!("{ERR}{self}"))?;
// @TODO: add support buffer
batch.finalize(None).map_err(|_| zerror!("{ERR}{self}"))?;
// Send the message on the link
self.link.write_all(batch.as_slice()).await?;

Expand Down
36 changes: 30 additions & 6 deletions io/zenoh-transport/src/unicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::common::batch::{BatchConfig, Decode, Encode, RBatch, WBatch};
use crate::common::batch::{BatchConfig, Decode, Encode, Finalize, RBatch, WBatch};
use std::fmt;
use std::sync::Arc;
use zenoh_buffers::{ZSlice, ZSliceBuffer};
use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer};
use zenoh_link::{Link, LinkUnicast};
use zenoh_protocol::transport::{BatchSize, TransportMessage};
use zenoh_result::{zerror, ZResult};
Expand All @@ -40,11 +40,20 @@ pub(crate) struct TransportLinkUnicastConfig {
pub(crate) struct TransportLinkUnicast {
pub(crate) link: LinkUnicast,
pub(crate) config: TransportLinkUnicastConfig,
#[cfg(feature = "transport_compression")]
pub(crate) buffer: Option<BBuf>,
}

impl TransportLinkUnicast {
pub fn new(link: LinkUnicast, config: TransportLinkUnicastConfig) -> Self {
Self { link, config }
Self {
link,
config,
#[cfg(feature = "transport_compression")]
buffer: config.is_compression.then_some(BBuf::with_capacity(
lz4_flex::block::get_maximum_output_size(config.mtu as usize),
)),
}
}

const fn batch_config(&self) -> BatchConfig {
Expand All @@ -58,14 +67,29 @@ impl TransportLinkUnicast {
pub async fn send_batch(&mut self, batch: &mut WBatch) -> ZResult<()> {
const ERR: &str = "Write error on link: ";

batch.finalize().map_err(|_| zerror!("{ERR}{self}"))?;
let res = batch
.finalize(
#[cfg(feature = "transport_compression")]
self.buffer.as_mut(),
)
.map_err(|_| zerror!("{ERR}{self}"))?;

let bytes = match res {
Finalize::Batch => batch.as_slice(),
#[cfg(feature = "transport_compression")]
Finalize::Buffer => self
.buffer
.as_ref()
.ok_or_else(|| zerror!("Invalid buffer finalization"))?
.as_slice(),
};

// Send the message on the link
if self.link.is_streamed() {
let len = batch.len().to_le_bytes();
let len = bytes.len().to_le_bytes();
self.link.write_all(&len).await?;
}
self.link.write_all(batch.as_slice()).await?;
self.link.write_all(bytes).await?;

Ok(())
}
Expand Down

0 comments on commit 05117fe

Please sign in to comment.