Skip to content

Commit

Permalink
Merge branch 'protocol_changes' into sample_api_rework
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Mar 31, 2024
2 parents e4501f4 + 312c03a commit 1eede12
Show file tree
Hide file tree
Showing 46 changed files with 227 additions and 112 deletions.
68 changes: 36 additions & 32 deletions commons/zenoh-codec/src/core/zint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,42 @@ use zenoh_buffers::{
writer::{DidntWrite, Writer},
};

const VLE_LEN: usize = 9;
const VLE_LEN_MAX: usize = vle_len(u64::MAX);

const fn vle_len(x: u64) -> usize {
const B1: u64 = u64::MAX << 7;
const B2: u64 = u64::MAX << (7 * 2);
const B3: u64 = u64::MAX << (7 * 3);
const B4: u64 = u64::MAX << (7 * 4);
const B5: u64 = u64::MAX << (7 * 5);
const B6: u64 = u64::MAX << (7 * 6);
const B7: u64 = u64::MAX << (7 * 7);
const B8: u64 = u64::MAX << (7 * 8);

if (x & B1) == 0 {
1
} else if (x & B2) == 0 {
2
} else if (x & B3) == 0 {
3
} else if (x & B4) == 0 {
4
} else if (x & B5) == 0 {
5
} else if (x & B6) == 0 {
6
} else if (x & B7) == 0 {
7
} else if (x & B8) == 0 {
8
} else {
9
}
}

impl LCodec<u64> for Zenoh080 {
fn w_len(self, x: u64) -> usize {
const B1: u64 = u64::MAX << 7;
const B2: u64 = u64::MAX << (7 * 2);
const B3: u64 = u64::MAX << (7 * 3);
const B4: u64 = u64::MAX << (7 * 4);
const B5: u64 = u64::MAX << (7 * 5);
const B6: u64 = u64::MAX << (7 * 6);
const B7: u64 = u64::MAX << (7 * 7);
const B8: u64 = u64::MAX << (7 * 8);

if (x & B1) == 0 {
1
} else if (x & B2) == 0 {
2
} else if (x & B3) == 0 {
3
} else if (x & B4) == 0 {
4
} else if (x & B5) == 0 {
5
} else if (x & B6) == 0 {
6
} else if (x & B7) == 0 {
7
} else if (x & B8) == 0 {
8
} else {
9
}
vle_len(x)
}
}

