Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify Error message #813

Merged
merged 3 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 23 additions & 34 deletions commons/zenoh-codec/src/zenoh/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Bounded, Zenoh080Header};
use alloc::vec::Vec;
use zenoh_buffers::{
reader::{DidntRead, Reader},
writer::{DidntWrite, Writer},
ZBuf,
};
use zenoh_protocol::{
common::{iext, imsg},
core::Encoding,
zenoh::{
err::{ext, flag, Err},
id,
Expand All @@ -33,49 +35,42 @@ where

fn write(self, writer: &mut W, x: &Err) -> Self::Output {
let Err {
code,
is_infrastructure,
timestamp,
encoding,
ext_sinfo,
ext_body,
ext_unknown,
payload,
} = x;

// Header
let mut header = id::ERR;
if timestamp.is_some() {
header |= flag::T;
if encoding != &Encoding::empty() {
header |= flag::E;
}
if *is_infrastructure {
header |= flag::I;
}
let mut n_exts =
(ext_sinfo.is_some() as u8) + (ext_body.is_some() as u8) + (ext_unknown.len() as u8);
let mut n_exts = (ext_sinfo.is_some() as u8) + (ext_unknown.len() as u8);
if n_exts != 0 {
header |= flag::Z;
}
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, code)?;
if let Some(ts) = timestamp.as_ref() {
self.write(&mut *writer, ts)?;
if encoding != &Encoding::empty() {
self.write(&mut *writer, encoding)?;
}

// Extensions
if let Some(sinfo) = ext_sinfo.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (sinfo, n_exts != 0))?;
}
if let Some(body) = ext_body.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (body, n_exts != 0))?;
}
for u in ext_unknown.iter() {
n_exts -= 1;
self.write(&mut *writer, (u, n_exts != 0))?;
}

// Payload
let bodec = Zenoh080Bounded::<u32>::new();
bodec.write(&mut *writer, payload)?;

Ok(())
}
}
Expand Down Expand Up @@ -105,16 +100,13 @@ where
}

// Body
let code: u16 = self.codec.read(&mut *reader)?;
let is_infrastructure = imsg::has_flag(self.header, flag::I);
let mut timestamp: Option<uhlc::Timestamp> = None;
if imsg::has_flag(self.header, flag::T) {
timestamp = Some(self.codec.read(&mut *reader)?);
let mut encoding = Encoding::empty();
if imsg::has_flag(self.header, flag::E) {
encoding = self.codec.read(&mut *reader)?;
}

// Extensions
let mut ext_sinfo: Option<ext::SourceInfoType> = None;
let mut ext_body: Option<ext::ErrBodyType> = None;
let mut ext_unknown = Vec::new();

let mut has_ext = imsg::has_flag(self.header, flag::Z);
Expand All @@ -127,11 +119,6 @@ where
ext_sinfo = Some(s);
has_ext = ext;
}
ext::ErrBodyType::VID | ext::ErrBodyType::SID => {
let (s, ext): (ext::ErrBodyType, bool) = eodec.read(&mut *reader)?;
ext_body = Some(s);
has_ext = ext;
}
_ => {
let (u, ext) = extension::read(reader, "Err", ext)?;
ext_unknown.push(u);
Expand All @@ -140,13 +127,15 @@ where
}
}

// Payload
let bodec = Zenoh080Bounded::<u32>::new();
let payload: ZBuf = bodec.read(&mut *reader)?;

Ok(Err {
code,
is_infrastructure,
timestamp,
encoding,
ext_sinfo,
ext_body,
ext_unknown,
payload,
})
}
}
52 changes: 18 additions & 34 deletions commons/zenoh-protocol/src/zenoh/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,41 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::common::ZExtUnknown;
use crate::{common::ZExtUnknown, core::Encoding};
use alloc::vec::Vec;
use uhlc::Timestamp;
use zenoh_buffers::ZBuf;

