Skip to content

Commit

Permalink
Merge branch 'sample_api_rework' into explicit_api
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Apr 1, 2024
2 parents 21670cd + 23931f9 commit 71e7ebf
Show file tree
Hide file tree
Showing 64 changed files with 1,140 additions and 858 deletions.
50 changes: 25 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 36 additions & 32 deletions commons/zenoh-codec/src/core/zint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,42 @@ use zenoh_buffers::{
writer::{DidntWrite, Writer},
};

const VLE_LEN: usize = 9;
const VLE_LEN_MAX: usize = vle_len(u64::MAX);

const fn vle_len(x: u64) -> usize {
const B1: u64 = u64::MAX << 7;
const B2: u64 = u64::MAX << (7 * 2);
const B3: u64 = u64::MAX << (7 * 3);
const B4: u64 = u64::MAX << (7 * 4);
const B5: u64 = u64::MAX << (7 * 5);
const B6: u64 = u64::MAX << (7 * 6);
const B7: u64 = u64::MAX << (7 * 7);
const B8: u64 = u64::MAX << (7 * 8);

if (x & B1) == 0 {
1
} else if (x & B2) == 0 {
2
} else if (x & B3) == 0 {
3
} else if (x & B4) == 0 {
4
} else if (x & B5) == 0 {
5
} else if (x & B6) == 0 {
6
} else if (x & B7) == 0 {
7
} else if (x & B8) == 0 {
8
} else {
9
}
}

impl LCodec<u64> for Zenoh080 {
fn w_len(self, x: u64) -> usize {
const B1: u64 = u64::MAX << 7;
const B2: u64 = u64::MAX << (7 * 2);
const B3: u64 = u64::MAX << (7 * 3);
const B4: u64 = u64::MAX << (7 * 4);
const B5: u64 = u64::MAX << (7 * 5);
const B6: u64 = u64::MAX << (7 * 6);
const B7: u64 = u64::MAX << (7 * 7);
const B8: u64 = u64::MAX << (7 * 8);

if (x & B1) == 0 {
1
} else if (x & B2) == 0 {
2
} else if (x & B3) == 0 {
3
} else if (x & B4) == 0 {
4
} else if (x & B5) == 0 {
5
} else if (x & B6) == 0 {
6
} else if (x & B7) == 0 {
7
} else if (x & B8) == 0 {
8
} else {
9
}
vle_len(x)
}
}

