Skip to content

Commit

Permalink
Protocol changes: EntityId (into protocol_changes) (#774)
Browse files Browse the repository at this point in the history
* New Subscribers EntityId behavior for clients and peers

* Improve routing logging

* New Queryables EntityId behavior for clients and peers

* Improve routing logging

* Use proper QueryableId in Session and AdminSpace

* Sessions use runtime Id generator to avoid collisions

* AdminSpace use runtime Id generator to avoid collisions

* Use proper ResponderId

* Define EntityId type

* Add source_eid to SourceInfo

* Update source_info_stack_size test

* Update source_info_stack_size test

* Introduce EntityGlobalId type

* Add id() function to Subscriber, Queryable and Publisher

* Add Publication::with_source_info() function

* Code format

* Remove ref to PR #703

* Fix doctests

* Add comments

* Remove comments
  • Loading branch information
OlivierHecart authored Mar 12, 2024
1 parent fcbceb0 commit 41e2557
Show file tree
Hide file tree
Showing 36 changed files with 1,349 additions and 1,077 deletions.
13 changes: 7 additions & 6 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,14 +441,19 @@ where
let subscriber::UndeclareSubscriber { id, ext_wire_expr } = x;

// Header
let header = declare::id::U_SUBSCRIBER | subscriber::flag::Z;
let mut header = declare::id::U_SUBSCRIBER;
if !ext_wire_expr.is_null() {
header |= subscriber::flag::Z;
}
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, id)?;

// Extension
self.write(&mut *writer, (ext_wire_expr, false))?;
if !ext_wire_expr.is_null() {
self.write(&mut *writer, (ext_wire_expr, false))?;
}

Ok(())
}
Expand Down Expand Up @@ -483,7 +488,6 @@ where
let id: subscriber::SubscriberId = self.codec.read(&mut *reader)?;

// Extensions
// WARNING: this is a temporary and mandatory extension used for undeclarations
let mut ext_wire_expr = common::ext::WireExprType::null();

let mut has_ext = imsg::has_flag(self.header, subscriber::flag::Z);
Expand Down Expand Up @@ -665,7 +669,6 @@ where
let id: queryable::QueryableId = self.codec.read(&mut *reader)?;

// Extensions
// WARNING: this is a temporary and mandatory extension used for undeclarations
let mut ext_wire_expr = common::ext::WireExprType::null();

let mut has_ext = imsg::has_flag(self.header, queryable::flag::Z);
Expand Down Expand Up @@ -813,7 +816,6 @@ where
let id: token::TokenId = self.codec.read(&mut *reader)?;

// Extensions
// WARNING: this is a temporary and mandatory extension used for undeclarations
let mut ext_wire_expr = common::ext::WireExprType::null();

let mut has_ext = imsg::has_flag(self.header, interest::flag::Z);
Expand Down Expand Up @@ -1032,7 +1034,6 @@ where
let id: interest::InterestId = self.codec.read(&mut *reader)?;

// Extensions
// WARNING: this is a temporary and mandatory extension used for undeclarations
let mut ext_wire_expr = common::ext::WireExprType::null();

let mut has_ext = imsg::has_flag(self.header, interest::flag::Z);
Expand Down
22 changes: 11 additions & 11 deletions commons/zenoh-codec/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use zenoh_buffers::{
};
use zenoh_protocol::{
common::{imsg, ZExtZ64, ZExtZBufHeader},
core::{Reliability, ZenohId},
network::{ext::EntityIdType, *},
core::{EntityId, Reliability, ZenohId},
network::{ext::EntityGlobalIdType, *},
};

// NetworkMessage
Expand Down Expand Up @@ -218,21 +218,21 @@ where
}

