Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Advanced Pub/Sub feature #1582

Merged
merged 91 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
2cc7f12
Expose and use ke macro
OlivierHecart Nov 5, 2024
163d3d5
Fix SourceInfo publication
OlivierHecart Nov 8, 2024
43a5d3c
Add AdvancedPublisher AdvancedSubscriber and AdvancedSubscriber
OlivierHecart Nov 8, 2024
9f076a7
Fix doctests
OlivierHecart Nov 8, 2024
3935974
Fix doc warnings
OlivierHecart Nov 8, 2024
405d76e
Remove debug trace
OlivierHecart Nov 13, 2024
71b22d3
Add history test
OlivierHecart Nov 13, 2024
af1b2a2
Fix periodic queries
OlivierHecart Nov 13, 2024
bd32356
Remove debug trace
OlivierHecart Nov 13, 2024
4e4bbb6
Lower test debug level
OlivierHecart Nov 13, 2024
f36c890
Add retransmission tests
OlivierHecart Nov 13, 2024
23f145d
Liveliness sub callback shoud increase pending queries counter
OlivierHecart Nov 13, 2024
de396a4
Liveliness sub callback shoud spawn periodic queries when enbaled
OlivierHecart Nov 13, 2024
ff24135
Add late_joiner test
OlivierHecart Nov 13, 2024
975aba4
Only treat pending samples when there are no more pending queries
OlivierHecart Nov 14, 2024
5d9ac8d
Apply proper sequencing for history
OlivierHecart Nov 14, 2024
2305b41
Improve AdvancedSubscriber
OlivierHecart Nov 14, 2024
883885b
Code reorg
OlivierHecart Nov 14, 2024
3201700
Code reorg
OlivierHecart Nov 14, 2024
ded789c
Fix deduplication
OlivierHecart Nov 14, 2024
e033553
Subscribe to liveliness tokens with history
OlivierHecart Nov 15, 2024
acaf341
Update builders
OlivierHecart Nov 15, 2024
ef63165
Add examples
OlivierHecart Nov 15, 2024
788a8e5
Fix rustdoc
OlivierHecart Nov 15, 2024
a40639c
Move stuff in State
OlivierHecart Nov 18, 2024
e1caa06
Code reorg
OlivierHecart Nov 18, 2024
ee1895f
Add smaple_miss_callback
OlivierHecart Nov 18, 2024
c8a43d1
Add sample miss test
OlivierHecart Nov 18, 2024
d11243d
Update z_advanced_sub example
OlivierHecart Nov 18, 2024
eb6dbf6
Explicit use in examples
OlivierHecart Nov 21, 2024
ee93de1
Update API
OlivierHecart Nov 28, 2024
596d0ed
Fix rustdoc
OlivierHecart Nov 28, 2024
35ae472
Allow sample miss detection when recovery disabled
OlivierHecart Nov 28, 2024
52bec17
Add miss_sample_callback to DataSubscriberBuilderExt
OlivierHecart Nov 28, 2024
a16d64c
Add sample_miss_detection to PublisherBuilderExt
OlivierHecart Nov 28, 2024
5662f34
Add test_advanced_sample_miss test
OlivierHecart Nov 28, 2024
18adcc7
Merge branch 'main' into dev/advanced_pubsub
OlivierHecart Nov 28, 2024
6da5655
Deliver sample even when no miss callback
OlivierHecart Nov 28, 2024
3449d73
Replace sample_miss_callback with sample_miss_listener
OlivierHecart Nov 28, 2024
76344c9
Merge branch 'main' into dev/advanced_pubsub
OlivierHecart Nov 29, 2024
ba72a96
Fix clippy warnings
OlivierHecart Nov 29, 2024
4df529e
Fix tests
OlivierHecart Nov 29, 2024
443d540
Add HistoryConf max_samples option
OlivierHecart Nov 29, 2024
f9c7d0e
Add HistoryConf max_age option
OlivierHecart Nov 29, 2024
3b28b04
Use BTreeMap
OlivierHecart Dec 2, 2024
9e7a99b
Add meta_keyexpr option
OlivierHecart Dec 4, 2024
43d9d79
Add late_joiner_detection and meta_keyexpr options on Subcriber side
OlivierHecart Dec 4, 2024
54dc83c
Renaming
OlivierHecart Dec 4, 2024
aacea79
Merge branch 'main' into dev/advanced_pubsub
OlivierHecart Dec 4, 2024
9d18085
Fix compilation issues
OlivierHecart Dec 4, 2024
7d3ab8d
Remove AdvancedCache from public API
OlivierHecart Dec 5, 2024
e59e76b
Update Session admin to match AdvancedSub
OlivierHecart Dec 5, 2024
71ce64b
Gather constants
OlivierHecart Dec 5, 2024
8810b37
Fix doc build
OlivierHecart Dec 5, 2024
dba1d50
Renaming
OlivierHecart Dec 5, 2024
95ce535
Mark PublicationCache and QueryingSubscriber as deprecated and remove…
OlivierHecart Dec 5, 2024
6023920
Remove z_pub_cache and z_query_sub entries from zenoh-ext examples RE…
OlivierHecart Dec 5, 2024
dbd7d22
Add z_advanced_pub and z_advanced_sub to zenoh-ext examples Cargo.toml
OlivierHecart Dec 5, 2024
7537985
Add CacheConfig replies_qos option
OlivierHecart Dec 5, 2024
f850eaa
Call cache directly from publisher
OlivierHecart Dec 5, 2024
1799a95
Update doc
OlivierHecart Dec 6, 2024
feeb7ef
Add missing unstable tags
OlivierHecart Dec 6, 2024
cd68431
Add missing unstable tags
OlivierHecart Dec 6, 2024
c9cc963
Add missing unstable tags
OlivierHecart Dec 6, 2024
2d6550d
Add unstable tag everywhere
OlivierHecart Dec 6, 2024
ab05e1e
Add missing AdvancedSubscriber methods
OlivierHecart Dec 9, 2024
1bae945
Fix WeakSession::Session internal function
OlivierHecart Dec 9, 2024
5ca2e50
Expose missing SampleMissListener and related structs
OlivierHecart Dec 9, 2024
7c374c4
Add AdvancedPublisherBuilderExt::advanced function
OlivierHecart Dec 9, 2024
0ffb133
Add missing AdvancedPublisherBuilder functions
OlivierHecart Dec 9, 2024
dc9200c
Fix doctests
OlivierHecart Dec 9, 2024
6e9f9b9
Expose Miss struct
OlivierHecart Dec 9, 2024
d3a78a0
impl QoSBuilderTrait for AdvancedPublisherBuilder
OlivierHecart Dec 9, 2024
7332d02
Propagate PublisherBuilder values to AdvancedPublisherBuilder
OlivierHecart Dec 9, 2024
b63c080
Rename AdvancedSubscriber::close()
OlivierHecart Dec 9, 2024
4aaca6e
Add unstable tags
OlivierHecart Dec 9, 2024
a8bb82f
Add AdvancedSubscriber::detect_publishers function
OlivierHecart Dec 9, 2024
2df37bc
Remove debug println
OlivierHecart Dec 9, 2024
fe51436
Renaming
OlivierHecart Dec 9, 2024
0bc7233
Add unstable tags
OlivierHecart Dec 9, 2024
00963f3
Use std Range
OlivierHecart Dec 9, 2024
3c6ba61
Spawn Timer in a tokio runtime
OlivierHecart Dec 10, 2024
18682ff
Fix panic when last_delivered is None
OlivierHecart Dec 10, 2024
24e1ede
Release lock before calling get
OlivierHecart Dec 10, 2024
87acf15
Update key mapping
OlivierHecart Dec 10, 2024
6332331
Improve doc
OlivierHecart Dec 10, 2024
b807ee8
fix: fix callback API (#1647)
wyfo Dec 10, 2024
2096085
Update doc
OlivierHecart Dec 10, 2024
dce3304
Fix ke_liveliness
OlivierHecart Dec 10, 2024
e547a23
Fix doc
OlivierHecart Dec 10, 2024
e870d79
Fix doc
OlivierHecart Dec 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions zenoh-ext/src/advanced_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,18 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
}
}