Expand Down Expand Up @@ -107,7 +111,7 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, mut x: u64) -> Self::Output {
writer.with_slot(VLE_LEN, move |buffer| {
writer.with_slot(VLE_LEN_MAX, move |buffer| {
let mut len = 0;
while (x & !0x7f_u64) != 0 {
// SAFETY: buffer is guaranteed to be VLE_LEN long where VLE_LEN is
Expand All @@ -122,7 +126,7 @@ where
}
// In case len == VLE_LEN then all the bits have already been written in the latest iteration.
// Else we haven't written all the necessary bytes yet.
if len != VLE_LEN {
if len != VLE_LEN_MAX {
// SAFETY: buffer is guaranteed to be VLE_LEN long where VLE_LEN is
// the maximum number of bytes a VLE can take once encoded.
// I.e.: x is shifted 7 bits to the right every iteration,
Expand Down Expand Up @@ -151,7 +155,7 @@ where
let mut v = 0;
let mut i = 0;
// 7 * VLE_LEN is beyond the maximum number of shift bits
while (b & 0x80_u8) != 0 && i != 7 * (VLE_LEN - 1) {
while (b & 0x80_u8) != 0 && i != 7 * (VLE_LEN_MAX - 1) {
v |= ((b & 0x7f_u8) as u64) << i;
b = reader.read_u8()?;
i += 7;
Expand Down
17 changes: 16 additions & 1 deletion commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ where

fn write(self, writer: &mut W, x: &Declare) -> Self::Output {
let Declare {
interest_id,
ext_qos,
ext_tstamp,
ext_nodeid,
Expand All @@ -103,6 +104,9 @@ where

// Header
let mut header = id::DECLARE;
if x.interest_id.is_some() {
header |= declare::flag::I;
}
let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8)
+ (ext_tstamp.is_some() as u8)
+ ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8);
Expand All @@ -111,6 +115,11 @@ where
}
self.write(&mut *writer, header)?;

// Body
if let Some(interest_id) = interest_id {
self.write(&mut *writer, interest_id)?;
}

// Extensions
if ext_qos != &declare::ext::QoSType::DEFAULT {
n_exts -= 1;
Expand Down Expand Up @@ -157,6 +166,11 @@ where
return Err(DidntRead);
}

let mut interest_id = None;
if imsg::has_flag(self.header, declare::flag::I) {
interest_id = Some(self.codec.read(&mut *reader)?);
}

// Extensions
let mut ext_qos = declare::ext::QoSType::DEFAULT;
let mut ext_tstamp = None;
Expand Down Expand Up @@ -192,10 +206,11 @@ where
let body: DeclareBody = self.codec.read(&mut *reader)?;

Ok(Declare {
body,
interest_id,
ext_qos,
ext_tstamp,
ext_nodeid,
body,
})
}
}
Expand Down
17 changes: 11 additions & 6 deletions commons/zenoh-protocol/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,30 @@ pub use subscriber::*;
pub use token::*;

pub mod flag {
// pub const X: u8 = 1 << 5; // 0x20 Reserved
// pub const X: u8 = 1 << 6; // 0x40 Reserved
pub const I: u8 = 1 << 5; // 0x20 Interest if I==1 then the declare is in a response to an Interest with future==false
// pub const X: u8 = 1 << 6; // 0x40 Reserved
pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow
}

/// Flags:
/// - X: Reserved
/// - I: Interest If I==1 then the declare is in a response to an Interest with future==false
/// - X: Reserved
/// - Z: Extension If Z==1 then at least one extension is present
///
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |Z|X|X| DECLARE |
/// |Z|X|I| DECLARE |
/// +-+-+-+---------+
/// ~interest_id:z32~ if I==1
/// +---------------+
/// ~ [decl_exts] ~ if Z==1
/// +---------------+
/// ~ declaration ~
/// +---------------+
///
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Declare {
pub interest_id: Option<InterestId>,
pub ext_qos: ext::QoSType,
pub ext_tstamp: Option<ext::TimestampType>,
pub ext_nodeid: ext::NodeIdType,
Expand Down Expand Up @@ -132,16 +135,18 @@ impl Declare {

let mut rng = rand::thread_rng();

let body = DeclareBody::rand();
let interest_id = rng.gen_bool(0.5).then_some(rng.gen::<InterestId>());
let ext_qos = ext::QoSType::rand();
let ext_tstamp = rng.gen_bool(0.5).then(ext::TimestampType::rand);
let ext_nodeid = ext::NodeIdType::rand();
let body = DeclareBody::rand();

Self {
body,
interest_id,
ext_qos,
ext_tstamp,
ext_nodeid,
body,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-protocol/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl InitSyn {
let whatami = WhatAmI::rand();
let zid = ZenohId::default();
let resolution = Resolution::rand();
let batch_size: u16 = rng.gen();
let batch_size: BatchSize = rng.gen();
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down Expand Up @@ -221,7 +221,7 @@ impl InitAck {
} else {
Resolution::rand()
};
let batch_size: u16 = rng.gen();
let batch_size: BatchSize = rng.gen();
let cookie = ZSlice::rand(64);
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-protocol/src/transport/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl Join {
let whatami = WhatAmI::rand();
let zid = ZenohId::default();
let resolution = Resolution::rand();
let batch_size: u16 = rng.gen();
let batch_size: BatchSize = rng.gen();
let lease = if rng.gen_bool(0.5) {
Duration::from_secs(rng.gen())
} else {
Expand Down
Loading

0 comments on commit 71e7ebf

Please sign in to comment.