From dec5c4971e9b2ba31e387b096781d8e5b582fcd4 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Tue, 5 Mar 2024 18:13:08 +0100 Subject: [PATCH] Encoding Id and Schema --- commons/zenoh-codec/benches/codec.rs | 12 +- commons/zenoh-codec/src/core/encoding.rs | 41 +- commons/zenoh-codec/src/zenoh/put.rs | 6 +- commons/zenoh-protocol/src/core/encoding.rs | 77 +--- commons/zenoh-protocol/src/core/mod.rs | 2 +- io/zenoh-transport/src/common/batch.rs | 2 +- io/zenoh-transport/src/common/pipeline.rs | 6 +- .../tests/multicast_compression.rs | 2 +- .../tests/multicast_transport.rs | 2 +- .../tests/unicast_compression.rs | 2 +- .../tests/unicast_concurrent.rs | 4 +- .../tests/unicast_defragmentation.rs | 2 +- .../tests/unicast_intermittent.rs | 2 +- .../tests/unicast_priorities.rs | 2 +- io/zenoh-transport/tests/unicast_shm.rs | 4 +- .../tests/unicast_simultaneous.rs | 2 +- io/zenoh-transport/tests/unicast_transport.rs | 2 +- plugins/zenoh-plugin-rest/src/lib.rs | 34 +- .../src/replica/storage.rs | 3 +- zenoh/src/net/routing/dispatcher/queries.rs | 8 +- zenoh/src/net/tests/tables.rs | 10 +- zenoh/src/prelude.rs | 2 +- zenoh/src/value.rs | 356 +++++++----------- 23 files changed, 217 insertions(+), 366 deletions(-) diff --git a/commons/zenoh-codec/benches/codec.rs b/commons/zenoh-codec/benches/codec.rs index 11381ea9c3..d897038f91 100644 --- a/commons/zenoh-codec/benches/codec.rs +++ b/commons/zenoh-codec/benches/codec.rs @@ -87,7 +87,7 @@ fn criterion_benchmark(c: &mut Criterion) { ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -133,7 +133,7 @@ fn criterion_benchmark(c: &mut Criterion) { ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -174,7 +174,7 @@ fn criterion_benchmark(c: &mut Criterion) { ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -215,7 +215,7 @@ fn criterion_benchmark(c: &mut Criterion) { ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -243,7 +243,7 @@ fn criterion_benchmark(c: &mut Criterion) { ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -282,7 +282,7 @@ fn criterion_benchmark(c: &mut Criterion) { ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/commons/zenoh-codec/src/core/encoding.rs b/commons/zenoh-codec/src/core/encoding.rs index b14710f48f..ebc3e58754 100644 --- a/commons/zenoh-codec/src/core/encoding.rs +++ b/commons/zenoh-codec/src/core/encoding.rs @@ -12,22 +12,20 @@ // ZettaScale Zenoh Team, // use crate::{LCodec, RCodec, WCodec, Zenoh080, Zenoh080Bounded}; -use alloc::string::String; use zenoh_buffers::{ reader::{DidntRead, Reader}, writer::{DidntWrite, Writer}, }; use zenoh_protocol::{ common::imsg, - core::encoding::{flag, Encoding, EncodingPrefix}, + core::encoding::{flag, Encoding, EncodingId}, }; impl LCodec<&Encoding> for Zenoh080 { fn w_len(self, x: &Encoding) -> usize { - let (prefix, suffix) = (x.prefix(), x.suffix()); - let mut len = self.w_len((prefix as u32) << 1); - if !suffix.is_empty() { - len += self.w_len(x.suffix()); + let mut len = self.w_len((x.id as u32) << 1); + if let Some(schema) = x.schema.as_ref() { + len += schema.len(); } len } @@ -40,17 +38,16 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Encoding) -> Self::Output { - let mut prefix = (x.prefix() as u32) << 1; - let suffix = x.suffix(); + let mut id = (x.id as u32) << 1; - if !suffix.is_empty() { - prefix |= flag::S; + if x.schema.is_some() { + id |= flag::S; } let zodec = Zenoh080Bounded::::new(); - zodec.write(&mut *writer, prefix)?; - if !suffix.is_empty() { + zodec.write(&mut *writer, id)?; + if let Some(schema) = x.schema.as_ref() { let zodec = Zenoh080Bounded::::new(); - zodec.write(&mut *writer, suffix)?; + zodec.write(&mut *writer, schema)?; } Ok(()) } @@ -64,23 +61,19 @@ where fn read(self, reader: &mut R) -> Result { let zodec = Zenoh080Bounded::::new(); - let prefix: u32 = zodec.read(&mut *reader)?; - let (prefix, has_suffix) = ( - (prefix >> 1) as EncodingPrefix, - imsg::has_flag(prefix as u8, flag::S as u8), + let id: u32 = zodec.read(&mut *reader)?; + let (id, has_suffix) = ( + (id >> 1) as EncodingId, + imsg::has_flag(id as u8, flag::S as u8), ); - let mut suffix = String::new(); + let mut schema = None; if has_suffix { let zodec = Zenoh080Bounded::::new(); - suffix = zodec.read(&mut *reader)?; - } - - let mut encoding: Encoding = Encoding::new(prefix); - if !suffix.is_empty() { - encoding = encoding.with_suffix(suffix).map_err(|_| DidntRead)?; + schema = Some(zodec.read(&mut *reader)?); } + let encoding = Encoding { id, schema }; Ok(encoding) } } diff --git a/commons/zenoh-codec/src/zenoh/put.rs b/commons/zenoh-codec/src/zenoh/put.rs index e97fb5957a..776b47245f 100644 --- a/commons/zenoh-codec/src/zenoh/put.rs +++ b/commons/zenoh-codec/src/zenoh/put.rs @@ -54,7 +54,7 @@ where if timestamp.is_some() { header |= flag::T; } - if encoding != &Encoding::UNSPECIFIED { + if encoding != &Encoding::empty() { header |= flag::E; } let mut n_exts = (ext_sinfo.is_some()) as u8 @@ -73,7 +73,7 @@ where if let Some(ts) = timestamp.as_ref() { self.write(&mut *writer, ts)?; } - if encoding != &Encoding::UNSPECIFIED { + if encoding != &Encoding::empty() { self.write(&mut *writer, encoding)?; } @@ -143,7 +143,7 @@ where timestamp = Some(self.codec.read(&mut *reader)?); } - let mut encoding = Encoding::UNSPECIFIED; + let mut encoding = Encoding::empty(); if imsg::has_flag(self.header, flag::E) { encoding = self.codec.read(&mut *reader)?; } diff --git a/commons/zenoh-protocol/src/core/encoding.rs b/commons/zenoh-protocol/src/core/encoding.rs index abe3d30178..9b9aa5bf2f 100644 --- a/commons/zenoh-protocol/src/core/encoding.rs +++ b/commons/zenoh-protocol/src/core/encoding.rs @@ -11,12 +11,10 @@ // Contributors: // ZettaScale Zenoh Team, // -use crate::core::CowStr; -use alloc::borrow::Cow; use core::fmt::Debug; -use zenoh_result::{bail, ZResult}; +use zenoh_buffers::ZSlice; -pub type EncodingPrefix = u16; +pub type EncodingId = u16; /// [`Encoding`] is a metadata that indicates how the data payload should be interpreted. /// For wire-efficiency and extensibility purposes, Zenoh defines an [`Encoding`] as @@ -27,8 +25,8 @@ pub type EncodingPrefix = u16; /// of the API as per user convenience. That mapping has no impact on the Zenoh protocol definition. #[derive(Clone, Debug, PartialEq, Eq)] pub struct Encoding { - prefix: EncodingPrefix, - suffix: CowStr<'static>, + pub id: EncodingId, + pub schema: Option, } /// # Encoding field @@ -36,9 +34,9 @@ pub struct Encoding { /// ```text /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ -/// ~ prefix: z16 |S~ +/// ~ id: z16 |S~ /// +---------------+ -/// ~suffix: ~ -- if S==1 +/// ~schema: ~ -- if S==1 /// +---------------+ /// ``` pub mod flag { @@ -46,49 +44,12 @@ pub mod flag { } impl Encoding { - pub const UNSPECIFIED: Self = Self::empty(); - - /// Returns a new [`Encoding`] object provided the prefix ID. - pub const fn new(prefix: EncodingPrefix) -> Self { - Self { - prefix, - suffix: CowStr::borrowed(""), - } - } - - /// Sets the suffix of the encoding. - /// It will return an error when the suffix is longer than 255 characters. - pub fn with_suffix(mut self, suffix: IntoCowStr) -> ZResult - where - IntoCowStr: Into> + AsRef, - { - let s: Cow<'static, str> = suffix.into(); - if s.as_bytes().len() > u8::MAX as usize { - bail!("Suffix length is limited to 255 characters") - } - self.suffix = (self.suffix + s.as_ref()).into(); - Ok(self) - } - /// Returns a new [`Encoding`] object with default empty prefix ID. pub const fn empty() -> Self { - Self::new(0) - } - - // Returns the numerical prefix - pub const fn prefix(&self) -> EncodingPrefix { - self.prefix - } - - // Returns the suffix string - pub fn suffix(&self) -> &str { - self.suffix.as_str() - } - - /// Returns `true` if the string representation of this encoding starts with - /// the string representation of the other given encoding. - pub fn starts_with(&self, with: &Encoding) -> bool { - self.prefix() == with.prefix() && self.suffix().starts_with(with.suffix()) + Self { + id: 0, + schema: None, + } } } @@ -101,23 +62,17 @@ impl Default for Encoding { impl Encoding { #[cfg(feature = "test")] pub fn rand() -> Self { - use rand::{ - distributions::{Alphanumeric, DistString}, - Rng, - }; + use rand::Rng; const MIN: usize = 2; const MAX: usize = 16; let mut rng = rand::thread_rng(); - let prefix: EncodingPrefix = rng.gen(); - let suffix: String = if rng.gen_bool(0.5) { - let len = rng.gen_range(MIN..MAX); - Alphanumeric.sample_string(&mut rng, len) - } else { - String::new() - }; - Encoding::new(prefix).with_suffix(suffix).unwrap() + let id: EncodingId = rng.gen(); + let schema = rng + .gen_bool(0.5) + .then_some(ZSlice::rand(rng.gen_range(MIN..MAX))); + Encoding { id, schema } } } diff --git a/commons/zenoh-protocol/src/core/mod.rs b/commons/zenoh-protocol/src/core/mod.rs index 1f21af5f27..82658db2fd 100644 --- a/commons/zenoh-protocol/src/core/mod.rs +++ b/commons/zenoh-protocol/src/core/mod.rs @@ -42,7 +42,7 @@ pub use wire_expr::*; mod cowstr; pub use cowstr::CowStr; pub mod encoding; -pub use encoding::{Encoding, EncodingPrefix}; +pub use encoding::{Encoding, EncodingId}; pub mod locator; pub use locator::*; diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 97c066639f..e923a7e1af 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -579,7 +579,7 @@ mod tests { ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 314ab6b24c..3968eabdf5 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -754,7 +754,7 @@ mod tests { ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -884,7 +884,7 @@ mod tests { ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -996,7 +996,7 @@ mod tests { ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/io/zenoh-transport/tests/multicast_compression.rs b/io/zenoh-transport/tests/multicast_compression.rs index 8f38df946a..5301b967f6 100644 --- a/io/zenoh-transport/tests/multicast_compression.rs +++ b/io/zenoh-transport/tests/multicast_compression.rs @@ -273,7 +273,7 @@ mod tests { payload: Put { payload: vec![0u8; msg_size].into(), timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/io/zenoh-transport/tests/multicast_transport.rs b/io/zenoh-transport/tests/multicast_transport.rs index ea203809d7..69c1decd83 100644 --- a/io/zenoh-transport/tests/multicast_transport.rs +++ b/io/zenoh-transport/tests/multicast_transport.rs @@ -269,7 +269,7 @@ mod tests { payload: Put { payload: vec![0u8; msg_size].into(), timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/io/zenoh-transport/tests/unicast_compression.rs b/io/zenoh-transport/tests/unicast_compression.rs index 8f4ae89d8e..a9c10e1a9e 100644 --- a/io/zenoh-transport/tests/unicast_compression.rs +++ b/io/zenoh-transport/tests/unicast_compression.rs @@ -301,7 +301,7 @@ mod tests { payload: Put { payload: vec![0u8; msg_size].into(), timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/io/zenoh-transport/tests/unicast_concurrent.rs b/io/zenoh-transport/tests/unicast_concurrent.rs index 0673907905..b14cebaaf9 100644 --- a/io/zenoh-transport/tests/unicast_concurrent.rs +++ b/io/zenoh-transport/tests/unicast_concurrent.rs @@ -200,7 +200,7 @@ async fn transport_concurrent(endpoint01: Vec, endpoint02: Vec, endpoint02: Vec, client_transport: TransportUn payload: Put { payload: vec![0u8; *ms].into(), timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/io/zenoh-transport/tests/unicast_shm.rs b/io/zenoh-transport/tests/unicast_shm.rs index b493b99c11..5ec7e31aba 100644 --- a/io/zenoh-transport/tests/unicast_shm.rs +++ b/io/zenoh-transport/tests/unicast_shm.rs @@ -277,7 +277,7 @@ mod tests { payload: Put { payload: sbuf.into(), timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, ext_shm: None, ext_attachment: None, @@ -325,7 +325,7 @@ mod tests { payload: Put { payload: sbuf.into(), timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, ext_shm: None, ext_attachment: None, diff --git a/io/zenoh-transport/tests/unicast_simultaneous.rs b/io/zenoh-transport/tests/unicast_simultaneous.rs index 96751bd7d2..d465497556 100644 --- a/io/zenoh-transport/tests/unicast_simultaneous.rs +++ b/io/zenoh-transport/tests/unicast_simultaneous.rs @@ -82,7 +82,7 @@ mod tests { payload: Put { payload: vec![0u8; MSG_SIZE].into(), timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/io/zenoh-transport/tests/unicast_transport.rs b/io/zenoh-transport/tests/unicast_transport.rs index 90c2fe008a..2a830a9e2b 100644 --- a/io/zenoh-transport/tests/unicast_transport.rs +++ b/io/zenoh-transport/tests/unicast_transport.rs @@ -472,7 +472,7 @@ async fn test_transport( payload: Put { payload: vec![0u8; msg_size].into(), timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 4f1203a4df..fdd3eda098 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -383,19 +383,10 @@ async fn query(mut req: Request<(Arc, String)>) -> tide::Result match DefaultEncoding.parse(m.to_string()) { - Ok(e) => e, - Err(e) => { - return Ok(response( - StatusCode::BadRequest, - "text/plain", - &e.to_string(), - )); - } - }, - None => Encoding::default(), - }; + let encoding: Encoding = req + .content_type() + .map(|m| DefaultEncoding.parse(m.to_string())) + .unwrap_or_default(); query = query.with_value(Value::from(body).with_encoding(encoding)); } match query.res().await { @@ -432,19 +423,10 @@ async fn write(mut req: Request<(Arc, String)>) -> tide::Result match DefaultEncoding.parse(m.to_string()) { - Ok(e) => e, - Err(e) => { - return Ok(response( - StatusCode::BadRequest, - "text/plain", - &e.to_string(), - )); - } - }, - None => Encoding::default(), - }; + let encoding: Encoding = req + .content_type() + .map(|m| DefaultEncoding.parse(m.to_string())) + .unwrap_or_default(); // @TODO: Define the right congestion control value let session = &req.state().0; diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index 43d36c2a3b..b052f29fd5 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -707,8 +707,7 @@ fn construct_update(data: String) -> Update { for slice in result.3 { payload.push_zslice(slice.to_vec().into()); } - let value = - Value::new(payload).with_encoding(DefaultEncoding.parse(result.2.to_string()).unwrap()); // @TODO: remove the unwrap() + let value = Value::new(payload).with_encoding(DefaultEncoding.parse(result.2)); let data = StoredData { value, timestamp: Timestamp::from_str(&result.1).unwrap(), // @TODO: remove the unwrap() diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 93340241f1..b0f7f7f7ef 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -558,10 +558,10 @@ pub fn route_query( ext_unknown: vec![], // @TODO: handle unknown extensions payload: ReplyBody::Put(Put { // @TODO: handle Del case - timestamp: None, // @TODO: handle timestamp - encoding: Encoding::UNSPECIFIED, // @TODO: handle encoding - ext_sinfo: None, // @TODO: handle source info - ext_attachment: None, // @TODO: expose it in the API + timestamp: None, // @TODO: handle timestamp + encoding: Encoding::empty(), // @TODO: handle encoding + ext_sinfo: None, // @TODO: handle source info + ext_attachment: None, // @TODO: expose it in the API #[cfg(feature = "shared-memory")] ext_shm: None, ext_unknown: vec![], // @TODO: handle unknown extensions diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 04b4725858..8f3c494e6a 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -624,7 +624,7 @@ fn client_test() { ext::QoSType::DEFAULT, PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -657,7 +657,7 @@ fn client_test() { ext::QoSType::DEFAULT, PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -690,7 +690,7 @@ fn client_test() { ext::QoSType::DEFAULT, PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -723,7 +723,7 @@ fn client_test() { ext::QoSType::DEFAULT, PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -756,7 +756,7 @@ fn client_test() { ext::QoSType::DEFAULT, PushBody::Put(Put { timestamp: None, - encoding: Encoding::UNSPECIFIED, + encoding: Encoding::empty(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index e9dd105c75..f6da568399 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -31,7 +31,7 @@ pub(crate) mod common { writer::HasWriter, }; pub use zenoh_core::Resolve; - pub use zenoh_protocol::core::{Encoding, EncodingPrefix, EndPoint, Locator, ZenohId}; + pub use zenoh_protocol::core::{Encoding, EncodingId, EndPoint, Locator, ZenohId}; pub use crate::config::{self, Config, ValidatedMap}; pub use crate::handlers::IntoCallbackReceiverPair; diff --git a/zenoh/src/value.rs b/zenoh/src/value.rs index 0ee274692b..25a0ad092e 100644 --- a/zenoh/src/value.rs +++ b/zenoh/src/value.rs @@ -14,9 +14,10 @@ //! Value primitives. use crate::{payload::Payload, prelude::Encoding}; +use phf::phf_map; use std::borrow::Cow; -use zenoh_protocol::core::EncodingPrefix; -use zenoh_result::ZResult; +use zenoh_buffers::ZSlice; +use zenoh_protocol::core::EncodingId; /// A zenoh [`Value`] contains a `payload` and an [`Encoding`] that indicates how the `payload` /// should be interpreted. @@ -86,270 +87,191 @@ where #[derive(Clone, Copy, Debug)] pub struct DefaultEncoding; -pub mod prefix { - use phf::phf_ordered_map; - use zenoh_protocol::core::EncodingPrefix; - - // - Primitives types supported in all Zenoh bindings. - - /// Unspecified [`EncodingPrefix`]. - /// Note that an [`Encoding`] could have an empty [prefix](`Encoding::prefix`) and a non-empty [suffix](`Encoding::suffix`). - pub const EMPTY: EncodingPrefix = 0; - /// A stream of bytes. - pub const APPLICATION_OCTET_STREAM: EncodingPrefix = 1; - /// A signed integer. - pub const ZENOH_INT: EncodingPrefix = 2; - /// An unsigned integer. - pub const ZENOH_UINT: EncodingPrefix = 3; - /// A float. - pub const ZENOH_FLOAT: EncodingPrefix = 4; - /// A boolean. - pub const ZENOH_BOOL: EncodingPrefix = 5; - /// A string. - pub const TEXT_PLAIN: EncodingPrefix = 6; - - // - Advanced types supported in some Zenoh bindings. +impl DefaultEncoding { + // - Primitives types supported in all Zenoh bindings + /// See [`DefaultEncodingMapping::ZENOH_BYTES`]. + pub const ZENOH_BYTES: Encoding = Encoding { + id: 0, + schema: None, + }; + /// A VLE-encoded signed little-endian integer. Either 8bit, 16bit, 32bit, or 64bit. + /// Binary reprensentation uses two's complement. + pub const ZENOH_INT: Encoding = Encoding { + id: 1, + schema: None, + }; + /// A VLE-encoded little-endian unsigned integer. Either 8bit, 16bit, 32bit, or 64bit. + pub const ZENOH_UINT: Encoding = Encoding { + id: 2, + schema: None, + }; + /// A VLE-encoded float. Either little-endian 32bit or 64bit. + /// Binary representation uses *IEEE 754-2008* *binary32* or *binary64*, respectively. + pub const ZENOH_FLOAT: Encoding = Encoding { + id: 3, + schema: None, + }; + /// A boolean. `0` is `false`, `1` is `true`. Other values are invalid. + pub const ZENOH_BOOL: Encoding = Encoding { + id: 4, + schema: None, + }; + /// A UTF-8 string. + pub const ZENOH_STRING: Encoding = Encoding { + id: 5, + schema: None, + }; + /// A zenoh error. + pub const ZENOH_ERRROR: Encoding = Encoding { + id: 6, + schema: None, + }; + // - Advanced types. The may be supported in some Zenoh bindings. + /// An application-specific stream of bytes. + pub const APPLICATION_OCTET_STREAM: Encoding = Encoding { + id: 7, + schema: None, + }; + /// A textual file. + pub const TEXT_PLAIN: Encoding = Encoding { + id: 8, + schema: None, + }; /// A JSON intended to be consumed by an application. - pub const APPLICATION_JSON: EncodingPrefix = 7; + pub const APPLICATION_JSON: Encoding = Encoding { + id: 9, + schema: None, + }; /// A JSON intended to be human readable. - pub const TEXT_JSON: EncodingPrefix = 8; - - // - 9-15 are reserved - - // - List of known mapping. Encoding capabilities may not be provided at all. - + pub const TEXT_JSON: Encoding = Encoding { + id: 10, + schema: None, + }; /// A Common Data Representation (CDR)-encoded data. A [suffix](`Encoding::suffix`) may be provided in the [`Encoding`] to specify the concrete type. - pub const APPLICATION_CDR: EncodingPrefix = 16; - - // - 17-63 are reserved - // - The highest prefix value to fit in 1 byte on the wire is 63. - // - 64-1014 are reserved. - - // - A list of known prefixes. Encoding capabilities may not be provided at all. - - /// Common prefix for Zenoh-defined types. - pub const ZENOH: EncodingPrefix = 1_014; - - // - A list of IANA registries. - - /// Common prefix for *application* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#application). - pub const APPLICATION: EncodingPrefix = 1_015; - /// Common prefix for *audio* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#audio). - pub const AUDIO: EncodingPrefix = 1_016; - /// Common prefix for *font* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#font). - pub const FONT: EncodingPrefix = 1_017; - /// Common prefix for *image* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#image). - pub const IMAGE: EncodingPrefix = 1_018; - /// Common prefix for *message* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#message). - pub const MESSAGE: EncodingPrefix = 1_019; - /// Common prefix for *model* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#model). - pub const MODEL: EncodingPrefix = 1_020; - /// Common prefix for *multipart* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#multipart). - pub const MULTIPART: EncodingPrefix = 1_021; - /// Common prefix for *text* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#text). - pub const TEXT: EncodingPrefix = 1_022; - /// Common prefix for *video* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#video). - pub const VIDEO: EncodingPrefix = 1_023; - - // - 1024-65535 are free to use. - - // - End encoding prefix definition + pub const APPLICATION_CDR: Encoding = Encoding { + id: 11, + schema: None, + }; - /// A perfect hashmap for fast lookup of [`EncodingPrefix`] to string represenation. - pub(super) const KNOWN_PREFIX: phf::OrderedMap = phf_ordered_map! { - 0u16 => "", + /// A perfect hashmap for fast lookup of [`EncodingId`] to string represenation. + const ID_TO_STR: phf::Map = phf_map! { // - Primitive types - 1u16 => "application/octet-stream", - 2u16 => "zenoh/int", - 3u16 => "zenoh/uint", - 4u16 => "zenoh/float", - 5u16 => "zenoh/bool", - 6u16 => "text/plain", + 0u16 => "zenoh/bytes", + 1u16 => "zenoh/int", + 2u16 => "zenoh/uint", + 3u16 => "zenoh/float", + 4u16 => "zenoh/bool", + 5u16 => "zenoh/string", + 6u16 => "zenoh/error", // - Advanced types - 7u16 => "application/json", - 8u16 => "text/json", - // - 9-15 are reserved. - 16u16 => "application/cdr", - // - 17-1019 are reserved. - 1_014u16 => "zenoh/", - 1_015u16 => "application/", - 1_016u16 => "audio/", - 1_017u16 => "font/", - 1_018u16 => "image/", - 1_019u16 => "message/", - 1_020u16 => "model/", - 1_021u16 => "multipart/", - 1_022u16 => "text/", - 1_023u16 => "video/", + 7u16 => "application/octet-stream", + 8u16 => "text/plain", + 9u16 => "application/json", + 10u16 => "text/json", + 11u16 => "application/cdr", + // - 10-1023 are reserved. // - 1024-65535 are free to use. }; // A perfect hashmap for fast lookup of prefixes - pub(super) const KNOWN_STRING: phf::OrderedMap<&'static str, EncodingPrefix> = phf_ordered_map! { - "" => 0u16, + const STR_TO_ID: phf::Map<&'static str, EncodingId> = phf_map! { // - Primitive types - "application/octet-stream" => 1u16, - "zenoh/int" => 2u16, - "zenoh/uint" => 3u16, - "zenoh/float" => 4u16, - "zenoh/bool" => 5u16, - "text/plain" => 6u16, + "zenoh/bytes" => 0u16, + "zenoh/int" => 1u16, + "zenoh/uint" => 2u16, + "zenoh/float" => 3u16, + "zenoh/bool" => 4u16, + "zenoh/string" => 5u16, + "zenoh/error" => 6u16, // - Advanced types - "application/json" => 7u16, - "text/json" => 8u16, - // - 9-15 are reserved. - "application/cdr" => 16u16, + "application/octet-stream" => 7u16, + "text/plain" => 8u16, + "application/json" => 9u16, + "text/json" => 10u16, + "application/cdr" => 11u16, // - 17-1019 are reserved. - "zenoh/" => 1_014u16, - "application/" => 1_015u16, - "audio/" => 1_016u16, - "font/" => 1_017u16, - "image/" => 1_018u16, - "message/" => 1_019u16, - "model/" => 1_020u16, - "multipart/" => 1_021u16, - "text/" => 1_022u16, - "video/" => 1_023u16, // - 1024-65535 are free to use. }; } -impl DefaultEncoding { - // - Primitives types supported in all Zenoh bindings - - /// See [`DefaultEncodingMapping::EMPTY`]. - pub const EMPTY: Encoding = Encoding::new(prefix::EMPTY); - /// An application-specific stream of bytes. - pub const APPLICATION_OCTET_STREAM: Encoding = Encoding::new(prefix::APPLICATION_OCTET_STREAM); - /// A VLE-encoded signed little-endian integer. Either 8bit, 16bit, 32bit, or 64bit. - /// Binary reprensentation uses two's complement. - pub const ZENOH_INT: Encoding = Encoding::new(prefix::ZENOH_INT); - /// A VLE-encoded little-endian unsigned integer. Either 8bit, 16bit, 32bit, or 64bit. - pub const ZENOH_UINT: Encoding = Encoding::new(prefix::ZENOH_UINT); - /// A VLE-encoded float. Either little-endian 32bit or 64bit. - /// Binary representation uses *IEEE 754-2008* *binary32* or *binary64*, respectively. - pub const ZENOH_FLOAT: Encoding = Encoding::new(prefix::ZENOH_FLOAT); - /// A boolean. `0` is `false`, `1` is `true`. Other values are invalid. - pub const ZENOH_BOOL: Encoding = Encoding::new(prefix::ZENOH_BOOL); - /// A UTF-8 encoded string. - pub const TEXT_PLAIN: Encoding = Encoding::new(prefix::TEXT_PLAIN); - - // - Advanced types supported in some Zenoh bindings. - - /// A JSON intended to be consumed by an application. - pub const APPLICATION_JSON: Encoding = Encoding::new(prefix::APPLICATION_JSON); - /// A JSON intended to be human readable. - pub const TEXT_JSON: Encoding = Encoding::new(prefix::TEXT_JSON); - - // - List of known mapping. Encoding capabilities may not be provided at all. - - /// A Common Data Representation (CDR)-encoded data. A [suffix](`Encoding::suffix`) may be provided in the [`Encoding`] to specify the concrete type. - pub const APPLICATION_CDR: Encoding = Encoding::new(prefix::APPLICATION_CDR); - - // - A list of known prefixes. - - /// Common prefix for Zenoh-defined types. - pub const ZENOH: Encoding = Encoding::new(prefix::ZENOH); - - // - A list of IANA registries. - - /// Common prefix for *application* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#application). - pub const APPLICATION: Encoding = Encoding::new(prefix::APPLICATION); - /// Common prefix for *audio* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#audio). - pub const AUDIO: Encoding = Encoding::new(prefix::AUDIO); - /// Common prefix for *font* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#font). - pub const FONT: Encoding = Encoding::new(prefix::FONT); - /// Common prefix for *image* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#image). - pub const IMAGE: Encoding = Encoding::new(prefix::IMAGE); - /// Common prefix for *message* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#message). - pub const MESSAGE: Encoding = Encoding::new(prefix::MESSAGE); - /// Common prefix for *model* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#model). - pub const MODEL: Encoding = Encoding::new(prefix::MODEL); - /// Common prefix for *multipart* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#multipart). - pub const MULTIPART: Encoding = Encoding::new(prefix::MULTIPART); - /// Common prefix for *text* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#text). - pub const TEXT: Encoding = Encoding::new(prefix::TEXT); - /// Common prefix for *video* MIME types defined by [IANA](https://www.iana.org/assignments/media-types/media-types.xhtml#video). - pub const VIDEO: Encoding = Encoding::new(prefix::VIDEO); -} - /// Trait to create, resolve, parse an [`Encoding`] mapping. pub trait EncodingMapping { - // The minimum prefix used by the EncodingMapping implementer - const MIN: EncodingPrefix; - // The maximum prefix used by the EncodingMapping implementer - const MAX: EncodingPrefix; - /// Map a numerical prefix to its string representation. - fn prefix_to_str(&self, e: EncodingPrefix) -> Option>; + fn id_to_str(&self, e: EncodingId) -> Option>; /// Map a string to a known numerical prefix ID. - fn str_to_prefix(&self, s: &str) -> Option; + fn str_to_id(&self, s: S) -> Option + where + S: AsRef; /// Parse a string into a valid [`Encoding`]. - fn parse(&self, s: S) -> ZResult + fn parse(&self, s: S) -> Encoding where - S: Into>; + S: AsRef; fn to_str(&self, e: &Encoding) -> Cow<'_, str>; } impl EncodingMapping for DefaultEncoding { - const MIN: EncodingPrefix = 0; - const MAX: EncodingPrefix = 1023; - - /// Given a numerical [`EncodingPrefix`] returns its string representation. - fn prefix_to_str(&self, p: EncodingPrefix) -> Option> { - prefix::KNOWN_PREFIX.get(&p).map(|s| Cow::Borrowed(*s)) + /// Given a numerical [`EncodingId`] returns its string representation. + fn id_to_str(&self, p: EncodingId) -> Option> { + Self::ID_TO_STR.get(&p).map(|s| Cow::Borrowed(*s)) } - /// Given the string representation of a prefix returns its numerical representation as [`EncodingPrefix`]. + /// Given the string representation of a prefix returns its numerical representation as [`EncodingId`]. /// [EMPTY](`DefaultEncodingMapping::EMPTY`) is returned in case of unknown mapping. - fn str_to_prefix(&self, s: &str) -> Option { - prefix::KNOWN_STRING.get(s).copied() + fn str_to_id(&self, s: S) -> Option + where + S: AsRef, + { + fn _str_to_id(s: &str) -> Option { + DefaultEncoding::STR_TO_ID.get(s).copied() + } + _str_to_id(s.as_ref()) } /// Parse a string into a valid [`Encoding`]. This functions performs the necessary /// prefix mapping and suffix substring when parsing the input. In case of unknown prefix mapping, /// the [prefix](`Encoding::prefix`) will be set to [EMPTY](`DefaultEncodingMapping::EMPTY`) and the /// full string will be part of the [suffix](`Encoding::suffix`). - fn parse(&self, t: S) -> ZResult + fn parse(&self, t: S) -> Encoding where - S: Into>, + S: AsRef, { - fn _parse(_self: &DefaultEncoding, t: Cow<'static, str>) -> ZResult { + fn _parse(t: &str) -> Encoding { // Check if empty - if t.is_empty() { - return Ok(DefaultEncoding::EMPTY); - } - // Try first an exact lookup of the string to prefix - if let Some(p) = _self.str_to_prefix(t.as_ref()) { - return Ok(Encoding::new(p)); + if !t.is_empty() { + return Encoding::empty(); } - // Check if the passed string matches one of the known prefixes. It will map the known string - // prefix to the numerical prefix and carry the remaining part of the string in the suffix. - // Skip empty string mapping. The order is guaranteed by the phf::OrderedMap. - for (s, p) in prefix::KNOWN_STRING.entries().skip(1) { - if let Some(i) = t.find(s) { - let e = Encoding::new(*p); - match t { - Cow::Borrowed(s) => return e.with_suffix(s.split_at(i + s.len()).1), - Cow::Owned(mut s) => return e.with_suffix(s.split_off(i + s.len())), - } + + // Everything before `;` may be mapped to a known id + let (id, schema) = t.split_once(';').unwrap_or((t, "")); + match DefaultEncoding.str_to_id(id) { + // Perfect match on ID and schema + Some(id) => { + let schema = Some(ZSlice::from(schema.to_string().into_bytes())); + Encoding { id, schema } + } + // No perfect match on ID and only schema + None => { + let schema = Some(ZSlice::from(schema.to_string().into_bytes())); + Encoding { id: 0, schema } } } - // No matching known prefix has been found, carry everything in the suffix. - DefaultEncoding::EMPTY.with_suffix(t) } - _parse(self, t.into()) + _parse(t.as_ref()) } /// Given an [`Encoding`] returns a full string representation. /// It concatenates the string represenation of the encoding prefix with the encoding suffix. fn to_str(&self, e: &Encoding) -> Cow<'_, str> { - let (p, s) = (e.prefix(), e.suffix()); - match self.prefix_to_str(p) { - Some(p) if s.is_empty() => p, - Some(p) => Cow::Owned(format!("{}{}", p, s)), - None => Cow::Owned(format!("unknown({}){}", p, s)), + fn schema_to_str(schema: &[u8]) -> &str { + std::str::from_utf8(schema).unwrap_or("unknown(non-utf8)") + } + + match (self.id_to_str(e.id), e.schema.as_ref()) { + (Some(i), None) => i, + (Some(i), Some(s)) => Cow::Owned(format!("{};{}", i, schema_to_str(s))), + (None, Some(s)) => Cow::Owned(format!("unknown({});{}", e.id, schema_to_str(s))), + (None, None) => Cow::Owned(format!("unknown({})", e.id)), } } }