/// Allow matching Subscribers to detect lost samples and optionally ask for retransimission.
/// Allow matching [`AdvancedSubscribers`](crate::AdvancedSubscriber) to detect lost samples and optionally ask for retransimission.
///
/// Retransmission can only be achieved if history is enabled.
/// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is enabled.
#[zenoh_macros::unstable]
pub fn sample_miss_detection(mut self) -> Self {
self.sequencing = Sequencing::SequenceNumber;
self
}

/// Change the history size for each resource.
/// Attach a cache to this [`AdvancedPublisher`].
///
/// The cache can be used for history and/or recovery.
#[zenoh_macros::unstable]
pub fn cache(mut self, config: CacheConfig) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would like having some config: impl Into<CacheConfig>, together with impl From<usize> for CacheConfig, but I assume that's not for the first version, is it?

self.cache = true;
Expand All @@ -134,9 +136,9 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
self
}

/// Allow this publisher to be detected by subscribers.
/// Allow this [`AdvancedPublisher`] to be detected by [`AdvancedSubscribers`](crate::AdvancedSubscriber).
///
/// This allows Subscribers to retrieve the local history.
/// This allows [`AdvancedSubscribers`](crate::AdvancedSubscriber) to retrieve the local history.
#[zenoh_macros::unstable]
pub fn publisher_detection(mut self) -> Self {
self.liveliness = true;
Expand Down
34 changes: 27 additions & 7 deletions zenoh-ext/src/advanced_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ pub struct HistoryConfig {
impl HistoryConfig {
/// Enable detection of late joiner publishers and query for their historical data.
///
/// Let joiner detection can only be achieved for Publishers that enable publisher_detection.
/// History can only be retransmitted by Publishers that enable caching.
/// Let joiner detection can only be achieved for [`AdvancedPublishers`](crate::AdvancedPublisher) that enable publisher_detection.
OlivierHecart marked this conversation as resolved.
Show resolved Hide resolved
/// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
#[inline]
#[zenoh_macros::unstable]
pub fn detect_late_publishers(mut self) -> Self {
Expand Down Expand Up @@ -107,7 +107,9 @@ impl RecoveryConfig {
/// This allows to retrieve the last Sample(s) if the last Sample(s) is/are lost.
/// So it is useful for sporadic publications but useless for periodic publications
/// with a period smaller or equal to this period.
/// Retransmission can only be achieved by Publishers that also activate retransmission.
/// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
/// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
/// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
#[zenoh_macros::unstable]
#[inline]
pub fn periodic_queries(mut self, period: Option<Duration>) -> Self {
Expand Down Expand Up @@ -227,8 +229,9 @@ impl<'a, 'c, Handler> AdvancedSubscriberBuilder<'a, '_, 'c, Handler> {

/// Ask for retransmission of detected lost Samples.
///
/// Retransmission can only be achieved by Publishers that enable
/// caching and sample_miss_detection.
/// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
/// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
/// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
#[zenoh_macros::unstable]
#[inline]
pub fn recovery(mut self, conf: RecoveryConfig) -> Self {
Expand All @@ -254,7 +257,7 @@ impl<'a, 'c, Handler> AdvancedSubscriberBuilder<'a, '_, 'c, Handler> {

/// Enable query for historical data.
///
/// History can only be retransmitted by Publishers that enable caching.
/// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
#[zenoh_macros::unstable]
#[inline]
pub fn history(mut self, config: HistoryConfig) -> Self {
Expand Down Expand Up @@ -953,6 +956,10 @@ impl<Handler> AdvancedSubscriber<Handler> {
&mut self.receiver
}

/// Declares a listener to detect missed samples.
///
/// Missed samples can only be detected from [`AdvancedPublisher`](crate::AdvancedPublisher) that
/// enable [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
#[zenoh_macros::unstable]
pub fn sample_miss_listener(&self) -> SampleMissListenerBuilder<'_, DefaultHandler> {
SampleMissListenerBuilder {
Expand All @@ -961,6 +968,10 @@ impl<Handler> AdvancedSubscriber<Handler> {
}
}

/// Declares a listener to detect matching publishers.
///
/// Only [`AdvancedPublisher`](crate::AdvancedPublisher) that enable
/// [`publisher_detection`](crate::AdvancedPublisherBuilder::publisher_detection) can be detected.
#[zenoh_macros::unstable]
pub fn detect_publishers(&self) -> LivelinessSubscriberBuilder<'_, '_, DefaultHandler> {
self.subscriber.session().liveliness().declare_subscriber(
Expand Down Expand Up @@ -1109,21 +1120,29 @@ impl Drop for TimestampedRepliesHandler {
}
}

/// A struct that represent missed samples.
#[zenoh_macros::unstable]
pub struct Miss {
source: EntityGlobalId,
nb: u32,
}

impl Miss {
/// The source of missed samples.
pub fn source(&self) -> EntityGlobalId {
self.source
}

/// The number of missed samples.
pub fn nb(&self) -> u32 {
self.nb
}
}

/// A listener to detect missed samples.
///
/// Missed samples can only be detected from [`AdvancedPublisher`](crate::AdvancedPublisher) that
/// enable [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
#[zenoh_macros::unstable]
pub struct SampleMissListener<Handler> {
id: usize,
Expand Down Expand Up @@ -1182,6 +1201,7 @@ impl<Handler> std::ops::DerefMut for SampleMissListener<Handler> {
}
}

/// A [`Resolvable`] returned when undeclaring a [`SampleMissListener`].
#[zenoh_macros::unstable]
pub struct SampleMissHandlerUndeclaration<Handler>(SampleMissListener<Handler>);

Expand All @@ -1207,7 +1227,7 @@ impl<Handler> IntoFuture for SampleMissHandlerUndeclaration<Handler> {
}
}

/// A builder for initializing a [`SampleMissHandler`].
/// A builder for initializing a [`SampleMissListener`].
#[zenoh_macros::unstable]
pub struct SampleMissListenerBuilder<'a, Handler, const BACKGROUND: bool = false> {
statesref: &'a Arc<Mutex<State>>,
Expand Down
28 changes: 12 additions & 16 deletions zenoh-ext/src/publisher_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,20 @@ use crate::{advanced_cache::CacheConfig, AdvancedPublisherBuilder};
/// Some extensions to the [`zenoh::publication::PublisherBuilder`](zenoh::publication::PublisherBuilder)
#[zenoh_macros::unstable]
pub trait AdvancedPublisherBuilderExt<'a, 'b, 'c> {
/// Allow matching Subscribers to detect lost samples and
/// optionally ask for retransimission.
///
/// Retransmission can only be achieved if history is enabled.
/// Allow matching [`AdvancedSubscribers`](crate::AdvancedSubscriber) to recover history and/or missed samples.
#[zenoh_macros::unstable]
fn cache(self, config: CacheConfig) -> AdvancedPublisherBuilder<'a, 'b, 'c>;

/// Allow matching Subscribers to detect lost samples and optionally ask for retransimission.
/// Allow matching [`AdvancedSubscribers`](crate::AdvancedSubscriber) to detect lost samples
/// and optionally ask for retransimission.
///
/// Retransmission can only be achieved if cache is enabled.
/// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is also enabled.
#[zenoh_macros::unstable]
fn sample_miss_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c>;

/// Allow this publisher to be detected by subscribers.
/// Allow this publisher to be detected by [`AdvancedSubscribers`](crate::AdvancedSubscriber).
///
/// This allows Subscribers to retrieve the local history.
/// This allows [`AdvancedSubscribers`](crate::AdvancedSubscriber) to retrieve the local history.
#[zenoh_macros::unstable]
fn publisher_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c>;

Expand All @@ -44,26 +42,24 @@ pub trait AdvancedPublisherBuilderExt<'a, 'b, 'c> {

#[zenoh_macros::unstable]
impl<'a, 'b, 'c> AdvancedPublisherBuilderExt<'a, 'b, 'c> for PublisherBuilder<'a, 'b> {
/// Allow matching Subscribers to detect lost samples and
/// optionally ask for retransimission.
///
/// Retransmission can only be achieved if history is enabled.
/// Allow matching [`AdvancedSubscribers`](crate::AdvancedSubscriber) to recover history and/or missed samples.
#[zenoh_macros::unstable]
fn cache(self, config: CacheConfig) -> AdvancedPublisherBuilder<'a, 'b, 'c> {
AdvancedPublisherBuilder::new(self).cache(config)
}

/// Allow matching Subscribers to detect lost samples and optionally ask for retransimission.
/// Allow matching [`AdvancedSubscribers`](crate::AdvancedSubscriber) to detect lost samples
/// and optionally ask for retransimission.
///
/// Retransmission can only be achieved if cache is enabled.
/// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is also enabled.
#[zenoh_macros::unstable]
fn sample_miss_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c> {
AdvancedPublisherBuilder::new(self).sample_miss_detection()
}

/// Allow this publisher to be detected by subscribers.
/// Allow this publisher to be detected by [`AdvancedSubscribers`](crate::AdvancedSubscriber).
///
/// This allows Subscribers to retrieve the local history.
/// This allows [`AdvancedSubscribers`](crate::AdvancedSubscriber) to retrieve the local history.
#[zenoh_macros::unstable]
fn publisher_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c> {
AdvancedPublisherBuilder::new(self).publisher_detection()
Expand Down
14 changes: 8 additions & 6 deletions zenoh-ext/src/subscriber_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,15 @@ pub trait SubscriberBuilderExt<'a, 'b, Handler> {
pub trait AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler> {
/// Enable query for historical data.
///
/// History can only be retransmitted by Publishers that enable caching.
/// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
#[zenoh_macros::unstable]
fn history(self, config: HistoryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;

/// Ask for retransmission of detected lost Samples.
///
/// Retransmission can only be achieved by Publishers that enable
/// caching and sample_miss_detection.
/// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
/// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
/// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
#[zenoh_macros::unstable]
fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;

Expand Down Expand Up @@ -271,16 +272,17 @@ impl<'a, 'b, 'c, Handler> AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler>
{
/// Enable query for historical data.
///
/// History can only be retransmitted by Publishers that enable caching.
/// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
#[zenoh_macros::unstable]
fn history(self, config: HistoryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
AdvancedSubscriberBuilder::new(self).history(config)
}

/// Ask for retransmission of detected lost Samples.
///
/// Retransmission can only be achieved by Publishers that enable
/// caching and sample_miss_detection.
/// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
/// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
/// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
#[zenoh_macros::unstable]
fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
AdvancedSubscriberBuilder::new(self).recovery(conf)
Expand Down
Loading