Skip to content

Commit

Permalink
Encoding Id and Schema
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Mar 5, 2024
1 parent 0ee41f1 commit dec5c49
Show file tree
Hide file tree
Showing 23 changed files with 217 additions and 366 deletions.
12 changes: 6 additions & 6 deletions commons/zenoh-codec/benches/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down Expand Up @@ -133,7 +133,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down Expand Up @@ -174,7 +174,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down Expand Up @@ -215,7 +215,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down Expand Up @@ -243,7 +243,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down Expand Up @@ -282,7 +282,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down
41 changes: 17 additions & 24 deletions commons/zenoh-codec/src/core/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,20 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{LCodec, RCodec, WCodec, Zenoh080, Zenoh080Bounded};
use alloc::string::String;
use zenoh_buffers::{
reader::{DidntRead, Reader},
writer::{DidntWrite, Writer},
};
use zenoh_protocol::{
common::imsg,
core::encoding::{flag, Encoding, EncodingPrefix},
core::encoding::{flag, Encoding, EncodingId},
};

impl LCodec<&Encoding> for Zenoh080 {
fn w_len(self, x: &Encoding) -> usize {
let (prefix, suffix) = (x.prefix(), x.suffix());
let mut len = self.w_len((prefix as u32) << 1);
if !suffix.is_empty() {
len += self.w_len(x.suffix());
let mut len = self.w_len((x.id as u32) << 1);
if let Some(schema) = x.schema.as_ref() {
len += schema.len();
}
len
}
Expand All @@ -40,17 +38,16 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &Encoding) -> Self::Output {
let mut prefix = (x.prefix() as u32) << 1;
let suffix = x.suffix();
let mut id = (x.id as u32) << 1;

if !suffix.is_empty() {
prefix |= flag::S;
if x.schema.is_some() {
id |= flag::S;
}
let zodec = Zenoh080Bounded::<u32>::new();
zodec.write(&mut *writer, prefix)?;
if !suffix.is_empty() {
zodec.write(&mut *writer, id)?;
if let Some(schema) = x.schema.as_ref() {
let zodec = Zenoh080Bounded::<u8>::new();
zodec.write(&mut *writer, suffix)?;
zodec.write(&mut *writer, schema)?;
}
Ok(())
}
Expand All @@ -64,23 +61,19 @@ where

fn read(self, reader: &mut R) -> Result<Encoding, Self::Error> {
let zodec = Zenoh080Bounded::<u32>::new();
let prefix: u32 = zodec.read(&mut *reader)?;
let (prefix, has_suffix) = (
(prefix >> 1) as EncodingPrefix,
imsg::has_flag(prefix as u8, flag::S as u8),
let id: u32 = zodec.read(&mut *reader)?;
let (id, has_suffix) = (
(id >> 1) as EncodingId,
imsg::has_flag(id as u8, flag::S as u8),
);

let mut suffix = String::new();
let mut schema = None;
if has_suffix {
let zodec = Zenoh080Bounded::<u8>::new();
suffix = zodec.read(&mut *reader)?;
}

let mut encoding: Encoding = Encoding::new(prefix);
if !suffix.is_empty() {
encoding = encoding.with_suffix(suffix).map_err(|_| DidntRead)?;
schema = Some(zodec.read(&mut *reader)?);
}

let encoding = Encoding { id, schema };
Ok(encoding)
}
}
6 changes: 3 additions & 3 deletions commons/zenoh-codec/src/zenoh/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ where
if timestamp.is_some() {
header |= flag::T;
}
if encoding != &Encoding::UNSPECIFIED {
if encoding != &Encoding::empty() {
header |= flag::E;
}
let mut n_exts = (ext_sinfo.is_some()) as u8
Expand All @@ -73,7 +73,7 @@ where
if let Some(ts) = timestamp.as_ref() {
self.write(&mut *writer, ts)?;
}
if encoding != &Encoding::UNSPECIFIED {
if encoding != &Encoding::empty() {
self.write(&mut *writer, encoding)?;
}

Expand Down Expand Up @@ -143,7 +143,7 @@ where
timestamp = Some(self.codec.read(&mut *reader)?);
}

let mut encoding = Encoding::UNSPECIFIED;
let mut encoding = Encoding::empty();
if imsg::has_flag(self.header, flag::E) {
encoding = self.codec.read(&mut *reader)?;
}
Expand Down
77 changes: 16 additions & 61 deletions commons/zenoh-protocol/src/core/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::core::CowStr;
use alloc::borrow::Cow;
use core::fmt::Debug;
use zenoh_result::{bail, ZResult};
use zenoh_buffers::ZSlice;

pub type EncodingPrefix = u16;
pub type EncodingId = u16;

/// [`Encoding`] is a metadata that indicates how the data payload should be interpreted.
/// For wire-efficiency and extensibility purposes, Zenoh defines an [`Encoding`] as
Expand All @@ -27,68 +25,31 @@ pub type EncodingPrefix = u16;
/// of the API as per user convenience. That mapping has no impact on the Zenoh protocol definition.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Encoding {
prefix: EncodingPrefix,
suffix: CowStr<'static>,
pub id: EncodingId,
pub schema: Option<ZSlice>,
}

/// # Encoding field
///
/// ```text
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// ~ prefix: z16 |S~
/// ~ id: z16 |S~
/// +---------------+
/// ~suffix: <u8;z8>~ -- if S==1
/// ~schema: <u8;z8>~ -- if S==1
/// +---------------+
/// ```
pub mod flag {
pub const S: u32 = 1; // 0x01 Suffix if S==1 then suffix is present
}

impl Encoding {
pub const UNSPECIFIED: Self = Self::empty();

/// Returns a new [`Encoding`] object provided the prefix ID.
pub const fn new(prefix: EncodingPrefix) -> Self {
Self {
prefix,
suffix: CowStr::borrowed(""),
}
}

/// Sets the suffix of the encoding.
/// It will return an error when the suffix is longer than 255 characters.
pub fn with_suffix<IntoCowStr>(mut self, suffix: IntoCowStr) -> ZResult<Self>
where
IntoCowStr: Into<Cow<'static, str>> + AsRef<str>,
{
let s: Cow<'static, str> = suffix.into();
if s.as_bytes().len() > u8::MAX as usize {
bail!("Suffix length is limited to 255 characters")
}
self.suffix = (self.suffix + s.as_ref()).into();
Ok(self)
}

/// Returns a new [`Encoding`] object with default empty prefix ID.
pub const fn empty() -> Self {
Self::new(0)
}

// Returns the numerical prefix
pub const fn prefix(&self) -> EncodingPrefix {
self.prefix
}

// Returns the suffix string
pub fn suffix(&self) -> &str {
self.suffix.as_str()
}

/// Returns `true` if the string representation of this encoding starts with
/// the string representation of the other given encoding.
pub fn starts_with(&self, with: &Encoding) -> bool {
self.prefix() == with.prefix() && self.suffix().starts_with(with.suffix())
Self {
id: 0,
schema: None,
}
}
}

Expand All @@ -101,23 +62,17 @@ impl Default for Encoding {
impl Encoding {
#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::{
distributions::{Alphanumeric, DistString},
Rng,
};
use rand::Rng;

const MIN: usize = 2;
const MAX: usize = 16;

let mut rng = rand::thread_rng();

let prefix: EncodingPrefix = rng.gen();
let suffix: String = if rng.gen_bool(0.5) {
let len = rng.gen_range(MIN..MAX);
Alphanumeric.sample_string(&mut rng, len)
} else {
String::new()
};
Encoding::new(prefix).with_suffix(suffix).unwrap()
let id: EncodingId = rng.gen();
let schema = rng
.gen_bool(0.5)
.then_some(ZSlice::rand(rng.gen_range(MIN..MAX)));
Encoding { id, schema }
}
}
2 changes: 1 addition & 1 deletion commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub use wire_expr::*;
mod cowstr;
pub use cowstr::CowStr;
pub mod encoding;
pub use encoding::{Encoding, EncodingPrefix};
pub use encoding::{Encoding, EncodingId};

pub mod locator;
pub use locator::*;
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-transport/src/common/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ mod tests {
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down
6 changes: 3 additions & 3 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ mod tests {
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down Expand Up @@ -884,7 +884,7 @@ mod tests {
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down Expand Up @@ -996,7 +996,7 @@ mod tests {
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-transport/tests/multicast_compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ mod tests {
payload: Put {
payload: vec![0u8; msg_size].into(),
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-transport/tests/multicast_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ mod tests {
payload: Put {
payload: vec![0u8; msg_size].into(),
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-transport/tests/unicast_compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ mod tests {
payload: Put {
payload: vec![0u8; msg_size].into(),
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-transport/tests/unicast_concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ async fn transport_concurrent(endpoint01: Vec<EndPoint>, endpoint02: Vec<EndPoin
payload: Put {
payload: vec![0u8; MSG_SIZE].into(),
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down Expand Up @@ -305,7 +305,7 @@ async fn transport_concurrent(endpoint01: Vec<EndPoint>, endpoint02: Vec<EndPoin
payload: Put {
payload: vec![0u8; MSG_SIZE].into(),
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-transport/tests/unicast_defragmentation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn run(endpoint: &EndPoint, channel: Channel, msg_size: usize) {
payload: Put {
payload: vec![0u8; msg_size].into(),
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-transport/tests/unicast_intermittent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ async fn transport_intermittent(endpoint: &EndPoint, lowlatency_transport: bool)
payload: Put {
payload: vec![0u8; MSG_SIZE].into(),
timestamp: None,
encoding: Encoding::UNSPECIFIED,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down
Loading

0 comments on commit dec5c49

Please sign in to comment.