Skip to content

Commit

Permalink
Add LivlinessSubscriber history option (#1355)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart authored Sep 9, 2024
1 parent 7387e28 commit e6994ff
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 6 deletions.
12 changes: 8 additions & 4 deletions examples/examples/z_sub_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() {
// Initiate logging
zenoh::try_init_log_from_env();

let (config, key_expr) = parse_args();
let (config, key_expr, history) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();
Expand All @@ -30,6 +30,7 @@ async fn main() {
let subscriber = session
.liveliness()
.declare_subscriber(&key_expr)
.history(history)
.await
.unwrap();

Expand All @@ -51,13 +52,16 @@ async fn main() {
#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "group1/**")]
/// The key expression to write to.
/// The key expression to subscribe to.
key: KeyExpr<'static>,
#[arg(long)]
/// Get historical liveliness tokens.
history: bool,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, KeyExpr<'static>) {
fn parse_args() -> (Config, KeyExpr<'static>, bool) {
let args = Args::parse();
(args.common.into(), args.key)
(args.common.into(), args.key, args.history)
}
33 changes: 32 additions & 1 deletion zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ impl<'a> Liveliness<'a> {
session: self.session.clone(),
key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into),
handler: DefaultHandler::default(),
history: false,
}
}

Expand Down Expand Up @@ -439,6 +440,7 @@ pub struct LivelinessSubscriberBuilder<'a, 'b, Handler> {
pub session: SessionRef<'a>,
pub key_expr: ZResult<KeyExpr<'b>>,
pub handler: Handler,
pub history: bool,
}

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -473,11 +475,13 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
session,
key_expr,
handler: _,
history,
} = self;
LivelinessSubscriberBuilder {
session,
key_expr,
handler: callback,
history,
}
}

Expand Down Expand Up @@ -544,11 +548,33 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
session,
key_expr,
handler: _,
history,
} = self;
LivelinessSubscriberBuilder {
session,
key_expr,
handler,
history,
}
}
}

#[zenoh_macros::unstable]
impl<Handler> LivelinessSubscriberBuilder<'_, '_, Handler> {
#[inline]
#[zenoh_macros::unstable]
pub fn history(self, history: bool) -> Self {
let LivelinessSubscriberBuilder {
session,
key_expr,
handler,
history: _,
} = self;
LivelinessSubscriberBuilder {
session,
key_expr,
handler,
history,
}
}
}
Expand Down Expand Up @@ -576,7 +602,12 @@ where
let session = self.session;
let (callback, handler) = self.handler.into_handler();
session
.declare_liveliness_subscriber_inner(&key_expr, Locality::default(), callback)
.declare_liveliness_subscriber_inner(
&key_expr,
Locality::default(),
self.history,
callback,
)
.map(|sub_state| Subscriber {
subscriber: SubscriberInner {
session,
Expand Down
7 changes: 6 additions & 1 deletion zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,7 @@ impl Session {
&self,
key_expr: &KeyExpr,
origin: Locality,
history: bool,
callback: Callback<'static, Sample>,
) -> ZResult<Arc<SubscriberState>> {
let mut state = zwrite!(self.state);
Expand Down Expand Up @@ -1475,7 +1476,11 @@ impl Session {

primitives.send_interest(Interest {
id,
mode: InterestMode::Future,
mode: if history {
InterestMode::CurrentFuture
} else {
InterestMode::Future
},
options: InterestOptions::KEYEXPRS + InterestOptions::TOKENS,
wire_expr: Some(key_expr.to_wire(self).to_owned()),
ext_qos: declare::ext::QoSType::DECLARE,
Expand Down

0 comments on commit e6994ff

Please sign in to comment.