Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Declare message can be Push/Request/RequestContinuous/Response #902

Merged
merged 4 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 84 additions & 152 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ use zenoh_buffers::{
ZBuf,
};
use zenoh_protocol::{
common::{iext, imsg, ZExtZ64},
common::{
iext,
imsg::{self, HEADER_BITS},
ZExtZ64,
},
core::{ExprId, ExprLen, WireExpr},
network::{
declare::{
self, common, interest, keyexpr, queryable, subscriber, token, Declare, DeclareBody,
Interest,
DeclareMode, Interest,
},
id, Mapping,
},
Expand All @@ -48,8 +52,7 @@ where
DeclareBody::DeclareToken(r) => self.write(&mut *writer, r)?,
DeclareBody::UndeclareToken(r) => self.write(&mut *writer, r)?,
DeclareBody::DeclareInterest(r) => self.write(&mut *writer, r)?,
DeclareBody::FinalInterest(r) => self.write(&mut *writer, r)?,
DeclareBody::UndeclareInterest(r) => self.write(&mut *writer, r)?,
DeclareBody::DeclareFinal(r) => self.write(&mut *writer, r)?,
}

Ok(())
Expand Down Expand Up @@ -77,8 +80,7 @@ where
D_TOKEN => DeclareBody::DeclareToken(codec.read(&mut *reader)?),
U_TOKEN => DeclareBody::UndeclareToken(codec.read(&mut *reader)?),
D_INTEREST => DeclareBody::DeclareInterest(codec.read(&mut *reader)?),
F_INTEREST => DeclareBody::FinalInterest(codec.read(&mut *reader)?),
U_INTEREST => DeclareBody::UndeclareInterest(codec.read(&mut *reader)?),
D_FINAL => DeclareBody::DeclareFinal(codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};

Expand All @@ -95,7 +97,7 @@ where

fn write(self, writer: &mut W, x: &Declare) -> Self::Output {
let Declare {
interest_id,
mode,
ext_qos,
ext_tstamp,
ext_nodeid,
Expand All @@ -104,9 +106,13 @@ where

// Header
let mut header = id::DECLARE;
if x.interest_id.is_some() {
header |= declare::flag::I;
}
header |= match mode {
DeclareMode::Push => 0b00,
DeclareMode::Response(_) => 0b01,
DeclareMode::Request(_) => 0b10,
DeclareMode::RequestContinuous(_) => 0b11,
} << HEADER_BITS;

let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8)
+ (ext_tstamp.is_some() as u8)
+ ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8);
Expand All @@ -116,8 +122,11 @@ where
self.write(&mut *writer, header)?;

// Body
if let Some(interest_id) = interest_id {
self.write(&mut *writer, interest_id)?;
if let DeclareMode::Request(rid)
| DeclareMode::RequestContinuous(rid)
| DeclareMode::Response(rid) = mode
{
self.write(&mut *writer, rid)?;
}

// Extensions
Expand Down Expand Up @@ -166,10 +175,14 @@ where
return Err(DidntRead);
}

let mut interest_id = None;
if imsg::has_flag(self.header, declare::flag::I) {
interest_id = Some(self.codec.read(&mut *reader)?);
}
// Body
let mode = match (self.header >> HEADER_BITS) & 0b11 {
0b00 => DeclareMode::Push,
0b01 => DeclareMode::Response(self.codec.read(&mut *reader)?),
0b10 => DeclareMode::Request(self.codec.read(&mut *reader)?),
0b11 => DeclareMode::RequestContinuous(self.codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};

// Extensions
let mut ext_qos = declare::ext::QoSType::DEFAULT;
Expand Down Expand Up @@ -206,7 +219,7 @@ where
let body: DeclareBody = self.codec.read(&mut *reader)?;

Ok(Declare {
interest_id,
mode,
ext_qos,
ext_tstamp,
ext_nodeid,
Expand All @@ -215,6 +228,59 @@ where
}
}

// Final
impl<W> WCodec<&common::DeclareFinal, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &common::DeclareFinal) -> Self::Output {
let common::DeclareFinal = x;

// Header
let header = declare::id::D_FINAL;
self.write(&mut *writer, header)?;

Ok(())
}
}

impl<R> RCodec<common::DeclareFinal, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<common::DeclareFinal, Self::Error> {
let header: u8 = self.read(&mut *reader)?;
let codec = Zenoh080Header::new(header);

codec.read(reader)
}
}

