Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protocol changes: EntityId (into protocol_changes) #774

Merged
merged 24 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2c18758
New Subscribers EntityId behavior for clients and peers
OlivierHecart Feb 2, 2024
ef1034e
Improve routing logging
OlivierHecart Feb 5, 2024
61c7dc3
New Queryables EntityId behavior for clients and peers
OlivierHecart Feb 5, 2024
1173547
Improve routing logging
OlivierHecart Feb 5, 2024
f43e9c1
Use proper QueryableId in Session and AdminSpace
OlivierHecart Feb 7, 2024
373d9a4
Sessions use runtime Id generator to avoid collisions
OlivierHecart Feb 7, 2024
ddcb6d5
AdminSpace use runtime Id generator to avoid collisions
OlivierHecart Feb 7, 2024
585db56
Use proper ResponderId
OlivierHecart Feb 7, 2024
d521496
Define EntityId type
OlivierHecart Feb 7, 2024
053565b
Add source_eid to SourceInfo
OlivierHecart Feb 7, 2024
540e1d1
Update source_info_stack_size test
OlivierHecart Feb 8, 2024
18af62b
Update source_info_stack_size test
OlivierHecart Feb 9, 2024
8398535
Introduce EntityGlobalId type
OlivierHecart Feb 9, 2024
8b3c094
Add id() function to Subscriber, Queryable and Publisher
OlivierHecart Feb 12, 2024
ccd6b9c
Add Publication::with_source_info() function
OlivierHecart Feb 12, 2024
7811590
Code format
OlivierHecart Feb 15, 2024
4d4bdb2
Remove ref to PR #703
OlivierHecart Feb 15, 2024
9ae62c9
Merge branch 'protocol_changes' into entityid
OlivierHecart Feb 28, 2024
1a60970
Fix doctests
OlivierHecart Feb 29, 2024
2558a67
Merge branch 'protocol_changes' into entityid
OlivierHecart Mar 11, 2024
493502c
Add comments
OlivierHecart Mar 12, 2024
0901a44
Remove comments
OlivierHecart Mar 12, 2024
4b7b131
Merge branch 'protocol_changes' into entityid
OlivierHecart Mar 12, 2024
4cfd6c9
Merge branch 'protocol_changes' into entityid
OlivierHecart Mar 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,14 +441,19 @@ where
let subscriber::UndeclareSubscriber { id, ext_wire_expr } = x;

// Header
let header = declare::id::U_SUBSCRIBER | subscriber::flag::Z;
let mut header = declare::id::U_SUBSCRIBER;
if !ext_wire_expr.is_null() {
header |= subscriber::flag::Z;
}
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, id)?;

// Extension
self.write(&mut *writer, (ext_wire_expr, false))?;
if !ext_wire_expr.is_null() {
self.write(&mut *writer, (ext_wire_expr, false))?;
}

Ok(())
}
Expand Down
22 changes: 11 additions & 11 deletions commons/zenoh-codec/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use zenoh_buffers::{
};
use zenoh_protocol::{
common::{imsg, ZExtZ64, ZExtZBufHeader},
core::{Reliability, ZenohId},
network::{ext::EntityIdType, *},
core::{EntityId, Reliability, ZenohId},
network::{ext::EntityGlobalIdType, *},
};

// NetworkMessage
Expand Down Expand Up @@ -218,21 +218,21 @@ where
}

