Skip to content

Commit

Permalink
Batch modularization for compression [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Oct 2, 2023
1 parent 9ef73bd commit 1d2caa8
Show file tree
Hide file tree
Showing 19 changed files with 555 additions and 816 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
//! This crate is intended for Zenoh's internal use.
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
pub(crate) mod batch;
#![no_std]
extern crate alloc;

mod multicast;
mod unicast;

use alloc::{borrow::ToOwned, boxed::Box, string::String};
use async_trait::async_trait;
use core::{cmp::PartialEq, fmt, hash::Hash};
pub use multicast::*;
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-link-commons/src/multicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use alloc::{borrow::Cow, boxed::Box, sync::Arc, vec::Vec};
use async_trait::async_trait;
use core::{
fmt,
hash::{Hash, Hasher},
ops::Deref,
};
use std::{borrow::Cow, sync::Arc};
use zenoh_buffers::{reader::HasReader, writer::HasWriter};
use zenoh_codec::{RCodec, WCodec, Zenoh080};
use zenoh_protocol::{
Expand Down
55 changes: 4 additions & 51 deletions io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,15 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use async_trait::async_trait;
use std::{
use core::{
fmt,
hash::{Hash, Hasher},
ops::Deref,
sync::Arc,
};
use zenoh_buffers::{reader::HasReader, ZSlice};
use zenoh_codec::{RCodec, Zenoh080};
use zenoh_protocol::{
core::{EndPoint, Locator},
transport::TransportMessage,
};
use zenoh_result::{zerror, ZResult};

use crate::batch::{Encode, RBatch, WBatch};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_result::ZResult;

pub type LinkManagerUnicast = Arc<dyn LinkManagerUnicastTrait>;
#[async_trait]
Expand Down Expand Up @@ -59,46 +52,6 @@ pub trait LinkUnicastTrait: Send + Sync {
async fn close(&self) -> ZResult<()>;
}

impl LinkUnicast {
pub async fn send_batch(&self, batch: WBatch) -> ZResult<usize> {
const ERR: &str = "Write error on link: ";
let buff = batch.finalize().map_err(|_| zerror!("{ERR}{self}"))?;
// Send the message on the link
self.0.write_all(buff.as_slice()).await?;
Ok(buff.len())
}

pub async fn send(&self, msg: &TransportMessage) -> ZResult<usize> {
const ERR: &str = "Write error on link: ";
// Create the batch for serializing the message
let mut batch = WBatch::new(self.get_mtu()).set_streamed(self.is_streamed());
batch.encode(msg).map_err(|_| zerror!("{ERR}{self}"))?;
self.send_batch(batch).await
}

pub async fn recv_batch(&self, mut batch: RBatch) -> ZResult<ZSlice> {
use crate::batch::ReadFrom;
const ERR: &str = "Read error from link: ";
batch.read_from(self).await?;
let zslice = batch.finalize().map_err(|_| zerror!("{ERR}{self}"))?;
Ok(zslice)
}

pub async fn recv(&self) -> ZResult<TransportMessage> {
let batch = RBatch::new(self.get_mtu()).set_streamed(self.is_streamed());
let mut zslice = self.recv_batch(batch).await?;

let mut reader = zslice.reader();
let codec = Zenoh080::new();

let msg: TransportMessage = codec
.read(&mut reader)
.map_err(|_| zerror!("Read error on link: {}", self))?;

Ok(msg)
}
}

impl Deref for LinkUnicast {
type Target = Arc<dyn LinkUnicastTrait>;

Expand Down
3 changes: 1 addition & 2 deletions io/zenoh-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ transport_udp = ["zenoh-link/transport_udp"]
transport_unixsock-stream = ["zenoh-link/transport_unixsock-stream"]
transport_ws = ["zenoh-link/transport_ws"]
transport_serial = ["zenoh-link/transport_serial"]
transport_compression = ["zenoh-link-commons/compression"]
transport_compression = []
transport_unixpipe = ["zenoh-link/transport_unixpipe"]
stats = ["zenoh-protocol/stats"]
test = []
Expand All @@ -68,7 +68,6 @@ zenoh-config = { workspace = true }
zenoh-core = { workspace = true }
zenoh-crypto = { workspace = true }
zenoh-link = { workspace = true }
zenoh-link-commons = { workspace = true }
zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-shm = { workspace = true, optional = true }
Expand Down
Loading

0 comments on commit 1d2caa8

Please sign in to comment.