/// # Err message
///
/// ```text
/// Flags:
/// - T: Timestamp If T==1 then the timestamp if present
/// - I: Infrastructure If I==1 then the error is related to the infrastructure else to the user
/// - X: Reserved
/// - E: Encoding If E==1 then the encoding is present
/// - Z: Extension If Z==1 then at least one extension is present
///
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |Z|I|T| ERR |
/// |Z|E|X| ERR |
/// +-+-+-+---------+
/// % code:z16 %
/// +---------------+
/// ~ ts: <u8;z16> ~ if T==1
/// ~ encoding ~ if E==1
/// +---------------+
/// ~ [err_exts] ~ if Z==1
/// +---------------+
/// ~ pl: <u8;z32> ~ -- Payload
/// +---------------+
/// ```
pub mod flag {
pub const T: u8 = 1 << 5; // 0x20 Timestamp if T==0 then the timestamp if present
pub const I: u8 = 1 << 6; // 0x40 Infrastructure if I==1 then the error is related to the infrastructure else to the user
// pub const X: u8 = 1 << 5; // 0x20 Reserved
pub const E: u8 = 1 << 6; // 0x40 Encoding if E==1 then the encoding is present
pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Err {
pub code: u16,
pub is_infrastructure: bool,
pub timestamp: Option<Timestamp>,
pub encoding: Encoding,
pub ext_sinfo: Option<ext::SourceInfoType>,
pub ext_body: Option<ext::ErrBodyType>,
pub ext_unknown: Vec<ZExtUnknown>,
pub payload: ZBuf,
}

pub mod ext {
Expand All @@ -57,45 +55,31 @@ pub mod ext {
/// Used to carry additional information about the source of data
pub type SourceInfo = zextzbuf!(0x1, false);
pub type SourceInfoType = crate::zenoh::ext::SourceInfoType<{ SourceInfo::ID }>;

/// # ErrBody extension
/// Used to carry a body attached to the query
/// Shared Memory extension is automatically defined by ValueType extension if
/// #[cfg(feature = "shared-memory")] is defined.
pub type ErrBodyType = crate::zenoh::ext::ValueType<{ ZExtZBuf::<0x02>::id(false) }, 0x03>;
}

impl Err {
#[cfg(feature = "test")]
pub fn rand() -> Self {
use crate::{common::iext, core::ZenohId};
use crate::common::iext;
use rand::Rng;
let mut rng = rand::thread_rng();

let code: u16 = rng.gen();
let is_infrastructure = rng.gen_bool(0.5);
let timestamp = rng.gen_bool(0.5).then_some({
let time = uhlc::NTP64(rng.gen());
let id = uhlc::ID::try_from(ZenohId::rand().to_le_bytes()).unwrap();
Timestamp::new(time, id)
});
let encoding = Encoding::rand();
let ext_sinfo = rng.gen_bool(0.5).then_some(ext::SourceInfoType::rand());
let ext_body = rng.gen_bool(0.5).then_some(ext::ErrBodyType::rand());
let mut ext_unknown = Vec::new();
for _ in 0..rng.gen_range(0..4) {
ext_unknown.push(ZExtUnknown::rand2(
iext::mid(ext::ErrBodyType::SID) + 1,
iext::mid(ext::SourceInfo::ID) + 1,
false,
));
}
let payload = ZBuf::rand(rng.gen_range(0..=64));

Self {
code,
is_infrastructure,
timestamp,
encoding,
ext_sinfo,
ext_body,
ext_unknown,
payload,
}
}
}
28 changes: 4 additions & 24 deletions io/zenoh-transport/src/shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use zenoh_core::{zasyncread, zasyncwrite, zerror};
use zenoh_protocol::{
network::{NetworkBody, NetworkMessage, Push, Request, Response},
zenoh::{
err::{ext::ErrBodyType, Err},
err::Err,
ext::ShmType,
query::{ext::QueryBodyType, Query},
reply::ReplyBody,
Expand Down Expand Up @@ -123,31 +123,11 @@ impl MapShm for Reply {
// Impl - Err
impl MapShm for Err {
fn map_to_shminfo(&mut self) -> ZResult<bool> {
if let Self {
ext_body: Some(ErrBodyType {
payload, ext_shm, ..
}),
..
} = self
{
map_to_shminfo!(payload, ext_shm)
} else {
Ok(false)
}
Ok(false)
}

fn map_to_shmbuf(&mut self, shmr: &RwLock<SharedMemoryReader>) -> ZResult<bool> {
if let Self {
ext_body: Some(ErrBodyType {
payload, ext_shm, ..
}),
..
} = self
{
map_to_shmbuf!(payload, ext_shm, shmr)
} else {
Ok(false)
}
fn map_to_shmbuf(&mut self, _shmr: &RwLock<SharedMemoryReader>) -> ZResult<bool> {
Ok(false)
}
}

Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ macro_rules! inc_res_stats {
ResponseBody::Err(e) => {
stats.[<$txrx _z_reply_msgs>].[<inc_ $space>](1);
stats.[<$txrx _z_reply_pl_bytes>].[<inc_ $space>](
e.ext_body.as_ref().map(|b| b.payload.len()).unwrap_or(0),
e.payload.len()
);
}
}
Expand Down
19 changes: 7 additions & 12 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ use std::ops::Deref;
use std::sync::Arc;
use uhlc::Timestamp;
use zenoh_core::{AsyncResolve, Resolvable, SyncResolve};
use zenoh_protocol::core::{EntityId, WireExpr};
use zenoh_protocol::network::{response, Mapping, RequestId, Response, ResponseFinal};
use zenoh_protocol::zenoh::{self, ext::ValueType, reply::ReplyBody, Del, Put, ResponseBody};
use zenoh_protocol::{
core::{EntityId, WireExpr},
network::{response, Mapping, RequestId, Response, ResponseFinal},
zenoh::{self, reply::ReplyBody, Del, Put, ResponseBody},
};
use zenoh_result::ZResult;

pub(crate) struct QueryInner {
Expand Down Expand Up @@ -380,17 +382,10 @@ impl SyncResolve for ReplyErrBuilder<'_> {
mapping: Mapping::Sender,
},
payload: ResponseBody::Err(zenoh::Err {
timestamp: None,
is_infrastructure: false,
encoding: self.value.encoding.into(),
ext_sinfo: None,
ext_unknown: vec![],
ext_body: Some(ValueType {
#[cfg(feature = "shared-memory")]
ext_shm: None,
payload: self.value.payload.into(),
encoding: self.value.encoding.into(),
}),
code: 0, // TODO
payload: self.value.payload.into(),
}),
ext_qos: response::ext::QoSType::RESPONSE,
ext_tstamp: None,
Expand Down
12 changes: 3 additions & 9 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2128,15 +2128,9 @@ impl Primitives for Session {
Some(query) => {
let callback = query.callback.clone();
std::mem::drop(state);
let value = match e.ext_body {
Some(body) => Value {
payload: body.payload.into(),
encoding: body.encoding.into(),
},
None => Value {
payload: Payload::empty(),
encoding: Encoding::default(),
},
let value = Value {
payload: e.payload.into(),
encoding: e.encoding.into(),
};
let replier_id = match e.ext_sinfo {
Some(info) => info.id.zid,
Expand Down