Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Querier #1591

Merged
merged 33 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
09aef56
add querier
DenisBiryukov91 Nov 13, 2024
9a8e894
add LivelinessQuerier
DenisBiryukov91 Nov 13, 2024
800993b
code clean up
DenisBiryukov91 Nov 14, 2024
07795fa
interest support
DenisBiryukov91 Nov 18, 2024
f219640
make keyexpr include/intersect checking functions generic
DenisBiryukov91 Nov 19, 2024
3e87c79
remove liveliness querier
DenisBiryukov91 Nov 19, 2024
abf5d5d
add matching status for querier
DenisBiryukov91 Nov 20, 2024
92be499
add matching listener support
DenisBiryukov91 Nov 21, 2024
42495b7
clippy fix
DenisBiryukov91 Nov 21, 2024
0dd0b1c
clippy fix
DenisBiryukov91 Nov 21, 2024
352a0fb
clippy fix
DenisBiryukov91 Nov 21, 2024
5a3a7c2
clippy and fmt fix
DenisBiryukov91 Nov 21, 2024
70b855b
doc test fix
DenisBiryukov91 Nov 21, 2024
be22f2c
docs fix
DenisBiryukov91 Nov 21, 2024
39aa04a
fix MatchingStatus/Listener to work on session-local entities with or…
DenisBiryukov91 Nov 21, 2024
93685fe
Merge branch 'main' into querier
DenisBiryukov91 Nov 21, 2024
0d8a3e0
Merge branch 'main' into querier
DenisBiryukov91 Nov 21, 2024
f149366
clippy fix
DenisBiryukov91 Nov 21, 2024
7f380e7
fix review comments
DenisBiryukov91 Nov 22, 2024
eced4f3
explain #[allow(unused_mut)]
DenisBiryukov91 Nov 22, 2024
65e6e13
explain behaviour of keyexpr_intersect and keyexpr_include in case of…
DenisBiryukov91 Nov 22, 2024
c54ed52
log error when keyexpr_intersect/includes fails keyexpr conversion
DenisBiryukov91 Nov 25, 2024
4a0f968
add matching listener to z_pub example;
DenisBiryukov91 Nov 25, 2024
280de04
add test for querier
DenisBiryukov91 Nov 26, 2024
4fb9091
add test for matching listener/status
DenisBiryukov91 Nov 27, 2024
3c2281b
simplify MatchingListenerBuilder::with<Handler>
DenisBiryukov91 Nov 28, 2024
2699f9f
remove aggregated queriers
DenisBiryukov91 Nov 28, 2024
85f27ad
moved all MatchingStatus/Listener functionality under separate module
DenisBiryukov91 Nov 28, 2024
1d680d9
fixed z_querier example to accept selector instead of keyexpr
DenisBiryukov91 Nov 28, 2024
77dde47
merge main
DenisBiryukov91 Nov 29, 2024
b14c479
new clippy fixes
DenisBiryukov91 Nov 29, 2024
2882f8f
merge
DenisBiryukov91 Dec 2, 2024
d2dd912
mark querier related features as unstable
DenisBiryukov91 Dec 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ validated_struct::validator! {
subscribers: Vec<OwnedKeyExpr>,
/// A list of key-expressions for which all included publishers will be aggregated into.
publishers: Vec<OwnedKeyExpr>,
/// A list of key-expressions for which all included queriers will be aggregated into.
queriers: Vec<OwnedKeyExpr>,
DenisBiryukov91 marked this conversation as resolved.
Show resolved Hide resolved
},
pub transport: #[derive(Default)]
TransportConf {
Expand Down
142 changes: 142 additions & 0 deletions examples/examples/z_querier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::time::Duration;

use clap::Parser;
use zenoh::{key_expr::KeyExpr, query::QueryTarget, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
async fn main() {
// initiate logging
zenoh::init_log_from_env_or("error");

let (config, keyexpr, payload, target, timeout) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();

println!("Declaring Querier on '{keyexpr}'...");
let querier = session
.declare_querier(keyexpr)
.target(target)
.timeout(timeout)
.await
.unwrap();

#[cfg(feature = "unstable")]
DenisBiryukov91 marked this conversation as resolved.
Show resolved Hide resolved
querier
.matching_listener()
.callback(|matching_status| {
OlivierHecart marked this conversation as resolved.
Show resolved Hide resolved
if matching_status.matching() {
println!("Querier has matching queryables.");
} else {
println!("Querier has NO MORE matching queryables.");
}
})
.background()
.await
.unwrap();

println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {}", payload.clone().unwrap_or_default());
println!(
"Querying '{}' with payload: '{}')...",
&querier.key_expr(),
buf
);
let replies = querier
.get()
// // By default get receives replies from a FIFO.
// // Uncomment this line to use a ring channel instead.
// // More information on the ring channel are available in the z_pull example.
// .with(zenoh::handlers::RingChannel::default())
// Refer to z_bytes.rs to see how to serialize different types of message
.payload(buf)
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.result() {
Ok(sample) => {
// Refer to z_bytes.rs to see how to deserialize different types of message
let payload = sample
.payload()
.try_to_string()
.unwrap_or_else(|e| e.to_string().into());
println!(
">> Received ('{}': '{}')",
sample.key_expr().as_str(),
payload,
);
}
Err(err) => {
let payload = err
.payload()
.try_to_string()
.unwrap_or_else(|e| e.to_string().into());
println!(">> Received (ERROR: '{}')", payload);
}
}
}
}
}

