Skip to content

Commit

Permalink
WIP WBatch and RBatch [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Sep 29, 2023
1 parent c2bc9bd commit 9ef73bd
Show file tree
Hide file tree
Showing 28 changed files with 921 additions and 400 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion commons/zenoh-buffers/src/bbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use crate::{
reader::HasReader,
vec,
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
ZSlice,
};
use alloc::boxed::Box;
use alloc::{boxed::Box, sync::Arc};
use core::num::NonZeroUsize;

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -152,6 +153,19 @@ impl<'a> HasReader for &'a BBuf {
}
}

// From impls
impl From<BBuf> for ZSlice {
fn from(value: BBuf) -> Self {
ZSlice {
buf: Arc::new(value.buffer),
start: 0,
end: value.len,
#[cfg(feature = "shared-memory")]
kind: ZSliceKind::Raw,
}
}
}

#[cfg(feature = "test")]
impl BBuf {
pub fn rand(len: usize) -> Self {
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-buffers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub mod writer {
where
F: FnOnce(&mut [u8]) -> usize;
}

pub trait BacktrackableWriter: Writer {
type Mark;

Expand Down
127 changes: 90 additions & 37 deletions commons/zenoh-codec/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,61 +37,80 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &InitSyn) -> Self::Output {
let InitSyn {
version,
whatami,
zid,
resolution,
batch_size,
ext_qos,
ext_shm,
ext_auth,
ext_mlink,
ext_lowlatency,
ext_compression,
} = x;

// Header
let mut header = id::INIT;
if x.resolution != Resolution::default() || x.batch_size != batch_size::UNICAST {
if *resolution != Resolution::default() || *batch_size != batch_size::UNICAST {
header |= flag::S;
}
let mut n_exts = (x.ext_qos.is_some() as u8)
+ (x.ext_shm.is_some() as u8)
+ (x.ext_auth.is_some() as u8)
+ (x.ext_mlink.is_some() as u8)
+ (x.ext_lowlatency.is_some() as u8);
let mut n_exts = (ext_qos.is_some() as u8)
+ (ext_shm.is_some() as u8)
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
+ (ext_compression.is_some() as u8);
if n_exts != 0 {
header |= flag::Z;
}
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, x.version)?;
self.write(&mut *writer, version)?;

let whatami: u8 = match x.whatami {
let whatami: u8 = match whatami {
WhatAmI::Router => 0b00,
WhatAmI::Peer => 0b01,
WhatAmI::Client => 0b10,
};
let flags: u8 = ((x.zid.size() as u8 - 1) << 4) | whatami;
let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami;
self.write(&mut *writer, flags)?;

let lodec = Zenoh080Length::new(x.zid.size());
lodec.write(&mut *writer, &x.zid)?;
let lodec = Zenoh080Length::new(zid.size());
lodec.write(&mut *writer, zid)?;

if imsg::has_flag(header, flag::S) {
self.write(&mut *writer, x.resolution.as_u8())?;
self.write(&mut *writer, x.batch_size.to_le_bytes())?;
self.write(&mut *writer, resolution.as_u8())?;
self.write(&mut *writer, batch_size.to_le_bytes())?;
}

// Extensions
if let Some(qos) = x.ext_qos.as_ref() {
if let Some(qos) = ext_qos.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (qos, n_exts != 0))?;
}
if let Some(shm) = x.ext_shm.as_ref() {
if let Some(shm) = ext_shm.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (shm, n_exts != 0))?;
}
if let Some(auth) = x.ext_auth.as_ref() {
if let Some(auth) = ext_auth.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (auth, n_exts != 0))?;
}
if let Some(mlink) = x.ext_mlink.as_ref() {
if let Some(mlink) = ext_mlink.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (mlink, n_exts != 0))?;
}
if let Some(lowlatency) = x.ext_lowlatency.as_ref() {
if let Some(lowlatency) = ext_lowlatency.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (lowlatency, n_exts != 0))?;
}
if let Some(compression) = ext_compression.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (compression, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -150,6 +169,7 @@ where
let mut ext_auth = None;
let mut ext_mlink = None;
let mut ext_lowlatency = None;
let mut ext_compression = None;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand Down Expand Up @@ -181,6 +201,11 @@ where
ext_lowlatency = Some(q);
has_ext = ext;
}
ext::Compression::ID => {
let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
ext_compression = Some(q);
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitSyn", ext)?;
}
Expand All @@ -198,6 +223,7 @@ where
ext_auth,
ext_mlink,
ext_lowlatency,
ext_compression,
})
}
}
Expand All @@ -210,64 +236,84 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &InitAck) -> Self::Output {
let InitAck {
version,
whatami,
zid,
resolution,
batch_size,
cookie,
ext_qos,
ext_shm,
ext_auth,
ext_mlink,
ext_lowlatency,
ext_compression,
} = x;

// Header
let mut header = id::INIT | flag::A;
if x.resolution != Resolution::default() || x.batch_size != batch_size::UNICAST {
if *resolution != Resolution::default() || *batch_size != batch_size::UNICAST {
header |= flag::S;
}
let mut n_exts = (x.ext_qos.is_some() as u8)
+ (x.ext_shm.is_some() as u8)
+ (x.ext_auth.is_some() as u8)
+ (x.ext_mlink.is_some() as u8)
+ (x.ext_lowlatency.is_some() as u8);
let mut n_exts = (ext_qos.is_some() as u8)
+ (ext_shm.is_some() as u8)
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
+ (ext_compression.is_some() as u8);
if n_exts != 0 {
header |= flag::Z;
}
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, x.version)?;
self.write(&mut *writer, version)?;

let whatami: u8 = match x.whatami {
let whatami: u8 = match whatami {
WhatAmI::Router => 0b00,
WhatAmI::Peer => 0b01,
WhatAmI::Client => 0b10,
};
let flags: u8 = ((x.zid.size() as u8 - 1) << 4) | whatami;
let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami;
self.write(&mut *writer, flags)?;

let lodec = Zenoh080Length::new(x.zid.size());
lodec.write(&mut *writer, &x.zid)?;
let lodec = Zenoh080Length::new(zid.size());
lodec.write(&mut *writer, zid)?;

if imsg::has_flag(header, flag::S) {
self.write(&mut *writer, x.resolution.as_u8())?;
self.write(&mut *writer, x.batch_size.to_le_bytes())?;
self.write(&mut *writer, resolution.as_u8())?;
self.write(&mut *writer, batch_size.to_le_bytes())?;
}

let zodec = Zenoh080Bounded::<BatchSize>::new();
zodec.write(&mut *writer, &x.cookie)?;
zodec.write(&mut *writer, cookie)?;

// Extensions
if let Some(qos) = x.ext_qos.as_ref() {
if let Some(qos) = ext_qos.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (qos, n_exts != 0))?;
}
if let Some(shm) = x.ext_shm.as_ref() {
if let Some(shm) = ext_shm.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (shm, n_exts != 0))?;
}
if let Some(auth) = x.ext_auth.as_ref() {
if let Some(auth) = ext_auth.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (auth, n_exts != 0))?;
}
if let Some(mlink) = x.ext_mlink.as_ref() {
if let Some(mlink) = ext_mlink.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (mlink, n_exts != 0))?;
}
if let Some(lowlatency) = x.ext_lowlatency.as_ref() {
if let Some(lowlatency) = ext_lowlatency.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (lowlatency, n_exts != 0))?;
}
if let Some(compression) = ext_compression.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (compression, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -329,6 +375,7 @@ where
let mut ext_auth = None;
let mut ext_mlink = None;
let mut ext_lowlatency = None;
let mut ext_compression = None;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand Down Expand Up @@ -360,6 +407,11 @@ where
ext_lowlatency = Some(q);
has_ext = ext;
}
ext::Compression::ID => {
let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
ext_compression = Some(q);
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitAck", ext)?;
}
Expand All @@ -378,6 +430,7 @@ where
ext_auth,
ext_mlink,
ext_lowlatency,
ext_compression,
})
}
}
Loading

0 comments on commit 9ef73bd

Please sign in to comment.