Skip to content

Commit

Permalink
Update Reply protocol definition and codec (#717)
Browse files Browse the repository at this point in the history
* Update Reply protocol definition and codec

* Make consolidation a flag in Query/Reply

* Fix wrong Consolidation cast in codec

* Apply Reply changes to routing

* Fix shared-memory feature

* Fix stats

* Bump Zenoh Protocol Version

* Add query/reply ok(put|del)/err() tests
  • Loading branch information
Mallets authored Feb 9, 2024
1 parent 2bd4518 commit efe1135
Show file tree
Hide file tree
Showing 15 changed files with 329 additions and 333 deletions.
4 changes: 2 additions & 2 deletions commons/zenoh-codec/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ where
fn write(self, writer: &mut W, x: &ResponseBody) -> Self::Output {
match x {
ResponseBody::Reply(b) => self.write(&mut *writer, b),
ResponseBody::Err(b) => self.write(&mut *writer, b),
ResponseBody::Ack(b) => self.write(&mut *writer, b),
ResponseBody::Err(b) => self.write(&mut *writer, b),
ResponseBody::Put(b) => self.write(&mut *writer, b),
}
}
Expand All @@ -140,8 +140,8 @@ where
let codec = Zenoh080Header::new(header);
let body = match imsg::mid(codec.header) {
id::REPLY => ResponseBody::Reply(codec.read(&mut *reader)?),
id::ERR => ResponseBody::Err(codec.read(&mut *reader)?),
id::ACK => ResponseBody::Ack(codec.read(&mut *reader)?),
id::ERR => ResponseBody::Err(codec.read(&mut *reader)?),
id::PUT => ResponseBody::Put(codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};
Expand Down
70 changes: 34 additions & 36 deletions commons/zenoh-codec/src/zenoh/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,48 +22,46 @@ use zenoh_protocol::{
common::{iext, imsg},
zenoh::{
id,
query::{ext, flag, Query},
query::{ext, flag, Consolidation, Query},
},
};

// Extension Consolidation
impl<W> WCodec<(ext::ConsolidationType, bool), &mut W> for Zenoh080
// Consolidation
impl<W> WCodec<Consolidation, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: (ext::ConsolidationType, bool)) -> Self::Output {
let (x, more) = x;
fn write(self, writer: &mut W, x: Consolidation) -> Self::Output {
let v: u64 = match x {
ext::ConsolidationType::Auto => 0,
ext::ConsolidationType::None => 1,
ext::ConsolidationType::Monotonic => 2,
ext::ConsolidationType::Latest => 3,
ext::ConsolidationType::Unique => 4,
Consolidation::Auto => 0,
Consolidation::None => 1,
Consolidation::Monotonic => 2,
Consolidation::Latest => 3,
Consolidation::Unique => 4,
};
let v = ext::Consolidation::new(v);
self.write(&mut *writer, (&v, more))
self.write(&mut *writer, v)
}
}

impl<R> RCodec<(ext::ConsolidationType, bool), &mut R> for Zenoh080Header
impl<R> RCodec<Consolidation, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<(ext::ConsolidationType, bool), Self::Error> {
let (ext, more): (ext::Consolidation, bool) = self.read(&mut *reader)?;
let c = match ext.value {
0 => ext::ConsolidationType::Auto,
1 => ext::ConsolidationType::None,
2 => ext::ConsolidationType::Monotonic,
3 => ext::ConsolidationType::Latest,
4 => ext::ConsolidationType::Unique,
_ => return Err(DidntRead),
fn read(self, reader: &mut R) -> Result<Consolidation, Self::Error> {
let v: u64 = self.read(&mut *reader)?;
let c = match v {
0 => Consolidation::Auto,
1 => Consolidation::None,
2 => Consolidation::Monotonic,
3 => Consolidation::Latest,
4 => Consolidation::Unique,
_ => Consolidation::Auto, // Fallback on Auto if Consolidation is unknown
};
Ok((c, more))
Ok(c)
}
}

Expand All @@ -75,21 +73,23 @@ where

fn write(self, writer: &mut W, x: &Query) -> Self::Output {
let Query {
consolidation,
parameters,
ext_sinfo,
ext_consolidation,
ext_body,
ext_attachment,
ext_unknown,
} = x;

// Header
let mut header = id::QUERY;
if consolidation != &Consolidation::default() {
header |= flag::C;
}
if !parameters.is_empty() {
header |= flag::P;
}
let mut n_exts = (ext_sinfo.is_some() as u8)
+ ((ext_consolidation != &ext::ConsolidationType::default()) as u8)
+ (ext_body.is_some() as u8)
+ (ext_attachment.is_some() as u8)
+ (ext_unknown.len() as u8);
Expand All @@ -99,6 +99,9 @@ where
self.write(&mut *writer, header)?;

// Body
if consolidation != &Consolidation::default() {
self.write(&mut *writer, *consolidation)?;
}
if !parameters.is_empty() {
self.write(&mut *writer, parameters)?;
}
Expand All @@ -108,10 +111,6 @@ where
n_exts -= 1;
self.write(&mut *writer, (sinfo, n_exts != 0))?;
}
if ext_consolidation != &ext::ConsolidationType::default() {
n_exts -= 1;
self.write(&mut *writer, (*ext_consolidation, n_exts != 0))?;
}
if let Some(body) = ext_body.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (body, n_exts != 0))?;
Expand Down Expand Up @@ -154,14 +153,18 @@ where
}

