Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add background to advanced subscriber #1651

Merged
merged 96 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
96 commits
Select commit Hold shift + click to select a range
2cc7f12
Expose and use ke macro
OlivierHecart Nov 5, 2024
163d3d5
Fix SourceInfo publication
OlivierHecart Nov 8, 2024
43a5d3c
Add AdvancedPublisher AdvancedSubscriber and AdvancedSubscriber
OlivierHecart Nov 8, 2024
9f076a7
Fix doctests
OlivierHecart Nov 8, 2024
3935974
Fix doc warnings
OlivierHecart Nov 8, 2024
405d76e
Remove debug trace
OlivierHecart Nov 13, 2024
71b22d3
Add history test
OlivierHecart Nov 13, 2024
af1b2a2
Fix periodic queries
OlivierHecart Nov 13, 2024
bd32356
Remove debug trace
OlivierHecart Nov 13, 2024
4e4bbb6
Lower test debug level
OlivierHecart Nov 13, 2024
f36c890
Add retransmission tests
OlivierHecart Nov 13, 2024
23f145d
Liveliness sub callback shoud increase pending queries counter
OlivierHecart Nov 13, 2024
de396a4
Liveliness sub callback shoud spawn periodic queries when enbaled
OlivierHecart Nov 13, 2024
ff24135
Add late_joiner test
OlivierHecart Nov 13, 2024
975aba4
Only treat pending samples when there are no more pending queries
OlivierHecart Nov 14, 2024
5d9ac8d
Apply proper sequencing for history
OlivierHecart Nov 14, 2024
2305b41
Improve AdvancedSubscriber
OlivierHecart Nov 14, 2024
883885b
Code reorg
OlivierHecart Nov 14, 2024
3201700
Code reorg
OlivierHecart Nov 14, 2024
ded789c
Fix deduplication
OlivierHecart Nov 14, 2024
e033553
Subscribe to liveliness tokens with history
OlivierHecart Nov 15, 2024
acaf341
Update builders
OlivierHecart Nov 15, 2024
ef63165
Add examples
OlivierHecart Nov 15, 2024
788a8e5
Fix rustdoc
OlivierHecart Nov 15, 2024
a40639c
Move stuff in State
OlivierHecart Nov 18, 2024
e1caa06
Code reorg
OlivierHecart Nov 18, 2024
ee1895f
Add smaple_miss_callback
OlivierHecart Nov 18, 2024
c8a43d1
Add sample miss test
OlivierHecart Nov 18, 2024
d11243d
Update z_advanced_sub example
OlivierHecart Nov 18, 2024
eb6dbf6
Explicit use in examples
OlivierHecart Nov 21, 2024
ee93de1
Update API
OlivierHecart Nov 28, 2024
596d0ed
Fix rustdoc
OlivierHecart Nov 28, 2024
35ae472
Allow sample miss detection when recovery disabled
OlivierHecart Nov 28, 2024
52bec17
Add miss_sample_callback to DataSubscriberBuilderExt
OlivierHecart Nov 28, 2024
a16d64c
Add sample_miss_detection to PublisherBuilderExt
OlivierHecart Nov 28, 2024
5662f34
Add test_advanced_sample_miss test
OlivierHecart Nov 28, 2024
18adcc7
Merge branch 'main' into dev/advanced_pubsub
OlivierHecart Nov 28, 2024
6da5655
Deliver sample even when no miss callback
OlivierHecart Nov 28, 2024
3449d73
Replace sample_miss_callback with sample_miss_listener
OlivierHecart Nov 28, 2024
76344c9
Merge branch 'main' into dev/advanced_pubsub
OlivierHecart Nov 29, 2024
ba72a96
Fix clippy warnings
OlivierHecart Nov 29, 2024
4df529e
Fix tests
OlivierHecart Nov 29, 2024
443d540
Add HistoryConf max_samples option
OlivierHecart Nov 29, 2024
f9c7d0e
Add HistoryConf max_age option
OlivierHecart Nov 29, 2024
3b28b04
Use BTreeMap
OlivierHecart Dec 2, 2024
9e7a99b
Add meta_keyexpr option
OlivierHecart Dec 4, 2024
43d9d79
Add late_joiner_detection and meta_keyexpr options on Subcriber side
OlivierHecart Dec 4, 2024
54dc83c
Renaming
OlivierHecart Dec 4, 2024
aacea79
Merge branch 'main' into dev/advanced_pubsub
OlivierHecart Dec 4, 2024
9d18085
Fix compilation issues
OlivierHecart Dec 4, 2024
7d3ab8d
Remove AdvancedCache from public API
OlivierHecart Dec 5, 2024
e59e76b
Update Session admin to match AdvancedSub
OlivierHecart Dec 5, 2024
71ce64b
Gather constants
OlivierHecart Dec 5, 2024
8810b37
Fix doc build
OlivierHecart Dec 5, 2024
dba1d50
Renaming
OlivierHecart Dec 5, 2024
95ce535
Mark PublicationCache and QueryingSubscriber as deprecated and remove…
OlivierHecart Dec 5, 2024
6023920
Remove z_pub_cache and z_query_sub entries from zenoh-ext examples RE…
OlivierHecart Dec 5, 2024
dbd7d22
Add z_advanced_pub and z_advanced_sub to zenoh-ext examples Cargo.toml
OlivierHecart Dec 5, 2024
7537985
Add CacheConfig replies_qos option
OlivierHecart Dec 5, 2024
f850eaa
Call cache directly from publisher
OlivierHecart Dec 5, 2024
1799a95
Update doc
OlivierHecart Dec 6, 2024
feeb7ef
Add missing unstable tags
OlivierHecart Dec 6, 2024
cd68431
Add missing unstable tags
OlivierHecart Dec 6, 2024
c9cc963
Add missing unstable tags
OlivierHecart Dec 6, 2024
2d6550d
Add unstable tag everywhere
OlivierHecart Dec 6, 2024
ab05e1e
Add missing AdvancedSubscriber methods
OlivierHecart Dec 9, 2024
1bae945
Fix WeakSession::Session internal function
OlivierHecart Dec 9, 2024
5ca2e50
Expose missing SampleMissListener and related structs
OlivierHecart Dec 9, 2024
7c374c4
Add AdvancedPublisherBuilderExt::advanced function
OlivierHecart Dec 9, 2024
0ffb133
Add missing AdvancedPublisherBuilder functions
OlivierHecart Dec 9, 2024
dc9200c
Fix doctests
OlivierHecart Dec 9, 2024
6e9f9b9
Expose Miss struct
OlivierHecart Dec 9, 2024
d3a78a0
impl QoSBuilderTrait for AdvancedPublisherBuilder
OlivierHecart Dec 9, 2024
7332d02
Propagate PublisherBuilder values to AdvancedPublisherBuilder
OlivierHecart Dec 9, 2024
b63c080
Rename AdvancedSubscriber::close()
OlivierHecart Dec 9, 2024
4aaca6e
Add unstable tags
OlivierHecart Dec 9, 2024
a8bb82f
Add AdvancedSubscriber::detect_publishers function
OlivierHecart Dec 9, 2024
2df37bc
Remove debug println
OlivierHecart Dec 9, 2024
fe51436
Renaming
OlivierHecart Dec 9, 2024
0bc7233
Add unstable tags
OlivierHecart Dec 9, 2024
00963f3
Use std Range
OlivierHecart Dec 9, 2024
3c6ba61
Spawn Timer in a tokio runtime
OlivierHecart Dec 10, 2024
18682ff
Fix panic when last_delivered is None
OlivierHecart Dec 10, 2024
24e1ede
Release lock before calling get
OlivierHecart Dec 10, 2024
87acf15
Update key mapping
OlivierHecart Dec 10, 2024
6332331
Improve doc
OlivierHecart Dec 10, 2024
b807ee8
fix: fix callback API (#1647)
wyfo Dec 10, 2024
2096085
Update doc
OlivierHecart Dec 10, 2024
c3a2608
feat: add background to advanced subscriber
wyfo Dec 10, 2024
dce3304
Fix ke_liveliness
OlivierHecart Dec 10, 2024
8574b4e
fix: add missing mut
wyfo Dec 10, 2024
e547a23
Fix doc
OlivierHecart Dec 10, 2024
e870d79
Fix doc
OlivierHecart Dec 11, 2024
12ba5fa
Fix zenoh-ext Cargo.toml
OlivierHecart Dec 11, 2024
c1cbf39
Merge branch 'main' into dev/advanced_pubsub
OlivierHecart Dec 11, 2024
8bb1796
Merge branch 'dev/advanced_pubsub' into dev/adv_pubsub_background
OlivierHecart Dec 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add meta_keyexpr option
  • Loading branch information
OlivierHecart committed Dec 4, 2024
commit 9e7a99ba534f7bc61c5c24c65d8d79e9f5c6f67f
14 changes: 10 additions & 4 deletions zenoh-ext/src/advanced_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,21 @@ use zenoh::{
Resolvable, Resolve, Result as ZResult, Session, Wait,
};

// #[zenoh_macros::unstable]
// pub(crate) static KE_STAR: &keyexpr = ke!("*");
#[zenoh_macros::unstable]
pub(crate) static KE_STAR: &keyexpr = ke!("*");
pub(crate) static KE_STARSTAR: &keyexpr = ke!("**");
#[zenoh_macros::unstable]
pub(crate) static KE_PREFIX: &keyexpr = ke!("@cache");
pub(crate) static KE_PREFIX: &keyexpr = ke!("@adv");
#[zenoh_macros::unstable]
pub(crate) static KE_SEPARATOR: &keyexpr = ke!("@");
#[zenoh_macros::unstable]
pub(crate) static KE_UHLC: &keyexpr = ke!("uhlc");
#[zenoh_macros::unstable]
pub(crate) static KE_EMPTY: &keyexpr = ke!("_");
#[zenoh_macros::unstable]
kedefine!(
pub(crate) ke_liveliness: "@cache/${zid:*}/${eid:*}/${remaining:**}",
pub(crate) ke_liveliness: "@adv/${zid:*}/${eid:*}/${meta:**}/@/${remaining:**}",
);

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -96,7 +102,7 @@ impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> {
AdvancedCacheBuilder {
session,
pub_key_expr,
queryable_prefix: Some(Ok((KE_PREFIX / KE_STAR / KE_STAR).into())),
queryable_prefix: Some(Ok((KE_PREFIX / KE_STARSTAR / KE_SEPARATOR).into())),
subscriber_origin: Locality::default(),
queryable_origin: Locality::default(),
history: CacheConfig::default(),
Expand Down
46 changes: 34 additions & 12 deletions zenoh-ext/src/advanced_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use zenoh::{
};

use crate::{
advanced_cache::{AdvancedCache, CacheConfig, KE_PREFIX, KE_UHLC},
advanced_cache::{AdvancedCache, CacheConfig, KE_EMPTY, KE_PREFIX, KE_SEPARATOR, KE_UHLC},
SessionExt,
};

Expand All @@ -42,23 +42,25 @@ pub enum Sequencing {

/// The builder of PublicationCache, allowing to configure it.
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
pub struct AdvancedPublisherBuilder<'a, 'b> {
pub struct AdvancedPublisherBuilder<'a, 'b, 'c> {
session: &'a Session,
pub_key_expr: ZResult<KeyExpr<'b>>,
meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
sequencing: Sequencing,
liveliness: bool,
cache: bool,
history: CacheConfig,
}

impl<'a, 'b> AdvancedPublisherBuilder<'a, 'b> {
impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
pub(crate) fn new(
session: &'a Session,
pub_key_expr: ZResult<KeyExpr<'b>>,
) -> AdvancedPublisherBuilder<'a, 'b> {
) -> AdvancedPublisherBuilder<'a, 'b, 'c> {
AdvancedPublisherBuilder {
session,
pub_key_expr,
meta_key_expr: None,
sequencing: Sequencing::None,
liveliness: false,
cache: false,
Expand Down Expand Up @@ -91,19 +93,31 @@ impl<'a, 'b> AdvancedPublisherBuilder<'a, 'b> {
self.liveliness = true;
self
}

/// A key expression added to the liveliness token key expression
/// and to the cache queryable key expression.
/// It can be used to convey meta data.
pub fn meta_keyexpr<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
where
TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
{
self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
self
}
}

impl<'a> Resolvable for AdvancedPublisherBuilder<'a, '_> {
impl<'a> Resolvable for AdvancedPublisherBuilder<'a, '_, '_> {
type To = ZResult<AdvancedPublisher<'a>>;
}

impl Wait for AdvancedPublisherBuilder<'_, '_> {
impl Wait for AdvancedPublisherBuilder<'_, '_, '_> {
fn wait(self) -> <Self as Resolvable>::To {
AdvancedPublisher::new(self)
}
}

impl IntoFuture for AdvancedPublisherBuilder<'_, '_> {
impl IntoFuture for AdvancedPublisherBuilder<'_, '_, '_> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

Expand All @@ -120,21 +134,29 @@ pub struct AdvancedPublisher<'a> {
}

impl<'a> AdvancedPublisher<'a> {
fn new(conf: AdvancedPublisherBuilder<'a, '_>) -> ZResult<Self> {
fn new(conf: AdvancedPublisherBuilder<'a, '_, '_>) -> ZResult<Self> {
let key_expr = conf.pub_key_expr?;
let meta = match conf.meta_key_expr {
Some(meta) => Some(meta?),
None => None,
};

let publisher = conf
.session
.declare_publisher(key_expr.clone().into_owned())
.wait()?;
let id = publisher.id();
let prefix = KE_PREFIX / &id.zid().into_keyexpr();
let prefix = match conf.sequencing {
Sequencing::SequenceNumber => {
KE_PREFIX
/ &id.zid().into_keyexpr()
/ &KeyExpr::try_from(id.eid().to_string()).unwrap()
prefix / &KeyExpr::try_from(id.eid().to_string()).unwrap()
}
_ => KE_PREFIX / &id.zid().into_keyexpr() / KE_UHLC,
_ => prefix / KE_UHLC,
};
let prefix = match meta {
Some(meta) => prefix / &meta / KE_SEPARATOR,
// We need this empty chunk because af a routing matching bug
_ => prefix / KE_EMPTY / KE_SEPARATOR,
};

let seqnum = match conf.sequencing {
Expand Down
10 changes: 7 additions & 3 deletions zenoh-ext/src/advanced_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use {
zenoh::Result as ZResult,
};

use crate::advanced_cache::{ke_liveliness, KE_PREFIX, KE_STAR, KE_UHLC};
use crate::advanced_cache::{ke_liveliness, KE_PREFIX, KE_SEPARATOR, KE_STARSTAR, KE_UHLC};

#[derive(Debug, Default, Clone)]
/// Configure query for historical data.
Expand Down Expand Up @@ -464,6 +464,8 @@ impl Timed for PeriodicQuery {
let query_expr = KE_PREFIX
/ &self.source_id.zid().into_keyexpr()
/ &KeyExpr::try_from(self.source_id.eid().to_string()).unwrap()
/ KE_STARSTAR
/ KE_SEPARATOR
/ &states.key_expr;
let seq_num_range = seq_num_range(Some(state.last_delivered.unwrap() + 1), None);

Expand Down Expand Up @@ -554,6 +556,8 @@ impl<Handler> AdvancedSubscriber<Handler> {
let query_expr = KE_PREFIX
/ &source_id.zid().into_keyexpr()
/ &KeyExpr::try_from(source_id.eid().to_string()).unwrap()
/ KE_STARSTAR
/ KE_SEPARATOR
/ &key_expr;
let seq_num_range =
seq_num_range(Some(state.last_delivered.unwrap() + 1), None);
Expand Down Expand Up @@ -610,7 +614,7 @@ impl<Handler> AdvancedSubscriber<Handler> {
let _ = conf
.session
.get(Selector::from((
KE_PREFIX / KE_STAR / KE_STAR / &key_expr,
KE_PREFIX / KE_STARSTAR / KE_SEPARATOR / &key_expr,
params,
)))
.callback({
Expand Down Expand Up @@ -800,7 +804,7 @@ impl<Handler> AdvancedSubscriber<Handler> {
conf
.session
.liveliness()
.declare_subscriber(KE_PREFIX / KE_STAR / KE_STAR / &key_expr)
.declare_subscriber(KE_PREFIX / KE_STARSTAR / KE_SEPARATOR / &key_expr)
// .declare_subscriber(keformat!(ke_liveliness_all::formatter(), zid = 0, eid = 0, remaining = key_expr).unwrap())
.history(true)
.callback(live_callback)
Expand Down
16 changes: 8 additions & 8 deletions zenoh-ext/src/publisher_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,44 @@ use crate::{advanced_cache::CacheConfig, AdvancedPublisherBuilder};

/// Some extensions to the [`zenoh::publication::PublisherBuilder`](zenoh::publication::PublisherBuilder)
#[zenoh_macros::unstable]
pub trait PublisherBuilderExt<'a, 'b> {
pub trait PublisherBuilderExt<'a, 'b, 'c> {
/// Allow matching Subscribers to detect lost samples and
/// optionally ask for retransimission.
///
/// Retransmission can only be achieved if history is enabled.
fn cache(self, config: CacheConfig) -> AdvancedPublisherBuilder<'a, 'b>;
fn cache(self, config: CacheConfig) -> AdvancedPublisherBuilder<'a, 'b, 'c>;

/// Allow matching Subscribers to detect lost samples and optionally ask for retransimission.
///
/// Retransmission can only be achieved if cache is enabled.
fn sample_miss_detection(self) -> AdvancedPublisherBuilder<'a, 'b>;
fn sample_miss_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c>;

/// Allow this publisher to be detected by subscribers.
///
/// This allows Subscribers to retrieve the local history.
fn late_joiner_detection(self) -> AdvancedPublisherBuilder<'a, 'b>;
fn late_joiner_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c>;
}

impl<'a, 'b> PublisherBuilderExt<'a, 'b> for PublisherBuilder<'a, 'b> {
impl<'a, 'b, 'c> PublisherBuilderExt<'a, 'b, 'c> for PublisherBuilder<'a, 'b> {
/// Allow matching Subscribers to detect lost samples and
/// optionally ask for retransimission.
///
/// Retransmission can only be achieved if history is enabled.
fn cache(self, config: CacheConfig) -> AdvancedPublisherBuilder<'a, 'b> {
fn cache(self, config: CacheConfig) -> AdvancedPublisherBuilder<'a, 'b, 'c> {
AdvancedPublisherBuilder::new(self.session, self.key_expr).cache(config)
}

/// Allow matching Subscribers to detect lost samples and optionally ask for retransimission.
///
/// Retransmission can only be achieved if cache is enabled.
fn sample_miss_detection(self) -> AdvancedPublisherBuilder<'a, 'b> {
fn sample_miss_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c> {
AdvancedPublisherBuilder::new(self.session, self.key_expr).sample_miss_detection()
}

/// Allow this publisher to be detected by subscribers.
///
/// This allows Subscribers to retrieve the local history.
fn late_joiner_detection(self) -> AdvancedPublisherBuilder<'a, 'b> {
fn late_joiner_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c> {
AdvancedPublisherBuilder::new(self.session, self.key_expr).late_joiner_detection()
}
}