Skip to content

Commit cfb86a8

Browse files
Enhance subscribers, queryables and liveliness tokens propagation to improve scalability (#814)
* Router implements interests protocol for clients * Send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients for pico * Fix WireExprExt M flag encoding/decoding * Fix decl_key * Clients send all samples and queries to routers and peers * Avoid self declaration loop on interest * Fix query/replies copy/paste bugs * Peers implement interests protocol for clients * Don't send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients * Add client writer-side filtering (#863) * Add client writer-side filtering * Reimplement liveliness with interests * Fix writer-side filtering before receiving FinalInterest * Fix pubsub interest based routing after router failover * Declare message can be Push/Request/RequestContinuous/Response * Address review comments * Remove F: Future flag from DeclareInterest * cargo fmt --all * Remove unused Interest flags field * Update doc * Remove unneeded interest_id field * Update commons/zenoh-protocol/src/network/declare.rs * Remove unused UndeclareInterest * Implement proper Declare Request/Response id correlation * Add new Interest network message * Update doc * Update codec * Fix stable build * Fix test_acl * Fix writer side filtering * Add separate functions to compute matching status * Fix unstable imports * Remove useless checks --------- Co-authored-by: Luca Cominardi <[email protected]>
1 parent 511bc67 commit cfb86a8

File tree

30 files changed

+2856
-664
lines changed

30 files changed

+2856
-664
lines changed

commons/zenoh-codec/src/network/declare.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -958,7 +958,7 @@ where
958958
if x.wire_expr.has_suffix() {
959959
flags |= 1;
960960
}
961-
if let Mapping::Receiver = wire_expr.mapping {
961+
if let Mapping::Sender = wire_expr.mapping {
962962
flags |= 1 << 1;
963963
}
964964
codec.write(&mut zriter, flags)?;
@@ -998,9 +998,9 @@ where
998998
String::new()
999999
};
10001000
let mapping = if imsg::has_flag(flags, 1 << 1) {
1001-
Mapping::Receiver
1002-
} else {
10031001
Mapping::Sender
1002+
} else {
1003+
Mapping::Receiver
10041004
};
10051005

10061006
Ok((

commons/zenoh-codec/src/network/interest.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use zenoh_protocol::{
2323
core::WireExpr,
2424
network::{
2525
declare, id,
26-
interest::{self, InterestMode, InterestOptions},
27-
Interest, Mapping,
26+
interest::{self, Interest, InterestMode, InterestOptions},
27+
Mapping,
2828
},
2929
};
3030

commons/zenoh-protocol/src/network/declare.rs

+13
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,19 @@ pub mod common {
178178
pub mod ext {
179179
use super::*;
180180

181+
/// Flags:
182+
/// - N: Named If N==1 then the key expr has name/suffix
183+
/// - M: Mapping if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver
184+
///
185+
/// 7 6 5 4 3 2 1 0
186+
/// +-+-+-+-+-+-+-+-+
187+
/// |X|X|X|X|X|X|M|N|
188+
/// +-+-+-+---------+
189+
/// ~ key_scope:z16 ~
190+
/// +---------------+
191+
/// ~ key_suffix ~ if N==1 -- <u8;z16>
192+
/// +---------------+
193+
///
181194
pub type WireExprExt = zextzbuf!(0x0f, true);
182195
#[derive(Debug, Clone, PartialEq, Eq)]
183196
pub struct WireExprType {

commons/zenoh-protocol/src/network/interest.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ pub mod flag {
121121
pub type DeclareRequestId = u32;
122122
pub type AtomicDeclareRequestId = AtomicU32;
123123

124-
#[derive(Debug, Clone, PartialEq, Eq)]
124+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
125125
pub enum InterestMode {
126126
Final,
127127
Current,

zenoh/src/api/builders/publication.rs

+11-18
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
314314
fn create_one_shot_publisher(self) -> ZResult<Publisher<'a>> {
315315
Ok(Publisher {
316316
session: self.session,
317-
#[cfg(feature = "unstable")]
318-
eid: 0, // This is a one shot Publisher
317+
id: 0, // This is a one shot Publisher
319318
key_expr: self.key_expr?,
320319
congestion_control: self.congestion_control,
321320
priority: self.priority,
@@ -363,22 +362,16 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
363362
}
364363
}
365364
self.session
366-
.declare_publication_intent(key_expr.clone())
367-
.wait()?;
368-
#[cfg(feature = "unstable")]
369-
let eid = self.session.runtime.next_id();
370-
let publisher = Publisher {
371-
session: self.session,
372-
#[cfg(feature = "unstable")]
373-
eid,
374-
key_expr,
375-
congestion_control: self.congestion_control,
376-
priority: self.priority,
377-
is_express: self.is_express,
378-
destination: self.destination,
379-
};
380-
tracing::trace!("publish({:?})", publisher.key_expr);
381-
Ok(publisher)
365+
.declare_publisher_inner(key_expr.clone(), self.destination)
366+
.map(|id| Publisher {
367+
session: self.session,
368+
id,
369+
key_expr,
370+
congestion_control: self.congestion_control,
371+
priority: self.priority,
372+
is_express: self.is_express,
373+
destination: self.destination,
374+
})
382375
}
383376
}
384377

zenoh/src/api/publication.rs

+23-14
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::{
1616
convert::TryFrom,
17+
fmt,
1718
future::{IntoFuture, Ready},
1819
pin::Pin,
1920
task::{Context, Poll},
@@ -32,9 +33,7 @@ use zenoh_result::{Error, ZResult};
3233
use {
3334
crate::api::handlers::{Callback, DefaultHandler, IntoHandler},
3435
crate::api::sample::SourceInfo,
35-
crate::api::Id,
3636
zenoh_protocol::core::EntityGlobalId,
37-
zenoh_protocol::core::EntityId,
3837
};
3938

4039
use super::{
@@ -48,7 +47,23 @@ use super::{
4847
sample::{DataInfo, Locality, QoS, Sample, SampleFields, SampleKind},
4948
session::{SessionRef, Undeclarable},
5049
};
51-
use crate::net::primitives::Primitives;
50+
use crate::{api::Id, net::primitives::Primitives};
51+
52+
pub(crate) struct PublisherState {
53+
pub(crate) id: Id,
54+
pub(crate) remote_id: Id,
55+
pub(crate) key_expr: KeyExpr<'static>,
56+
pub(crate) destination: Locality,
57+
}
58+
59+
impl fmt::Debug for PublisherState {
60+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
61+
f.debug_struct("Publisher")
62+
.field("id", &self.id)
63+
.field("key_expr", &self.key_expr)
64+
.finish()
65+
}
66+
}
5267

5368
#[zenoh_macros::unstable]
5469
#[derive(Clone)]
@@ -113,8 +128,7 @@ impl std::fmt::Debug for PublisherRef<'_> {
113128
#[derive(Debug, Clone)]
114129
pub struct Publisher<'a> {
115130
pub(crate) session: SessionRef<'a>,
116-
#[cfg(feature = "unstable")]
117-
pub(crate) eid: EntityId,
131+
pub(crate) id: Id,
118132
pub(crate) key_expr: KeyExpr<'a>,
119133
pub(crate) congestion_control: CongestionControl,
120134
pub(crate) priority: Priority,
@@ -142,7 +156,7 @@ impl<'a> Publisher<'a> {
142156
pub fn id(&self) -> EntityGlobalId {
143157
EntityGlobalId {
144158
zid: self.session.zid(),
145-
eid: self.eid,
159+
eid: self.id,
146160
}
147161
}
148162

@@ -459,11 +473,9 @@ impl Resolvable for PublisherUndeclaration<'_> {
459473
impl Wait for PublisherUndeclaration<'_> {
460474
fn wait(mut self) -> <Self as Resolvable>::To {
461475
let Publisher {
462-
session, key_expr, ..
476+
session, id: eid, ..
463477
} = &self.publisher;
464-
session
465-
.undeclare_publication_intent(key_expr.clone())
466-
.wait()?;
478+
session.undeclare_publisher_inner(*eid)?;
467479
self.publisher.key_expr = unsafe { keyexpr::from_str_unchecked("") }.into();
468480
Ok(())
469481
}
@@ -481,10 +493,7 @@ impl IntoFuture for PublisherUndeclaration<'_> {
481493
impl Drop for Publisher<'_> {
482494
fn drop(&mut self) {
483495
if !self.key_expr.is_empty() {
484-
let _ = self
485-
.session
486-
.undeclare_publication_intent(self.key_expr.clone())
487-
.wait();
496+
let _ = self.session.undeclare_publisher_inner(self.id);
488497
}
489498
}
490499
}

0 commit comments

Comments
 (0)