// Body
let mut consolidation = Consolidation::default();
if imsg::has_flag(self.header, flag::C) {
consolidation = self.codec.read(&mut *reader)?;
}

let mut parameters = String::new();
if imsg::has_flag(self.header, flag::P) {
parameters = self.codec.read(&mut *reader)?;
}

// Extensions
let mut ext_sinfo: Option<ext::SourceInfoType> = None;
let mut ext_consolidation = ext::ConsolidationType::default();
let mut ext_body: Option<ext::QueryBodyType> = None;
let mut ext_attachment: Option<ext::AttachmentType> = None;
let mut ext_unknown = Vec::new();
Expand All @@ -176,11 +179,6 @@ where
ext_sinfo = Some(s);
has_ext = ext;
}
ext::Consolidation::ID => {
let (c, ext): (ext::ConsolidationType, bool) = eodec.read(&mut *reader)?;
ext_consolidation = c;
has_ext = ext;
}
ext::QueryBodyType::SID | ext::QueryBodyType::VID => {
let (s, ext): (ext::QueryBodyType, bool) = eodec.read(&mut *reader)?;
ext_body = Some(s);
Expand All @@ -200,9 +198,9 @@ where
}

Ok(Query {
consolidation,
parameters,
ext_sinfo,
ext_consolidation,
ext_body,
ext_attachment,
ext_unknown,
Expand Down
141 changes: 18 additions & 123 deletions commons/zenoh-codec/src/zenoh/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,18 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#[cfg(not(feature = "shared-memory"))]
use crate::Zenoh080Bounded;
#[cfg(feature = "shared-memory")]
use crate::Zenoh080Sliced;
use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
use alloc::vec::Vec;
use zenoh_buffers::{
reader::{DidntRead, Reader},
writer::{DidntWrite, Writer},
ZBuf,
};
use zenoh_protocol::{
common::{iext, imsg},
core::Encoding,
common::imsg,
zenoh::{
id,
reply::{ext, flag, Reply},
query::Consolidation,
reply::{flag, Reply, ReplyBody},
},
};

Expand All @@ -39,81 +34,35 @@ where

fn write(self, writer: &mut W, x: &Reply) -> Self::Output {
let Reply {
timestamp,
encoding,
ext_sinfo,
ext_consolidation,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_attachment,
consolidation,
ext_unknown,
payload,
} = x;

// Header
let mut header = id::REPLY;
if timestamp.is_some() {
header |= flag::T;
}
if encoding != &Encoding::default() {
header |= flag::E;
}
let mut n_exts = (ext_sinfo.is_some()) as u8
+ ((ext_consolidation != &ext::ConsolidationType::default()) as u8)
+ (ext_attachment.is_some()) as u8
+ (ext_unknown.len() as u8);
#[cfg(feature = "shared-memory")]
{
n_exts += ext_shm.is_some() as u8;
if consolidation != &Consolidation::default() {
header |= flag::C;
}
let mut n_exts = ext_unknown.len() as u8;
if n_exts != 0 {
header |= flag::Z;
}
self.write(&mut *writer, header)?;

// Body
if let Some(ts) = timestamp.as_ref() {
self.write(&mut *writer, ts)?;
}
if encoding != &Encoding::default() {
self.write(&mut *writer, encoding)?;
if consolidation != &Consolidation::default() {
self.write(&mut *writer, *consolidation)?;
}

// Extensions
if let Some(sinfo) = ext_sinfo.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (sinfo, n_exts != 0))?;
}
if ext_consolidation != &ext::ConsolidationType::default() {
n_exts -= 1;
self.write(&mut *writer, (*ext_consolidation, n_exts != 0))?;
}
#[cfg(feature = "shared-memory")]
if let Some(eshm) = ext_shm.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (eshm, n_exts != 0))?;
}
if let Some(att) = ext_attachment.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (att, n_exts != 0))?;
}
for u in ext_unknown.iter() {
n_exts -= 1;
self.write(&mut *writer, (u, n_exts != 0))?;
}

