Skip to content

Commit

Permalink
feat: add background to advanced subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Dec 10, 2024
1 parent b807ee8 commit c3a2608
Showing 1 changed file with 66 additions and 16 deletions.
82 changes: 66 additions & 16 deletions zenoh-ext/src/advanced_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> {
{
AdvancedSubscriberBuilder {
session: self.session,
key_expr: self.key_expr.map(|s| s.into_owned()),
key_expr: self.key_expr,
origin: self.origin,
retransmission: self.retransmission,
query_target: self.query_target,
Expand All @@ -203,7 +203,30 @@ impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> {
}

#[zenoh_macros::unstable]
impl<'a, 'c, Handler> AdvancedSubscriberBuilder<'a, '_, 'c, Handler> {
impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>> {
/// Register the subscriber callback to be run in background until the session is closed.
///
/// Background builder doesn't return a `AdvancedSubscriber` object anymore.
pub fn background(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>, true> {
AdvancedSubscriberBuilder {
session: self.session,
key_expr: self.key_expr,
origin: self.origin,
retransmission: self.retransmission,
query_target: self.query_target,
query_timeout: self.query_timeout,
history: self.history,
liveliness: self.liveliness,
meta_key_expr: self.meta_key_expr,
handler: self.handler,
}
}
}

#[zenoh_macros::unstable]
impl<'a, 'c, Handler, const BACKGROUND: bool>
AdvancedSubscriberBuilder<'a, '_, 'c, Handler, BACKGROUND>
{
/// Restrict the matching publications that will be receive by this [`Subscriber`]
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
#[zenoh_macros::unstable]
Expand Down Expand Up @@ -323,6 +346,35 @@ where
}
}

#[zenoh_macros::unstable]
impl Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
type To = ZResult<()>;
}

#[zenoh_macros::unstable]
impl Wait for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
#[zenoh_macros::unstable]
fn wait(self) -> <Self as Resolvable>::To {
let mut sub = AdvancedSubscriber::new(self.with_static_keys())?;
sub.subscriber.set_background(true);
if let Some(liveliness_sub) = sub.liveliness_subscriber.take() {
liveliness_sub.set_background(true);
}
Ok(())
}
}

#[zenoh_macros::unstable]
impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

#[zenoh_macros::unstable]
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}

#[zenoh_macros::unstable]
struct Period {
timer: Timer,
Expand All @@ -343,6 +395,7 @@ struct State {
query_timeout: Duration,
callback: Callback<Sample>,
miss_handlers: HashMap<usize, Callback<Miss>>,
token: Option<LivelinessToken>,
}

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -387,8 +440,7 @@ pub struct AdvancedSubscriber<Receiver> {
statesref: Arc<Mutex<State>>,
subscriber: Subscriber<()>,
receiver: Receiver,
_liveliness_subscriber: Option<Subscriber<()>>,
_token: Option<LivelinessToken>,
liveliness_subscriber: Option<Subscriber<()>>,
}

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -573,6 +625,7 @@ impl<Handler> AdvancedSubscriber<Handler> {
query_timeout: conf.query_timeout,
callback: callback.clone(),
miss_handlers: HashMap::new(),
token: None,
}));

let sub_callback = {
Expand Down Expand Up @@ -868,7 +921,7 @@ impl<Handler> AdvancedSubscriber<Handler> {
None
};

let token = if conf.liveliness {
if conf.liveliness {
let prefix = KE_ADV_PREFIX
/ KE_SUB
/ &subscriber.id().zid().into_keyexpr()
Expand All @@ -878,22 +931,19 @@ impl<Handler> AdvancedSubscriber<Handler> {
// We need this empty chunk because af a routing matching bug
_ => prefix / KE_EMPTY / KE_AT,
};
Some(
conf.session
.liveliness()
.declare_token(prefix / &key_expr)
.wait()?,
)
} else {
None
};
let token = conf
.session
.liveliness()
.declare_token(prefix / &key_expr)
.wait()?;
zlock!(statesref).token = Some(token)
}

let reliable_subscriber = AdvancedSubscriber {
statesref,
subscriber,
receiver,
_liveliness_subscriber: liveliness_subscriber,
_token: token,
liveliness_subscriber,
};

Ok(reliable_subscriber)
Expand Down

0 comments on commit c3a2608

Please sign in to comment.