Expand Down Expand Up @@ -107,7 +111,7 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, mut x: u64) -> Self::Output {
writer.with_slot(VLE_LEN, move |buffer| {
writer.with_slot(VLE_LEN_MAX, move |buffer| {
let mut len = 0;
while (x & !0x7f_u64) != 0 {
// SAFETY: buffer is guaranteed to be VLE_LEN long where VLE_LEN is
Expand All @@ -122,7 +126,7 @@ where
}
// In case len == VLE_LEN then all the bits have already been written in the latest iteration.
// Else we haven't written all the necessary bytes yet.
if len != VLE_LEN {
if len != VLE_LEN_MAX {
// SAFETY: buffer is guaranteed to be VLE_LEN long where VLE_LEN is
// the maximum number of bytes a VLE can take once encoded.
// I.e.: x is shifted 7 bits to the right every iteration,
Expand Down Expand Up @@ -151,7 +155,7 @@ where
let mut v = 0;
let mut i = 0;
// 7 * VLE_LEN is beyond the maximum number of shift bits
while (b & 0x80_u8) != 0 && i != 7 * (VLE_LEN - 1) {
while (b & 0x80_u8) != 0 && i != 7 * (VLE_LEN_MAX - 1) {
v |= ((b & 0x7f_u8) as u64) << i;
b = reader.read_u8()?;
i += 7;
Expand Down
17 changes: 16 additions & 1 deletion commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ where

fn write(self, writer: &mut W, x: &Declare) -> Self::Output {
let Declare {
interest_id,
ext_qos,
ext_tstamp,
ext_nodeid,
Expand All @@ -103,6 +104,9 @@ where

// Header
let mut header = id::DECLARE;
if x.interest_id.is_some() {
header |= declare::flag::I;
}
let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8)
+ (ext_tstamp.is_some() as u8)
+ ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8);
Expand All @@ -111,6 +115,11 @@ where
}
self.write(&mut *writer, header)?;

// Body
if let Some(interest_id) = interest_id {
self.write(&mut *writer, interest_id)?;
}

// Extensions
if ext_qos != &declare::ext::QoSType::DEFAULT {
n_exts -= 1;
Expand Down Expand Up @@ -157,6 +166,11 @@ where
return Err(DidntRead);
}

let mut interest_id = None;
if imsg::has_flag(self.header, declare::flag::I) {
interest_id = Some(self.codec.read(&mut *reader)?);
}

// Extensions
let mut ext_qos = declare::ext::QoSType::DEFAULT;
let mut ext_tstamp = None;
Expand Down Expand Up @@ -192,10 +206,11 @@ where
let body: DeclareBody = self.codec.read(&mut *reader)?;

Ok(Declare {
body,
interest_id,
ext_qos,
ext_tstamp,
ext_nodeid,
body,
})
}
}
Expand Down
17 changes: 11 additions & 6 deletions commons/zenoh-protocol/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,30 @@ pub use subscriber::*;
pub use token::*;

pub mod flag {
// pub const X: u8 = 1 << 5; // 0x20 Reserved
// pub const X: u8 = 1 << 6; // 0x40 Reserved
pub const I: u8 = 1 << 5; // 0x20 Interest if I==1 then the declare is in a response to an Interest with future==false
// pub const X: u8 = 1 << 6; // 0x40 Reserved
pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow
}

/// Flags:
/// - X: Reserved
/// - I: Interest If I==1 then the declare is in a response to an Interest with future==false
/// - X: Reserved
/// - Z: Extension If Z==1 then at least one extension is present
///
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |Z|X|X| DECLARE |
/// |Z|X|I| DECLARE |
/// +-+-+-+---------+
/// ~interest_id:z32~ if I==1
/// +---------------+
/// ~ [decl_exts] ~ if Z==1
/// +---------------+
/// ~ declaration ~
/// +---------------+
///
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Declare {
pub interest_id: Option<InterestId>,
pub ext_qos: ext::QoSType,
pub ext_tstamp: Option<ext::TimestampType>,
pub ext_nodeid: ext::NodeIdType,
Expand Down Expand Up @@ -132,16 +135,18 @@ impl Declare {

let mut rng = rand::thread_rng();

let body = DeclareBody::rand();
let interest_id = rng.gen_bool(0.5).then_some(rng.gen::<InterestId>());
let ext_qos = ext::QoSType::rand();
let ext_tstamp = rng.gen_bool(0.5).then(ext::TimestampType::rand);
let ext_nodeid = ext::NodeIdType::rand();
let body = DeclareBody::rand();

Self {
body,
interest_id,
ext_qos,
ext_tstamp,
ext_nodeid,
body,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-protocol/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl InitSyn {
let whatami = WhatAmI::rand();
let zid = ZenohId::default();
let resolution = Resolution::rand();
let batch_size: u16 = rng.gen();
let batch_size: BatchSize = rng.gen();
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down Expand Up @@ -221,7 +221,7 @@ impl InitAck {
} else {
Resolution::rand()
};
let batch_size: u16 = rng.gen();
let batch_size: BatchSize = rng.gen();
let cookie = ZSlice::rand(64);
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-protocol/src/transport/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl Join {
let whatami = WhatAmI::rand();
let zid = ZenohId::default();
let resolution = Resolution::rand();
let batch_size: u16 = rng.gen();
let batch_size: BatchSize = rng.gen();
let lease = if rng.gen_bool(0.5) {
Duration::from_secs(rng.gen())
} else {
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-protocol/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::network::NetworkMessage;
/// the boundary of the serialized messages. The length is encoded as little-endian.
/// In any case, the length of a message must not exceed 65_535 bytes.
pub type BatchSize = u16;
pub type AtomicBatchSize = core::sync::atomic::AtomicU16;

pub mod batch_size {
use super::BatchSize;
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub use multicast::*;
use serde::Serialize;
pub use unicast::*;
use zenoh_protocol::core::Locator;
use zenoh_protocol::transport::BatchSize;
use zenoh_result::ZResult;

/*************************************/
Expand All @@ -45,7 +46,7 @@ pub struct Link {
pub src: Locator,
pub dst: Locator,
pub group: Option<Locator>,
pub mtu: u16,
pub mtu: BatchSize,
pub is_reliable: bool,
pub is_streamed: bool,
pub interfaces: Vec<String>,
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-link-commons/src/multicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use zenoh_buffers::{reader::HasReader, writer::HasWriter};
use zenoh_codec::{RCodec, WCodec, Zenoh080};
use zenoh_protocol::{
core::{EndPoint, Locator},
transport::TransportMessage,
transport::{BatchSize, TransportMessage},
};
use zenoh_result::{zerror, ZResult};

Expand All @@ -44,7 +44,7 @@ pub struct LinkMulticast(pub Arc<dyn LinkMulticastTrait>);

#[async_trait]
pub trait LinkMulticastTrait: Send + Sync {
fn get_mtu(&self) -> u16;
fn get_mtu(&self) -> BatchSize;
fn get_src(&self) -> &Locator;
fn get_dst(&self) -> &Locator;
fn is_reliable(&self) -> bool;
Expand Down
7 changes: 5 additions & 2 deletions io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use core::{
ops::Deref,
};
use std::net::SocketAddr;
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_protocol::{
core::{EndPoint, Locator},
transport::BatchSize,
};
use zenoh_result::ZResult;

pub type LinkManagerUnicast = Arc<dyn LinkManagerUnicastTrait>;
Expand All @@ -41,7 +44,7 @@ pub struct LinkUnicast(pub Arc<dyn LinkUnicastTrait>);

#[async_trait]
pub trait LinkUnicastTrait: Send + Sync {
fn get_mtu(&self) -> u16;
fn get_mtu(&self) -> BatchSize;
fn get_src(&self) -> &Locator;
fn get_dst(&self) -> &Locator;
fn is_reliable(&self) -> bool;
Expand Down
13 changes: 8 additions & 5 deletions io/zenoh-links/zenoh-link-quic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ use std::net::SocketAddr;
use zenoh_config::Config;
use zenoh_core::zconfigurable;
use zenoh_link_commons::{ConfigurationInspector, LocatorInspector};
use zenoh_protocol::core::{
endpoint::{Address, Parameters},
Locator,
use zenoh_protocol::{
core::{
endpoint::{Address, Parameters},
Locator,
},
transport::BatchSize,
};
use zenoh_result::{bail, zerror, ZResult};

Expand All @@ -47,7 +50,7 @@ pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];
// adopted in Zenoh and the usage of 16 bits in Zenoh to encode the
// payload length in byte-streamed, the QUIC MTU is constrained to
// 2^16 - 1 bytes (i.e., 65535).
const QUIC_MAX_MTU: u16 = u16::MAX;
const QUIC_MAX_MTU: BatchSize = BatchSize::MAX;
pub const QUIC_LOCATOR_PREFIX: &str = "quic";

#[derive(Default, Clone, Copy, Debug)]
Expand Down Expand Up @@ -137,7 +140,7 @@ impl ConfigurationInspector<Config> for QuicConfigurator {

zconfigurable! {
// Default MTU (QUIC PDU) in bytes.
static ref QUIC_DEFAULT_MTU: u16 = QUIC_MAX_MTU;
static ref QUIC_DEFAULT_MTU: BatchSize = QUIC_MAX_MTU;
// The LINGER option causes the shutdown() call to block until (1) all application data is delivered
// to the remote end or (2) a timeout expires. The timeout is expressed in seconds.
// More info on the LINGER option and its dynamics can be found at:
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-links/zenoh-link-quic/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use zenoh_link_commons::{
ListenersUnicastIP, NewLinkChannelSender,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_protocol::transport::BatchSize;
use zenoh_result::{bail, zerror, ZError, ZResult};

pub struct LinkUnicastQuic {
Expand Down Expand Up @@ -135,7 +136,7 @@ impl LinkUnicastTrait for LinkUnicastQuic {
}

#[inline(always)]
fn get_mtu(&self) -> u16 {
fn get_mtu(&self) -> BatchSize {
*QUIC_DEFAULT_MTU
}

Expand Down
7 changes: 4 additions & 3 deletions io/zenoh-links/zenoh-link-serial/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,23 @@ pub use unicast::*;
use zenoh_core::zconfigurable;
use zenoh_link_commons::LocatorInspector;
use zenoh_protocol::core::{endpoint::Address, EndPoint, Locator};
use zenoh_protocol::transport::BatchSize;
use zenoh_result::ZResult;

// Maximum MTU (Serial PDU) in bytes.
const SERIAL_MAX_MTU: u16 = z_serial::MAX_MTU as u16;
const SERIAL_MAX_MTU: BatchSize = z_serial::MAX_MTU as BatchSize;

const DEFAULT_BAUDRATE: u32 = 9_600;

const DEFAULT_EXCLUSIVE: bool = true;

pub const SERIAL_LOCATOR_PREFIX: &str = "serial";

const SERIAL_MTU_LIMIT: u16 = SERIAL_MAX_MTU;
const SERIAL_MTU_LIMIT: BatchSize = SERIAL_MAX_MTU;

zconfigurable! {
// Default MTU (UDP PDU) in bytes.
static ref SERIAL_DEFAULT_MTU: u16 = SERIAL_MTU_LIMIT;
static ref SERIAL_DEFAULT_MTU: BatchSize = SERIAL_MTU_LIMIT;
// Amount of time in microseconds to throttle the accept loop upon an error.
// Default set to 100 ms.
static ref SERIAL_ACCEPT_THROTTLE_TIME: u64 = 100_000;
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-links/zenoh-link-serial/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use zenoh_link_commons::{
NewLinkChannelSender,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_protocol::transport::BatchSize;
use zenoh_result::{zerror, ZResult};

use z_serial::ZSerial;
Expand Down Expand Up @@ -177,7 +178,7 @@ impl LinkUnicastTrait for LinkUnicastSerial {
}

#[inline(always)]
fn get_mtu(&self) -> u16 {
fn get_mtu(&self) -> BatchSize {
*SERIAL_DEFAULT_MTU
}

Expand Down
Loading

0 comments on commit 1eede12

Please sign in to comment.