// Extension: EntityId
impl<const ID: u8> LCodec<&ext::EntityIdType<{ ID }>> for Zenoh080 {
fn w_len(self, x: &ext::EntityIdType<{ ID }>) -> usize {
let EntityIdType { zid, eid } = x;
impl<const ID: u8> LCodec<&ext::EntityGlobalIdType<{ ID }>> for Zenoh080 {
fn w_len(self, x: &ext::EntityGlobalIdType<{ ID }>) -> usize {
let EntityGlobalIdType { zid, eid } = x;

1 + self.w_len(zid) + self.w_len(*eid)
}
}

impl<W, const ID: u8> WCodec<(&ext::EntityIdType<{ ID }>, bool), &mut W> for Zenoh080
impl<W, const ID: u8> WCodec<(&ext::EntityGlobalIdType<{ ID }>, bool), &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: (&ext::EntityIdType<{ ID }>, bool)) -> Self::Output {
fn write(self, writer: &mut W, x: (&ext::EntityGlobalIdType<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
self.write(&mut *writer, (&header, more))?;
Expand All @@ -248,13 +248,13 @@ where
}
}

impl<R, const ID: u8> RCodec<(ext::EntityIdType<{ ID }>, bool), &mut R> for Zenoh080Header
impl<R, const ID: u8> RCodec<(ext::EntityGlobalIdType<{ ID }>, bool), &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<(ext::EntityIdType<{ ID }>, bool), Self::Error> {
fn read(self, reader: &mut R) -> Result<(ext::EntityGlobalIdType<{ ID }>, bool), Self::Error> {
let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;

let flags: u8 = self.codec.read(&mut *reader)?;
Expand All @@ -263,8 +263,8 @@ where
let lodec = Zenoh080Length::new(length);
let zid: ZenohId = lodec.read(&mut *reader)?;

let eid: u32 = self.codec.read(&mut *reader)?;
let eid: EntityId = self.codec.read(&mut *reader)?;

Ok((ext::EntityIdType { zid, eid }, more))
Ok((ext::EntityGlobalIdType { zid, eid }, more))
}
}
26 changes: 16 additions & 10 deletions commons/zenoh-codec/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use zenoh_buffers::{
use zenoh_protocol::common::{iext, ZExtUnit};
use zenoh_protocol::{
common::{imsg, ZExtZBufHeader},
core::{Encoding, ZenohId},
core::{Encoding, EntityGlobalId, EntityId, ZenohId},
zenoh::{ext, id, PushBody, RequestBody, ResponseBody},
};

Expand Down Expand Up @@ -150,9 +150,9 @@ where
// Extension: SourceInfo
impl<const ID: u8> LCodec<&ext::SourceInfoType<{ ID }>> for Zenoh080 {
fn w_len(self, x: &ext::SourceInfoType<{ ID }>) -> usize {
let ext::SourceInfoType { zid, eid, sn } = x;
let ext::SourceInfoType { id, sn } = x;

1 + self.w_len(zid) + self.w_len(*eid) + self.w_len(*sn)
1 + self.w_len(&id.zid) + self.w_len(id.eid) + self.w_len(*sn)
}
}

Expand All @@ -164,18 +164,18 @@ where

fn write(self, writer: &mut W, x: (&ext::SourceInfoType<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let ext::SourceInfoType { zid, eid, sn } = x;
let ext::SourceInfoType { id, sn } = x;

let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
self.write(&mut *writer, (&header, more))?;

let flags: u8 = (zid.size() as u8 - 1) << 4;
let flags: u8 = (id.zid.size() as u8 - 1) << 4;
self.write(&mut *writer, flags)?;

let lodec = Zenoh080Length::new(zid.size());
lodec.write(&mut *writer, zid)?;
let lodec = Zenoh080Length::new(id.zid.size());
lodec.write(&mut *writer, &id.zid)?;

self.write(&mut *writer, eid)?;
self.write(&mut *writer, id.eid)?;
self.write(&mut *writer, sn)?;
Ok(())
}
Expand All @@ -196,10 +196,16 @@ where
let lodec = Zenoh080Length::new(length);
let zid: ZenohId = lodec.read(&mut *reader)?;

let eid: u32 = self.codec.read(&mut *reader)?;
let eid: EntityId = self.codec.read(&mut *reader)?;
let sn: u32 = self.codec.read(&mut *reader)?;

Ok((ext::SourceInfoType { zid, eid, sn }, more))
Ok((
ext::SourceInfoType {
id: EntityGlobalId { zid, eid },
sn,
},
more,
))
}
}

Expand Down
19 changes: 19 additions & 0 deletions commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,25 @@ impl<'de> serde::Deserialize<'de> for ZenohId {
}
}

pub type EntityId = u32;

#[derive(Debug, Default, Clone, Eq, Hash, PartialEq)]
pub struct EntityGlobalId {
pub zid: ZenohId,
pub eid: EntityId,
}

impl EntityGlobalId {
#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
Self {
zid: ZenohId::rand(),
eid: rand::thread_rng().gen(),
OlivierHecart marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

#[repr(u8)]
#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq)]
pub enum Priority {
Expand Down
4 changes: 4 additions & 0 deletions commons/zenoh-protocol/src/core/wire_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ impl<'a> WireExpr<'a> {
}
}

pub fn is_empty(&self) -> bool {
self.scope == 0 && self.suffix.as_ref().is_empty()
}

pub fn as_str(&'a self) -> &'a str {
if self.scope == 0 {
self.suffix.as_ref()
Expand Down
13 changes: 10 additions & 3 deletions commons/zenoh-protocol/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ pub mod common {
}
}

pub fn is_null(&self) -> bool {
self.wire_expr.is_empty()
}

#[cfg(feature = "test")]
pub fn rand() -> Self {
Self {
Expand Down Expand Up @@ -286,9 +290,11 @@ pub mod keyexpr {
}

pub mod subscriber {
use crate::core::EntityId;

use super::*;

pub type SubscriberId = u32;
pub type SubscriberId = EntityId;

pub mod flag {
pub const N: u8 = 1 << 5; // 0x20 Named if N==1 then the key expr has name/suffix
Expand Down Expand Up @@ -441,7 +447,6 @@ pub mod subscriber {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UndeclareSubscriber {
pub id: SubscriberId,
// WARNING: this is a temporary and mandatory extension used for undeclarations
OlivierHecart marked this conversation as resolved.
Show resolved Hide resolved
pub ext_wire_expr: common::ext::WireExprType,
}

Expand All @@ -460,9 +465,11 @@ pub mod subscriber {
}

pub mod queryable {
use crate::core::EntityId;

use super::*;

pub type QueryableId = u32;
pub type QueryableId = EntityId;

pub mod flag {
pub const N: u8 = 1 << 5; // 0x20 Named if N==1 then the key expr has name/suffix
Expand Down
10 changes: 5 additions & 5 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl From<ResponseFinal> for NetworkMessage {
pub mod ext {
use crate::{
common::{imsg, ZExtZ64},
core::{CongestionControl, Priority, ZenohId},
core::{CongestionControl, EntityId, Priority, ZenohId},
};
use core::fmt;

Expand Down Expand Up @@ -407,19 +407,19 @@ pub mod ext {
/// % eid %
/// +---------------+
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntityIdType<const ID: u8> {
pub struct EntityGlobalIdType<const ID: u8> {
pub zid: ZenohId,
pub eid: u32,
pub eid: EntityId,
}

impl<const ID: u8> EntityIdType<{ ID }> {
impl<const ID: u8> EntityGlobalIdType<{ ID }> {
#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();

let zid = ZenohId::rand();
let eid: u32 = rng.gen();
let eid: EntityId = rng.gen();
Self { zid, eid }
}
}
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-protocol/src/network/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub mod ext {
pub type TimestampType = crate::network::ext::TimestampType<{ Timestamp::ID }>;

pub type ResponderId = zextzbuf!(0x3, false);
pub type ResponderIdType = crate::network::ext::EntityIdType<{ ResponderId::ID }>;
pub type ResponderIdType = crate::network::ext::EntityGlobalIdType<{ ResponderId::ID }>;
}

impl Response {
Expand Down
10 changes: 4 additions & 6 deletions commons/zenoh-protocol/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl From<Err> for ResponseBody {
pub mod ext {
use zenoh_buffers::ZBuf;

use crate::core::{Encoding, ZenohId};
use crate::core::{Encoding, EntityGlobalId};

/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
Expand All @@ -172,8 +172,7 @@ pub mod ext {
/// +---------------+
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceInfoType<const ID: u8> {
pub zid: ZenohId,
pub eid: u32,
pub id: EntityGlobalId,
pub sn: u32,
}

Expand All @@ -183,10 +182,9 @@ pub mod ext {
use rand::Rng;
let mut rng = rand::thread_rng();

let zid = ZenohId::rand();
let eid: u32 = rng.gen();
let id = EntityGlobalId::rand();
let sn: u32 = rng.gen();
Self { zid, eid, sn }
Self { id, sn }
}
}

Expand Down
6 changes: 5 additions & 1 deletion zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl Primitives for Face {
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
m.id,
&m.wire_expr,
&m.ext_info,
msg.ext_nodeid.node_id,
Expand All @@ -181,6 +182,7 @@ impl Primitives for Face {
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
m.id,
&m.ext_wire_expr.wire_expr,
msg.ext_nodeid.node_id,
);
Expand All @@ -190,6 +192,7 @@ impl Primitives for Face {
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
m.id,
&m.wire_expr,
&m.ext_info,
msg.ext_nodeid.node_id,
Expand All @@ -200,6 +203,7 @@ impl Primitives for Face {
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
m.id,
&m.ext_wire_expr.wire_expr,
msg.ext_nodeid.node_id,
);
Expand Down Expand Up @@ -244,7 +248,7 @@ impl Primitives for Face {
pull_data(&self.tables.tables, &self.state.clone(), msg.wire_expr);
}
_ => {
log::error!("Unsupported request");
log::error!("{} Unsupported request!", self);
}
}
}
Expand Down
Loading