impl<R> RCodec<common::DeclareFinal, &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<common::DeclareFinal, Self::Error> {
if imsg::mid(self.header) != declare::id::D_FINAL {
return Err(DidntRead);
}

// Extensions
let has_ext = imsg::has_flag(self.header, token::flag::Z);
if has_ext {
extension::skip_all(reader, "Final")?;
}

Ok(common::DeclareFinal)
}
}

// DeclareKeyExpr
impl<W> WCodec<&keyexpr::DeclareKeyExpr, &mut W> for Zenoh080
where
Expand Down Expand Up @@ -907,7 +973,7 @@ where
} = x;

// Header
let header = declare::id::D_INTEREST | x.flags();
let header = declare::id::D_INTEREST;
self.write(&mut *writer, header)?;

// Body
Expand Down Expand Up @@ -976,140 +1042,6 @@ where
}
}

// FinalInterest
impl<W> WCodec<&interest::FinalInterest, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &interest::FinalInterest) -> Self::Output {
let interest::FinalInterest { id } = x;

// Header
let header = declare::id::F_INTEREST;
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, id)?;

Ok(())
}
}

impl<R> RCodec<interest::FinalInterest, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<interest::FinalInterest, Self::Error> {
let header: u8 = self.read(&mut *reader)?;
let codec = Zenoh080Header::new(header);

codec.read(reader)
}
}

impl<R> RCodec<interest::FinalInterest, &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<interest::FinalInterest, Self::Error> {
if imsg::mid(self.header) != declare::id::F_INTEREST {
return Err(DidntRead);
}

// Body
let id: interest::InterestId = self.codec.read(&mut *reader)?;

// Extensions
let has_ext = imsg::has_flag(self.header, token::flag::Z);
if has_ext {
extension::skip_all(reader, "FinalInterest")?;
}

Ok(interest::FinalInterest { id })
}
}

// UndeclareInterest
impl<W> WCodec<&interest::UndeclareInterest, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &interest::UndeclareInterest) -> Self::Output {
let interest::UndeclareInterest { id, ext_wire_expr } = x;

// Header
let header = declare::id::U_INTEREST | interest::flag::Z;
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, id)?;

// Extension
self.write(&mut *writer, (ext_wire_expr, false))?;

Ok(())
}
}

impl<R> RCodec<interest::UndeclareInterest, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<interest::UndeclareInterest, Self::Error> {
let header: u8 = self.read(&mut *reader)?;
let codec = Zenoh080Header::new(header);

codec.read(reader)
}
}

impl<R> RCodec<interest::UndeclareInterest, &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<interest::UndeclareInterest, Self::Error> {
if imsg::mid(self.header) != declare::id::U_INTEREST {
return Err(DidntRead);
}

// Body
let id: interest::InterestId = self.codec.read(&mut *reader)?;

// Extensions
let mut ext_wire_expr = common::ext::WireExprType::null();

let mut has_ext = imsg::has_flag(self.header, interest::flag::Z);
while has_ext {
let ext: u8 = self.codec.read(&mut *reader)?;
let eodec = Zenoh080Header::new(ext);
match iext::eid(ext) {
common::ext::WireExprExt::ID => {
let (we, ext): (common::ext::WireExprType, bool) = eodec.read(&mut *reader)?;
ext_wire_expr = we;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "UndeclareInterest", ext)?;
}
}
}

Ok(interest::UndeclareInterest { id, ext_wire_expr })
}
}

// WARNING: this is a temporary extension used for undeclarations
impl<W> WCodec<(&common::ext::WireExprType, bool), &mut W> for Zenoh080
where
Expand Down
16 changes: 16 additions & 0 deletions commons/zenoh-codec/tests/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ use zenoh_protocol::{
zenoh, zextunit, zextz64, zextzbuf,
};

#[test]
fn zbuf_test() {
let mut buffer = vec![0u8; 64];

let zbuf = ZBuf::empty();
let mut writer = buffer.writer();

let codec = Zenoh080::new();
codec.write(&mut writer, &zbuf).unwrap();
println!("Buffer: {:?}", buffer);

let mut reader = buffer.reader();
let ret: ZBuf = codec.read(&mut reader).unwrap();
assert_eq!(ret, zbuf);
}

const NUM_ITER: usize = 100;
const MAX_PAYLOAD_SIZE: usize = 256;

Expand Down
Loading
Loading