#[derive(clap::ValueEnum, Clone, Copy, Debug)]
#[value(rename_all = "SCREAMING_SNAKE_CASE")]
enum Qt {
BestMatching,
All,
AllComplete,
}

#[derive(Parser, Clone, Debug)]
struct Args {
#[arg(short, long, default_value = "demo/example/**")]
/// The selection of resources to query
key_expr: KeyExpr<'static>,
#[arg(short, long)]
/// An optional payload to put in the query.
payload: Option<String>,
#[arg(short, long, default_value = "BEST_MATCHING")]
/// The target queryables of the query.
target: Qt,
#[arg(short = 'o', long, default_value = "10000")]
/// The query timeout in milliseconds.
timeout: u64,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (
Config,
KeyExpr<'static>,
Option<String>,
QueryTarget,
Duration,
) {
let args = Args::parse();
(
args.common.into(),
args.key_expr,
args.payload,
match args.target {
Qt::BestMatching => QueryTarget::BestMatching,
Qt::All => QueryTarget::All,
Qt::AllComplete => QueryTarget::AllComplete,
},
Duration::from_millis(args.timeout),
)
}
4 changes: 1 addition & 3 deletions zenoh/src/api/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ lazy_static::lazy_static!(

pub(crate) fn init(session: WeakSession) {
if let Ok(own_zid) = keyexpr::new(&session.zid().to_string()) {
let admin_key = KeyExpr::from(*KE_PREFIX / own_zid / *KE_SESSION / *KE_STARSTAR)
.to_wire(&session)
.to_owned();
let admin_key = KeyExpr::from(*KE_PREFIX / own_zid / *KE_SESSION / *KE_STARSTAR);

let _admin_qabl = session.declare_queryable_inner(
&admin_key,
Expand Down
98 changes: 62 additions & 36 deletions zenoh/src/api/builders/matching_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,31 @@ use zenoh_result::ZResult;
use {
crate::api::{
handlers::{Callback, DefaultHandler, IntoHandler},
publisher::{MatchingListener, MatchingListenerInner, MatchingStatus, Publisher},
publisher::{MatchingListener, MatchingListenerInner, MatchingStatus, MatchingStatusType},
Id,
},
crate::sample::Locality,
std::sync::Arc,
std::{collections::HashSet, sync::Mutex},
};

#[cfg(feature = "unstable")]
use crate::{api::session::WeakSession, key_expr::KeyExpr};

/// A builder for initializing a [`MatchingListener`].
#[zenoh_macros::unstable]
#[derive(Debug)]
pub struct MatchingListenerBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> {
pub(crate) publisher: &'a Publisher<'b>,
pub struct MatchingListenerBuilder<'a, Handler, const BACKGROUND: bool = false> {
pub(crate) session: &'a WeakSession,
pub(crate) key_expr: &'a KeyExpr<'a>,
pub(crate) destination: Locality,
pub(crate) matching_listeners: &'a Arc<Mutex<HashSet<Id>>>,
pub(crate) matching_status_type: MatchingStatusType,
pub handler: Handler,
}

#[zenoh_macros::unstable]
impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> {
impl<'a> MatchingListenerBuilder<'a, DefaultHandler> {
/// Receive the MatchingStatuses for this listener with a callback.
///
/// # Examples
Expand All @@ -49,7 +59,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> {
/// let matching_listener = publisher
/// .matching_listener()
/// .callback(|matching_status| {
/// if matching_status.matching_subscribers() {
/// if matching_status.matching() {
/// println!("Publisher has matching subscribers.");
/// } else {
/// println!("Publisher has NO MORE matching subscribers.");
Expand All @@ -61,10 +71,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> {
/// ```
#[inline]
#[zenoh_macros::unstable]
pub fn callback<F>(
self,
callback: F,
) -> MatchingListenerBuilder<'a, 'b, Callback<MatchingStatus>>
pub fn callback<F>(self, callback: F) -> MatchingListenerBuilder<'a, Callback<MatchingStatus>>
where
F: Fn(MatchingStatus) + Send + Sync + 'static,
{
Expand Down Expand Up @@ -93,7 +100,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> {
pub fn callback_mut<F>(
self,
callback: F,
) -> MatchingListenerBuilder<'a, 'b, Callback<MatchingStatus>>
) -> MatchingListenerBuilder<'a, Callback<MatchingStatus>>
where
F: FnMut(MatchingStatus) + Send + Sync + 'static,
{
Expand All @@ -115,7 +122,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> {
/// .await
/// .unwrap();
/// while let Ok(matching_status) = matching_listener.recv_async().await {
/// if matching_status.matching_subscribers() {
/// if matching_status.matching() {
/// println!("Publisher has matching subscribers.");
/// } else {
/// println!("Publisher has NO MORE matching subscribers.");
Expand All @@ -125,20 +132,31 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> {
/// ```
#[inline]
#[zenoh_macros::unstable]
pub fn with<Handler>(self, handler: Handler) -> MatchingListenerBuilder<'a, 'b, Handler>
pub fn with<Handler>(self, handler: Handler) -> MatchingListenerBuilder<'a, Handler>
where
Handler: IntoHandler<MatchingStatus>,
{
let MatchingListenerBuilder {
DenisBiryukov91 marked this conversation as resolved.
Show resolved Hide resolved
publisher,
session,
key_expr,
destination,
matching_listeners,
matching_status_type,
handler: _,
} = self;
MatchingListenerBuilder { publisher, handler }
MatchingListenerBuilder {
session,
key_expr,
destination,
matching_listeners,
matching_status_type,
handler,
}
}
}

#[zenoh_macros::unstable]
impl<'a, 'b> MatchingListenerBuilder<'a, 'b, Callback<MatchingStatus>> {
impl<'a> MatchingListenerBuilder<'a, Callback<MatchingStatus>> {
/// Register the listener callback to be run in background until the publisher is undeclared.
///
/// Background builder doesn't return a `MatchingListener` object anymore.
Expand All @@ -154,7 +172,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, Callback<MatchingStatus>> {
/// publisher
/// .matching_listener()
/// .callback(|matching_status| {
/// if matching_status.matching_subscribers() {
/// if matching_status.matching() {
/// println!("Publisher has matching subscribers.");
/// } else {
/// println!("Publisher has NO MORE matching subscribers.");
Expand All @@ -165,16 +183,20 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, Callback<MatchingStatus>> {
/// .unwrap();
/// # }
/// ```
pub fn background(self) -> MatchingListenerBuilder<'a, 'b, Callback<MatchingStatus>, true> {
pub fn background(self) -> MatchingListenerBuilder<'a, Callback<MatchingStatus>, true> {
MatchingListenerBuilder {
publisher: self.publisher,
session: self.session,
destination: self.destination,
matching_listeners: self.matching_listeners,
key_expr: self.key_expr,
matching_status_type: self.matching_status_type,
handler: self.handler,
}
}
}

#[zenoh_macros::unstable]
impl<Handler> Resolvable for MatchingListenerBuilder<'_, '_, Handler>
impl<Handler> Resolvable for MatchingListenerBuilder<'_, Handler>
where
Handler: IntoHandler<MatchingStatus> + Send,
Handler::Handler: Send,
Expand All @@ -183,23 +205,25 @@ where
}

#[zenoh_macros::unstable]
impl<Handler> Wait for MatchingListenerBuilder<'_, '_, Handler>
impl<Handler> Wait for MatchingListenerBuilder<'_, Handler>
where
Handler: IntoHandler<MatchingStatus> + Send,
Handler::Handler: Send,
{
#[zenoh_macros::unstable]
fn wait(self) -> <Self as Resolvable>::To {
let (callback, handler) = self.handler.into_handler();
let state = self
.publisher
.session
.declare_matches_listener_inner(self.publisher, callback)?;
zlock!(self.publisher.matching_listeners).insert(state.id);
let state = self.session.declare_matches_listener_inner(
self.key_expr,
self.destination,
self.matching_status_type,
callback,
)?;
zlock!(self.matching_listeners).insert(state.id);
Ok(MatchingListener {
inner: MatchingListenerInner {
session: self.publisher.session.clone(),
matching_listeners: self.publisher.matching_listeners.clone(),
session: self.session.clone(),
matching_listeners: self.matching_listeners.clone(),
id: state.id,
undeclare_on_drop: true,
},
Expand All @@ -209,7 +233,7 @@ where
}

#[zenoh_macros::unstable]
impl<Handler> IntoFuture for MatchingListenerBuilder<'_, '_, Handler>
impl<Handler> IntoFuture for MatchingListenerBuilder<'_, Handler>
where
Handler: IntoHandler<MatchingStatus> + Send,
Handler::Handler: Send,
Expand All @@ -224,25 +248,27 @@ where
}

#[zenoh_macros::unstable]
impl Resolvable for MatchingListenerBuilder<'_, '_, Callback<MatchingStatus>, true> {
impl Resolvable for MatchingListenerBuilder<'_, Callback<MatchingStatus>, true> {
type To = ZResult<()>;
}

#[zenoh_macros::unstable]
impl Wait for MatchingListenerBuilder<'_, '_, Callback<MatchingStatus>, true> {
impl Wait for MatchingListenerBuilder<'_, Callback<MatchingStatus>, true> {
#[zenoh_macros::unstable]
fn wait(self) -> <Self as Resolvable>::To {
let state = self
.publisher
.session
.declare_matches_listener_inner(self.publisher, self.handler)?;
zlock!(self.publisher.matching_listeners).insert(state.id);
let state = self.session.declare_matches_listener_inner(
self.key_expr,
self.destination,
self.matching_status_type,
self.handler,
)?;
zlock!(self.matching_listeners).insert(state.id);
Ok(())
}
}

#[zenoh_macros::unstable]
impl IntoFuture for MatchingListenerBuilder<'_, '_, Callback<MatchingStatus>, true> {
impl IntoFuture for MatchingListenerBuilder<'_, Callback<MatchingStatus>, true> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

Expand Down
1 change: 1 addition & 0 deletions zenoh/src/api/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub(crate) mod info;
pub(crate) mod matching_listener;
pub(crate) mod publisher;
pub(crate) mod querier;
pub(crate) mod query;
pub(crate) mod queryable;
pub(crate) mod reply;
Expand Down
Loading