// Payload
#[cfg(feature = "shared-memory")]
{
let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
codec.write(&mut *writer, payload)?;
}

#[cfg(not(feature = "shared-memory"))]
{
let bodec = Zenoh080Bounded::<u32>::new();
bodec.write(&mut *writer, payload)?;
}
self.write(&mut *writer, payload)?;

Ok(())
}
Expand Down Expand Up @@ -144,81 +93,27 @@ where
}

// Body
let mut timestamp: Option<uhlc::Timestamp> = None;
if imsg::has_flag(self.header, flag::T) {
timestamp = Some(self.codec.read(&mut *reader)?);
}

let mut encoding = Encoding::default();
if imsg::has_flag(self.header, flag::E) {
encoding = self.codec.read(&mut *reader)?;
let mut consolidation = Consolidation::default();
if imsg::has_flag(self.header, flag::C) {
consolidation = self.codec.read(&mut *reader)?;
}

// Extensions
let mut ext_sinfo: Option<ext::SourceInfoType> = None;
let mut ext_consolidation = ext::ConsolidationType::default();
#[cfg(feature = "shared-memory")]
let mut ext_shm: Option<ext::ShmType> = None;
let mut ext_attachment: Option<ext::AttachmentType> = None;
let mut ext_unknown = Vec::new();

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
let ext: u8 = self.codec.read(&mut *reader)?;
let eodec = Zenoh080Header::new(ext);
match iext::eid(ext) {
ext::SourceInfo::ID => {
let (s, ext): (ext::SourceInfoType, bool) = eodec.read(&mut *reader)?;
ext_sinfo = Some(s);
has_ext = ext;
}
ext::Consolidation::ID => {
let (c, ext): (ext::ConsolidationType, bool) = eodec.read(&mut *reader)?;
ext_consolidation = c;
has_ext = ext;
}
#[cfg(feature = "shared-memory")]
ext::Shm::ID => {
let (s, ext): (ext::ShmType, bool) = eodec.read(&mut *reader)?;
ext_shm = Some(s);
has_ext = ext;
}
ext::Attachment::ID => {
let (a, ext): (ext::AttachmentType, bool) = eodec.read(&mut *reader)?;
ext_attachment = Some(a);
has_ext = ext;
}
_ => {
let (u, ext) = extension::read(reader, "Reply", ext)?;
ext_unknown.push(u);
has_ext = ext;
}
}
let (u, ext) = extension::read(reader, "Reply", ext)?;
ext_unknown.push(u);
has_ext = ext;
}

// Payload
let payload: ZBuf = {
#[cfg(feature = "shared-memory")]
{
let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
codec.read(&mut *reader)?
}

#[cfg(not(feature = "shared-memory"))]
{
let bodec = Zenoh080Bounded::<u32>::new();
bodec.read(&mut *reader)?
}
};
let payload: ReplyBody = self.codec.read(&mut *reader)?;

Ok(Reply {
timestamp,
encoding,
ext_sinfo,
ext_consolidation,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_attachment,
consolidation,
ext_unknown,
payload,
})
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-codec/tests/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ fn codec_network() {
run!(NetworkMessage, NetworkMessage::rand());
}

// Zenoh new
// Zenoh
#[test]
fn codec_put() {
run!(zenoh::Put, zenoh::Put::rand());
Expand Down
Loading

0 comments on commit efe1135

Please sign in to comment.