// Extension: EntityId
impl<const ID: u8> LCodec<&ext::EntityIdType<{ ID }>> for Zenoh080 {
fn w_len(self, x: &ext::EntityIdType<{ ID }>) -> usize {
let EntityIdType { zid, eid } = x;
impl<const ID: u8> LCodec<&ext::EntityGlobalIdType<{ ID }>> for Zenoh080 {
fn w_len(self, x: &ext::EntityGlobalIdType<{ ID }>) -> usize {
let EntityGlobalIdType { zid, eid } = x;

1 + self.w_len(zid) + self.w_len(*eid)
}
}

impl<W, const ID: u8> WCodec<(&ext::EntityIdType<{ ID }>, bool), &mut W> for Zenoh080
impl<W, const ID: u8> WCodec<(&ext::EntityGlobalIdType<{ ID }>, bool), &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: (&ext::EntityIdType<{ ID }>, bool)) -> Self::Output {
fn write(self, writer: &mut W, x: (&ext::EntityGlobalIdType<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
self.write(&mut *writer, (&header, more))?;
Expand All @@ -248,13 +248,13 @@ where
}
}

impl<R, const ID: u8> RCodec<(ext::EntityIdType<{ ID }>, bool), &mut R> for Zenoh080Header
impl<R, const ID: u8> RCodec<(ext::EntityGlobalIdType<{ ID }>, bool), &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<(ext::EntityIdType<{ ID }>, bool), Self::Error> {
fn read(self, reader: &mut R) -> Result<(ext::EntityGlobalIdType<{ ID }>, bool), Self::Error> {
let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;

let flags: u8 = self.codec.read(&mut *reader)?;
Expand All @@ -263,8 +263,8 @@ where
let lodec = Zenoh080Length::new(length);
let zid: ZenohId = lodec.read(&mut *reader)?;

let eid: u32 = self.codec.read(&mut *reader)?;
let eid: EntityId = self.codec.read(&mut *reader)?;

Ok((ext::EntityIdType { zid, eid }, more))
Ok((ext::EntityGlobalIdType { zid, eid }, more))
}
}
26 changes: 16 additions & 10 deletions commons/zenoh-codec/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use zenoh_buffers::{
use zenoh_protocol::common::{iext, ZExtUnit};
use zenoh_protocol::{
common::{imsg, ZExtZBufHeader},
core::{Encoding, ZenohId},
core::{Encoding, EntityGlobalId, EntityId, ZenohId},
zenoh::{ext, id, PushBody, RequestBody, ResponseBody},
};

Expand Down Expand Up @@ -150,9 +150,9 @@ where
// Extension: SourceInfo
impl<const ID: u8> LCodec<&ext::SourceInfoType<{ ID }>> for Zenoh080 {
fn w_len(self, x: &ext::SourceInfoType<{ ID }>) -> usize {
let ext::SourceInfoType { zid, eid, sn } = x;
let ext::SourceInfoType { id, sn } = x;

1 + self.w_len(zid) + self.w_len(*eid) + self.w_len(*sn)
1 + self.w_len(&id.zid) + self.w_len(id.eid) + self.w_len(*sn)
}
}

Expand All @@ -164,18 +164,18 @@ where

fn write(self, writer: &mut W, x: (&ext::SourceInfoType<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let ext::SourceInfoType { zid, eid, sn } = x;
let ext::SourceInfoType { id, sn } = x;

let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
self.write(&mut *writer, (&header, more))?;

let flags: u8 = (zid.size() as u8 - 1) << 4;
let flags: u8 = (id.zid.size() as u8 - 1) << 4;
self.write(&mut *writer, flags)?;

let lodec = Zenoh080Length::new(zid.size());
lodec.write(&mut *writer, zid)?;
let lodec = Zenoh080Length::new(id.zid.size());
lodec.write(&mut *writer, &id.zid)?;

self.write(&mut *writer, eid)?;
self.write(&mut *writer, id.eid)?;
self.write(&mut *writer, sn)?;
Ok(())
}
Expand All @@ -196,10 +196,16 @@ where
let lodec = Zenoh080Length::new(length);
let zid: ZenohId = lodec.read(&mut *reader)?;

let eid: u32 = self.codec.read(&mut *reader)?;
let eid: EntityId = self.codec.read(&mut *reader)?;
let sn: u32 = self.codec.read(&mut *reader)?;

Ok((ext::SourceInfoType { zid, eid, sn }, more))
Ok((
ext::SourceInfoType {
id: EntityGlobalId { zid, eid },
sn,
},
more,
))
}
}

Expand Down
21 changes: 21 additions & 0 deletions commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,27 @@ impl<'de> serde::Deserialize<'de> for ZenohId {
}
}

/// The unique id of a zenoh entity inside it's parent [`Session`].
pub type EntityId = u32;

/// The global unique id of a zenoh entity.
#[derive(Debug, Default, Clone, Eq, Hash, PartialEq)]
pub struct EntityGlobalId {
pub zid: ZenohId,
pub eid: EntityId,
}

impl EntityGlobalId {
#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
Self {
zid: ZenohId::rand(),
eid: rand::thread_rng().gen(),
}
}
}

#[repr(u8)]
#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq)]
pub enum Priority {
Expand Down
4 changes: 4 additions & 0 deletions commons/zenoh-protocol/src/core/wire_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ impl<'a> WireExpr<'a> {
}
}

pub fn is_empty(&self) -> bool {
self.scope == 0 && self.suffix.as_ref().is_empty()
}

pub fn as_str(&'a self) -> &'a str {
if self.scope == 0 {
self.suffix.as_ref()
Expand Down
17 changes: 10 additions & 7 deletions commons/zenoh-protocol/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ pub mod common {
pub mod ext {
use super::*;

// WARNING: this is a temporary and mandatory extension used for undeclarations
pub type WireExprExt = zextzbuf!(0x0f, true);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WireExprType {
Expand All @@ -195,6 +194,10 @@ pub mod common {
}
}

pub fn is_null(&self) -> bool {
self.wire_expr.is_empty()
}

#[cfg(feature = "test")]
pub fn rand() -> Self {
Self {
Expand Down Expand Up @@ -286,9 +289,11 @@ pub mod keyexpr {
}

pub mod subscriber {
use crate::core::EntityId;

use super::*;

pub type SubscriberId = u32;
pub type SubscriberId = EntityId;

pub mod flag {
pub const N: u8 = 1 << 5; // 0x20 Named if N==1 then the key expr has name/suffix
Expand Down Expand Up @@ -441,7 +446,6 @@ pub mod subscriber {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UndeclareSubscriber {
pub id: SubscriberId,
// WARNING: this is a temporary and mandatory extension used for undeclarations
pub ext_wire_expr: common::ext::WireExprType,
}

Expand All @@ -460,9 +464,11 @@ pub mod subscriber {
}

pub mod queryable {
use crate::core::EntityId;

use super::*;

pub type QueryableId = u32;
pub type QueryableId = EntityId;

pub mod flag {
pub const N: u8 = 1 << 5; // 0x20 Named if N==1 then the key expr has name/suffix
Expand Down Expand Up @@ -597,7 +603,6 @@ pub mod queryable {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UndeclareQueryable {
pub id: QueryableId,
// WARNING: this is a temporary and mandatory extension used for undeclarations
pub ext_wire_expr: common::ext::WireExprType,
}

Expand Down Expand Up @@ -683,7 +688,6 @@ pub mod token {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UndeclareToken {
pub id: TokenId,
// WARNING: this is a temporary and mandatory extension used for undeclarations
pub ext_wire_expr: common::ext::WireExprType,
}

Expand Down Expand Up @@ -1097,7 +1101,6 @@ pub mod interest {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UndeclareInterest {
pub id: InterestId,
// WARNING: this is a temporary and mandatory extension used for undeclarations
pub ext_wire_expr: common::ext::WireExprType,
}

Expand Down
10 changes: 5 additions & 5 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl From<ResponseFinal> for NetworkMessage {
pub mod ext {
use crate::{
common::{imsg, ZExtZ64},
core::{CongestionControl, Priority, ZenohId},
core::{CongestionControl, EntityId, Priority, ZenohId},
};
use core::fmt;

Expand Down Expand Up @@ -407,19 +407,19 @@ pub mod ext {
/// % eid %
/// +---------------+
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntityIdType<const ID: u8> {
pub struct EntityGlobalIdType<const ID: u8> {
pub zid: ZenohId,
pub eid: u32,
pub eid: EntityId,
}

impl<const ID: u8> EntityIdType<{ ID }> {
impl<const ID: u8> EntityGlobalIdType<{ ID }> {
#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();

let zid = ZenohId::rand();
let eid: u32 = rng.gen();
let eid: EntityId = rng.gen();
Self { zid, eid }
}
}
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-protocol/src/network/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub mod ext {
pub type TimestampType = crate::network::ext::TimestampType<{ Timestamp::ID }>;

pub type ResponderId = zextzbuf!(0x3, false);
pub type ResponderIdType = crate::network::ext::EntityIdType<{ ResponderId::ID }>;
pub type ResponderIdType = crate::network::ext::EntityGlobalIdType<{ ResponderId::ID }>;
}

impl Response {
Expand Down
10 changes: 4 additions & 6 deletions commons/zenoh-protocol/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl From<Err> for ResponseBody {
pub mod ext {
use zenoh_buffers::ZBuf;

use crate::core::{Encoding, ZenohId};
use crate::core::{Encoding, EntityGlobalId};

/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
Expand All @@ -172,8 +172,7 @@ pub mod ext {
/// +---------------+
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceInfoType<const ID: u8> {
pub zid: ZenohId,
pub eid: u32,
pub id: EntityGlobalId,
pub sn: u32,
}

Expand All @@ -183,10 +182,9 @@ pub mod ext {
use rand::Rng;
let mut rng = rand::thread_rng();

let zid = ZenohId::rand();
let eid: u32 = rng.gen();
let id = EntityGlobalId::rand();
let sn: u32 = rng.gen();
Self { zid, eid, sn }
Self { id, sn }
}
}

Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ extern crate zenoh_core;
#[macro_use]
extern crate zenoh_result;

pub(crate) type Id = usize;
pub(crate) type Id = u32;

use git_version::git_version;
use handlers::DefaultHandler;
Expand Down
Loading

0 comments on commit 41e2557

Please sign in to comment.