From 549bc7b2f8a68d86441dd0e22fde4f722d130516 Mon Sep 17 00:00:00 2001 From: DenisBiryukov91 <155981813+DenisBiryukov91@users.noreply.github.com> Date: Mon, 2 Dec 2024 20:54:28 +0100 Subject: [PATCH] add Querier (#1591) * add querier * add LivelinessQuerier * code clean up * interest support * make keyexpr include/intersect checking functions generic * remove liveliness querier * add matching status for querier * add matching listener support * clippy fix * clippy fix * clippy fix * clippy and fmt fix * doc test fix * docs fix * fix MatchingStatus/Listener to work on session-local entities with origin=Locality::SessionLocal * Merge branch 'main' into querier * clippy fix * fix review comments * explain #[allow(unused_mut)] * explain behaviour of keyexpr_intersect and keyexpr_include in case of conversion failure * log error when keyexpr_intersect/includes fails keyexpr conversion * add matching listener to z_pub example; add flag to enable/disable matching listener in the z_pub and z_querier examples; * add test for querier * add test for matching listener/status * simplify MatchingListenerBuilder::with * remove aggregated queriers * moved all MatchingStatus/Listener functionality under separate module * fixed z_querier example to accept selector instead of keyexpr * new clippy fixes * mark querier related features as unstable --- examples/Cargo.toml | 5 + examples/README.md | 18 + examples/examples/z_pub.rs | 38 +- examples/examples/z_querier.rs | 157 +++++ zenoh/src/api/admin.rs | 4 +- zenoh/src/api/builders/matching_listener.rs | 96 +-- zenoh/src/api/builders/mod.rs | 2 + zenoh/src/api/builders/publisher.rs | 31 +- zenoh/src/api/builders/querier.rs | 457 +++++++++++++ zenoh/src/api/builders/queryable.rs | 9 +- zenoh/src/api/key_expr.rs | 50 ++ zenoh/src/api/matching.rs | 258 +++++++ zenoh/src/api/mod.rs | 4 + zenoh/src/api/publisher.rs | 223 +----- zenoh/src/api/querier.rs | 322 +++++++++ zenoh/src/api/session.rs | 646 +++++++++++++----- zenoh/src/lib.rs | 22 +- zenoh/src/net/routing/dispatcher/queries.rs | 14 + zenoh/src/net/routing/hat/client/pubsub.rs | 48 +- zenoh/src/net/routing/hat/client/queries.rs | 126 +++- .../net/routing/hat/linkstate_peer/queries.rs | 87 +++ zenoh/src/net/routing/hat/mod.rs | 8 + zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 32 +- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 95 ++- zenoh/src/net/routing/hat/router/queries.rs | 107 +++ zenoh/tests/matching.rs | 332 ++++----- zenoh/tests/session.rs | 112 ++- 27 files changed, 2553 insertions(+), 750 deletions(-) create mode 100644 examples/examples/z_querier.rs create mode 100644 zenoh/src/api/builders/querier.rs create mode 100644 zenoh/src/api/matching.rs create mode 100644 zenoh/src/api/querier.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 687f4790c1..af948d5b27 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -93,6 +93,11 @@ required-features = ["unstable", "shared-memory"] name = "z_pull" path = "examples/z_pull.rs" +[[example]] +name = "z_querier" +path = "examples/z_querier.rs" +required-features = ["unstable"] + [[example]] name = "z_queryable" path = "examples/z_queryable.rs" diff --git a/examples/README.md b/examples/README.md index 21d1a0ab34..c0c7b4eed3 100644 --- a/examples/README.md +++ b/examples/README.md @@ -123,6 +123,24 @@ z_get -s 'demo/**' ``` +### z_querier + + Continuously sends query messages for a selector. + The queryables with a matching path or selector (for instance [z_queryable](#z_queryable) and [z_storage](#z_storage)) + will receive these queries and reply with paths/values that will be received by the querier. + + Typical usage: + + ```bash + z_querier + ``` + + or + + ```bash + z_querier -s 'demo/**' + ``` + ### z_queryable Declares a queryable function with a path. diff --git a/examples/examples/z_pub.rs b/examples/examples/z_pub.rs index 25d6eacdee..eac7236230 100644 --- a/examples/examples/z_pub.rs +++ b/examples/examples/z_pub.rs @@ -22,7 +22,10 @@ async fn main() { // Initiate logging zenoh::init_log_from_env_or("error"); - let (config, key_expr, payload, attachment) = parse_args(); + #[cfg(feature = "unstable")] + let (config, key_expr, payload, attachment, add_matching_listener) = parse_args(); + #[cfg(not(feature = "unstable"))] + let (config, key_expr, payload, attachment, _) = parse_args(); println!("Opening session..."); let session = zenoh::open(config).await.unwrap(); @@ -30,6 +33,22 @@ async fn main() { println!("Declaring Publisher on '{key_expr}'..."); let publisher = session.declare_publisher(&key_expr).await.unwrap(); + #[cfg(feature = "unstable")] + if add_matching_listener { + publisher + .matching_listener() + .callback(|matching_status| { + if matching_status.matching() { + println!("Publisher has matching subscribers."); + } else { + println!("Publisher has NO MORE matching subscribers."); + } + }) + .background() + .await + .unwrap(); + } + println!("Press CTRL-C to quit..."); for idx in 0..u32::MAX { tokio::time::sleep(Duration::from_secs(1)).await; @@ -56,11 +75,24 @@ struct Args { #[arg(short, long)] /// The attachments to add to each put. attach: Option, + /// Enable matching listener. + #[cfg(feature = "unstable")] + #[arg(long)] + add_matching_listener: bool, #[command(flatten)] common: CommonArgs, } -fn parse_args() -> (Config, KeyExpr<'static>, String, Option) { +fn parse_args() -> (Config, KeyExpr<'static>, String, Option, bool) { let args = Args::parse(); - (args.common.into(), args.key, args.payload, args.attach) + ( + args.common.into(), + args.key, + args.payload, + args.attach, + #[cfg(feature = "unstable")] + args.add_matching_listener, + #[cfg(not(feature = "unstable"))] + false, + ) } diff --git a/examples/examples/z_querier.rs b/examples/examples/z_querier.rs new file mode 100644 index 0000000000..d19f2e81e8 --- /dev/null +++ b/examples/examples/z_querier.rs @@ -0,0 +1,157 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::time::Duration; + +use clap::Parser; +use zenoh::{ + query::{QueryTarget, Selector}, + Config, +}; +use zenoh_examples::CommonArgs; + +#[tokio::main] +async fn main() { + // initiate logging + zenoh::init_log_from_env_or("error"); + #[cfg(feature = "unstable")] + let (config, selector, payload, target, timeout, add_matching_listener) = parse_args(); + #[cfg(not(feature = "unstable"))] + let (config, selector, payload, target, timeout, _) = parse_args(); + + println!("Opening session..."); + let session = zenoh::open(config).await.unwrap(); + + println!("Declaring Querier on '{}'...", selector.key_expr()); + let querier = session + .declare_querier(selector.key_expr()) + .target(target) + .timeout(timeout) + .await + .unwrap(); + + #[cfg(feature = "unstable")] + if add_matching_listener { + querier + .matching_listener() + .callback(|matching_status| { + if matching_status.matching() { + println!("Querier has matching queryables."); + } else { + println!("Querier has NO MORE matching queryables."); + } + }) + .background() + .await + .unwrap(); + } + + let params = selector.parameters().as_str(); + + println!("Press CTRL-C to quit..."); + for idx in 0..u32::MAX { + tokio::time::sleep(Duration::from_secs(1)).await; + let buf = format!("[{idx:4}] {}", payload.clone().unwrap_or_default()); + println!("Querying '{}' with payload: '{}'...", &selector, buf); + let replies = querier + .get() + // // By default get receives replies from a FIFO. + // // Uncomment this line to use a ring channel instead. + // // More information on the ring channel are available in the z_pull example. + // .with(zenoh::handlers::RingChannel::default()) + // Refer to z_bytes.rs to see how to serialize different types of message + .payload(buf) + .parameters(params) + .await + .unwrap(); + while let Ok(reply) = replies.recv_async().await { + match reply.result() { + Ok(sample) => { + // Refer to z_bytes.rs to see how to deserialize different types of message + let payload = sample + .payload() + .try_to_string() + .unwrap_or_else(|e| e.to_string().into()); + println!( + ">> Received ('{}': '{}')", + sample.key_expr().as_str(), + payload, + ); + } + Err(err) => { + let payload = err + .payload() + .try_to_string() + .unwrap_or_else(|e| e.to_string().into()); + println!(">> Received (ERROR: '{}')", payload); + } + } + } + } +} + +#[derive(clap::ValueEnum, Clone, Copy, Debug)] +#[value(rename_all = "SCREAMING_SNAKE_CASE")] +enum Qt { + BestMatching, + All, + AllComplete, +} + +#[derive(Parser, Clone, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/**")] + /// The selection of resources to query + selector: Selector<'static>, + #[arg(short, long)] + /// An optional payload to put in the query. + payload: Option, + #[arg(short, long, default_value = "BEST_MATCHING")] + /// The target queryables of the query. + target: Qt, + #[arg(short = 'o', long, default_value = "10000")] + /// The query timeout in milliseconds. + timeout: u64, + /// Enable matching listener. + #[cfg(feature = "unstable")] + #[arg(long)] + add_matching_listener: bool, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> ( + Config, + Selector<'static>, + Option, + QueryTarget, + Duration, + bool, +) { + let args = Args::parse(); + ( + args.common.into(), + args.selector, + args.payload, + match args.target { + Qt::BestMatching => QueryTarget::BestMatching, + Qt::All => QueryTarget::All, + Qt::AllComplete => QueryTarget::AllComplete, + }, + Duration::from_millis(args.timeout), + #[cfg(feature = "unstable")] + args.add_matching_listener, + #[cfg(not(feature = "unstable"))] + false, + ) +} diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index b0d0bed0f7..d043ec856c 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -48,9 +48,7 @@ lazy_static::lazy_static!( pub(crate) fn init(session: WeakSession) { if let Ok(own_zid) = keyexpr::new(&session.zid().to_string()) { - let admin_key = KeyExpr::from(*KE_PREFIX / own_zid / *KE_SESSION / *KE_STARSTAR) - .to_wire(&session) - .to_owned(); + let admin_key = KeyExpr::from(*KE_PREFIX / own_zid / *KE_SESSION / *KE_STARSTAR); let _admin_qabl = session.declare_queryable_inner( &admin_key, diff --git a/zenoh/src/api/builders/matching_listener.rs b/zenoh/src/api/builders/matching_listener.rs index d81ba6e4c9..f052756dfd 100644 --- a/zenoh/src/api/builders/matching_listener.rs +++ b/zenoh/src/api/builders/matching_listener.rs @@ -22,21 +22,31 @@ use zenoh_result::ZResult; use { crate::api::{ handlers::{Callback, DefaultHandler, IntoHandler}, - publisher::{MatchingListener, MatchingListenerInner, MatchingStatus, Publisher}, + matching::{MatchingListener, MatchingListenerInner, MatchingStatus, MatchingStatusType}, + Id, }, + crate::sample::Locality, std::sync::Arc, + std::{collections::HashSet, sync::Mutex}, }; +#[cfg(feature = "unstable")] +use crate::{api::session::WeakSession, key_expr::KeyExpr}; + /// A builder for initializing a [`MatchingListener`]. #[zenoh_macros::unstable] #[derive(Debug)] -pub struct MatchingListenerBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> { - pub(crate) publisher: &'a Publisher<'b>, +pub struct MatchingListenerBuilder<'a, Handler, const BACKGROUND: bool = false> { + pub(crate) session: &'a WeakSession, + pub(crate) key_expr: &'a KeyExpr<'a>, + pub(crate) destination: Locality, + pub(crate) matching_listeners: &'a Arc>>, + pub(crate) matching_status_type: MatchingStatusType, pub handler: Handler, } #[zenoh_macros::unstable] -impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { +impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { /// Receive the MatchingStatuses for this listener with a callback. /// /// # Examples @@ -49,7 +59,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { /// let matching_listener = publisher /// .matching_listener() /// .callback(|matching_status| { - /// if matching_status.matching_subscribers() { + /// if matching_status.matching() { /// println!("Publisher has matching subscribers."); /// } else { /// println!("Publisher has NO MORE matching subscribers."); @@ -61,10 +71,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { /// ``` #[inline] #[zenoh_macros::unstable] - pub fn callback( - self, - callback: F, - ) -> MatchingListenerBuilder<'a, 'b, Callback> + pub fn callback(self, callback: F) -> MatchingListenerBuilder<'a, Callback> where F: Fn(MatchingStatus) + Send + Sync + 'static, { @@ -93,7 +100,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { pub fn callback_mut( self, callback: F, - ) -> MatchingListenerBuilder<'a, 'b, Callback> + ) -> MatchingListenerBuilder<'a, Callback> where F: FnMut(MatchingStatus) + Send + Sync + 'static, { @@ -115,7 +122,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { /// .await /// .unwrap(); /// while let Ok(matching_status) = matching_listener.recv_async().await { - /// if matching_status.matching_subscribers() { + /// if matching_status.matching() { /// println!("Publisher has matching subscribers."); /// } else { /// println!("Publisher has NO MORE matching subscribers."); @@ -125,20 +132,23 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { /// ``` #[inline] #[zenoh_macros::unstable] - pub fn with(self, handler: Handler) -> MatchingListenerBuilder<'a, 'b, Handler> + pub fn with(self, handler: Handler) -> MatchingListenerBuilder<'a, Handler> where Handler: IntoHandler, { - let MatchingListenerBuilder { - publisher, - handler: _, - } = self; - MatchingListenerBuilder { publisher, handler } + MatchingListenerBuilder { + session: self.session, + key_expr: self.key_expr, + destination: self.destination, + matching_listeners: self.matching_listeners, + matching_status_type: self.matching_status_type, + handler, + } } } #[zenoh_macros::unstable] -impl<'a, 'b> MatchingListenerBuilder<'a, 'b, Callback> { +impl<'a> MatchingListenerBuilder<'a, Callback> { /// Register the listener callback to be run in background until the publisher is undeclared. /// /// Background builder doesn't return a `MatchingListener` object anymore. @@ -154,7 +164,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, Callback> { /// publisher /// .matching_listener() /// .callback(|matching_status| { - /// if matching_status.matching_subscribers() { + /// if matching_status.matching() { /// println!("Publisher has matching subscribers."); /// } else { /// println!("Publisher has NO MORE matching subscribers."); @@ -165,16 +175,20 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, Callback> { /// .unwrap(); /// # } /// ``` - pub fn background(self) -> MatchingListenerBuilder<'a, 'b, Callback, true> { + pub fn background(self) -> MatchingListenerBuilder<'a, Callback, true> { MatchingListenerBuilder { - publisher: self.publisher, + session: self.session, + destination: self.destination, + matching_listeners: self.matching_listeners, + key_expr: self.key_expr, + matching_status_type: self.matching_status_type, handler: self.handler, } } } #[zenoh_macros::unstable] -impl Resolvable for MatchingListenerBuilder<'_, '_, Handler> +impl Resolvable for MatchingListenerBuilder<'_, Handler> where Handler: IntoHandler + Send, Handler::Handler: Send, @@ -183,7 +197,7 @@ where } #[zenoh_macros::unstable] -impl Wait for MatchingListenerBuilder<'_, '_, Handler> +impl Wait for MatchingListenerBuilder<'_, Handler> where Handler: IntoHandler + Send, Handler::Handler: Send, @@ -191,15 +205,17 @@ where #[zenoh_macros::unstable] fn wait(self) -> ::To { let (callback, handler) = self.handler.into_handler(); - let state = self - .publisher - .session - .declare_matches_listener_inner(self.publisher, callback)?; - zlock!(self.publisher.matching_listeners).insert(state.id); + let state = self.session.declare_matches_listener_inner( + self.key_expr, + self.destination, + self.matching_status_type, + callback, + )?; + zlock!(self.matching_listeners).insert(state.id); Ok(MatchingListener { inner: MatchingListenerInner { - session: self.publisher.session.clone(), - matching_listeners: self.publisher.matching_listeners.clone(), + session: self.session.clone(), + matching_listeners: self.matching_listeners.clone(), id: state.id, undeclare_on_drop: true, }, @@ -209,7 +225,7 @@ where } #[zenoh_macros::unstable] -impl IntoFuture for MatchingListenerBuilder<'_, '_, Handler> +impl IntoFuture for MatchingListenerBuilder<'_, Handler> where Handler: IntoHandler + Send, Handler::Handler: Send, @@ -224,25 +240,27 @@ where } #[zenoh_macros::unstable] -impl Resolvable for MatchingListenerBuilder<'_, '_, Callback, true> { +impl Resolvable for MatchingListenerBuilder<'_, Callback, true> { type To = ZResult<()>; } #[zenoh_macros::unstable] -impl Wait for MatchingListenerBuilder<'_, '_, Callback, true> { +impl Wait for MatchingListenerBuilder<'_, Callback, true> { #[zenoh_macros::unstable] fn wait(self) -> ::To { - let state = self - .publisher - .session - .declare_matches_listener_inner(self.publisher, self.handler)?; - zlock!(self.publisher.matching_listeners).insert(state.id); + let state = self.session.declare_matches_listener_inner( + self.key_expr, + self.destination, + self.matching_status_type, + self.handler, + )?; + zlock!(self.matching_listeners).insert(state.id); Ok(()) } } #[zenoh_macros::unstable] -impl IntoFuture for MatchingListenerBuilder<'_, '_, Callback, true> { +impl IntoFuture for MatchingListenerBuilder<'_, Callback, true> { type Output = ::To; type IntoFuture = Ready<::To>; diff --git a/zenoh/src/api/builders/mod.rs b/zenoh/src/api/builders/mod.rs index a9cfcab630..780e25366e 100644 --- a/zenoh/src/api/builders/mod.rs +++ b/zenoh/src/api/builders/mod.rs @@ -15,6 +15,8 @@ pub(crate) mod info; pub(crate) mod matching_listener; pub(crate) mod publisher; +#[cfg(feature = "unstable")] +pub(crate) mod querier; pub(crate) mod query; pub(crate) mod queryable; pub(crate) mod reply; diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 17d0aef165..a2515eb284 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -14,9 +14,9 @@ use std::future::{IntoFuture, Ready}; use zenoh_core::{Resolvable, Result as ZResult, Wait}; +use zenoh_protocol::core::CongestionControl; #[cfg(feature = "unstable")] use zenoh_protocol::core::Reliability; -use zenoh_protocol::{core::CongestionControl, network::Mapping}; #[cfg(feature = "unstable")] use crate::api::sample::SourceInfo; @@ -375,34 +375,7 @@ impl Wait for PublisherBuilder<'_, '_> { fn wait(self) -> ::To { let mut key_expr = self.key_expr?; if !key_expr.is_fully_optimized(&self.session.0) { - let session_id = self.session.0.id; - let expr_id = self.session.0.declare_prefix(key_expr.as_str()).wait()?; - let prefix_len = key_expr - .len() - .try_into() - .expect("How did you get a key expression with a length over 2^32!?"); - key_expr = match key_expr.0 { - crate::api::key_expr::KeyExprInner::Borrowed(key_expr) - | crate::api::key_expr::KeyExprInner::BorrowedWire { key_expr, .. } => { - KeyExpr(crate::api::key_expr::KeyExprInner::BorrowedWire { - key_expr, - expr_id, - mapping: Mapping::Sender, - prefix_len, - session_id, - }) - } - crate::api::key_expr::KeyExprInner::Owned(key_expr) - | crate::api::key_expr::KeyExprInner::Wire { key_expr, .. } => { - KeyExpr(crate::api::key_expr::KeyExprInner::Wire { - key_expr, - expr_id, - mapping: Mapping::Sender, - prefix_len, - session_id, - }) - } - } + key_expr = self.session.declare_keyexpr(key_expr).wait()?; } let id = self .session diff --git a/zenoh/src/api/builders/querier.rs b/zenoh/src/api/builders/querier.rs new file mode 100644 index 0000000000..ef1fd010da --- /dev/null +++ b/zenoh/src/api/builders/querier.rs @@ -0,0 +1,457 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + future::{IntoFuture, Ready}, + sync::Arc, + time::Duration, +}; + +use zenoh_core::{Resolvable, Wait}; +use zenoh_protocol::{ + core::{CongestionControl, Parameters}, + network::request::ext::QueryTarget, +}; +use zenoh_result::ZResult; + +use super::sample::QoSBuilderTrait; +#[cfg(feature = "unstable")] +use crate::api::query::ReplyKeyExpr; +#[cfg(feature = "unstable")] +use crate::api::sample::SourceInfo; +#[cfg(feature = "unstable")] +use crate::query::ZenohParameters; +use crate::{ + api::{ + builders::sample::{EncodingBuilderTrait, SampleBuilderTrait}, + bytes::ZBytes, + encoding::Encoding, + handlers::{locked, Callback, DefaultHandler, IntoHandler}, + querier::Querier, + sample::{Locality, QoSBuilder}, + }, + bytes::OptionZBytes, + key_expr::KeyExpr, + qos::Priority, + query::{QueryConsolidation, Reply}, + Session, +}; + +/// A builder for initializing a `querier`. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use zenoh::{query::{ConsolidationMode, QueryTarget}}; +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let querier = session.declare_querier("key/expression") +/// .target(QueryTarget::All) +/// .consolidation(ConsolidationMode::None) +/// .await +/// .unwrap(); +/// let replies = querier.get() +/// .parameters("value>1") +/// .await +/// .unwrap(); +/// while let Ok(reply) = replies.recv_async().await { +/// println!("Received {:?}", reply.result()) +/// } +/// # } +/// ``` +#[zenoh_macros::unstable] +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[derive(Debug)] +pub struct QuerierBuilder<'a, 'b> { + pub(crate) session: &'a Session, + pub(crate) key_expr: ZResult>, + pub(crate) target: QueryTarget, + pub(crate) consolidation: QueryConsolidation, + pub(crate) qos: QoSBuilder, + pub(crate) destination: Locality, + pub(crate) timeout: Duration, + #[cfg(feature = "unstable")] + pub(crate) accept_replies: ReplyKeyExpr, +} + +#[zenoh_macros::internal_trait] +impl QoSBuilderTrait for QuerierBuilder<'_, '_> { + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + let qos = self.qos.congestion_control(congestion_control); + Self { qos, ..self } + } + + fn priority(self, priority: Priority) -> Self { + let qos = self.qos.priority(priority); + Self { qos, ..self } + } + + fn express(self, is_express: bool) -> Self { + let qos = self.qos.express(is_express); + Self { qos, ..self } + } +} + +impl QuerierBuilder<'_, '_> { + /// Change the target of the querier queries. + #[inline] + pub fn target(self, target: QueryTarget) -> Self { + Self { target, ..self } + } + + /// Change the consolidation mode of the querier queries. + #[inline] + pub fn consolidation>(self, consolidation: QC) -> Self { + Self { + consolidation: consolidation.into(), + ..self + } + } + + /// Restrict the matching queryables that will receive the queries + /// to the ones that have the given [`Locality`](Locality). + #[zenoh_macros::unstable] + #[inline] + pub fn allowed_destination(self, destination: Locality) -> Self { + Self { + destination, + ..self + } + } + + /// Set queries timeout. + #[inline] + pub fn timeout(self, timeout: Duration) -> Self { + Self { timeout, ..self } + } + + /// By default, only replies whose key expressions intersect + /// with the querier key expression will be received by calls to [`Querier::get`](crate::query::Querier::get) method. + /// + /// If allowed to through `accept_replies(ReplyKeyExpr::Any)`, queryables may also reply on key + /// expressions that don't intersect with the querier's queries. + #[zenoh_macros::unstable] + pub fn accept_replies(self, accept: ReplyKeyExpr) -> Self { + Self { + accept_replies: accept, + ..self + } + } +} + +impl<'b> Resolvable for QuerierBuilder<'_, 'b> { + type To = ZResult>; +} + +impl Wait for QuerierBuilder<'_, '_> { + fn wait(self) -> ::To { + let mut key_expr = self.key_expr?; + if !key_expr.is_fully_optimized(&self.session.0) { + key_expr = self.session.declare_keyexpr(key_expr).wait()?; + } + let id = self + .session + .0 + .declare_querier_inner(key_expr.clone(), self.destination)?; + Ok(Querier { + session: self.session.downgrade(), + id, + key_expr, + qos: self.qos.into(), + destination: self.destination, + undeclare_on_drop: true, + target: self.target, + consolidation: self.consolidation, + timeout: self.timeout, + #[cfg(feature = "unstable")] + accept_replies: self.accept_replies, + #[cfg(feature = "unstable")] + matching_listeners: Default::default(), + }) + } +} + +impl IntoFuture for QuerierBuilder<'_, '_> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +/// A builder for initializing a `query` to be sent from the querier. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use zenoh::{query::{ConsolidationMode, QueryTarget}}; +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let querier = session.declare_querier("key/expression") +/// .target(QueryTarget::All) +/// .consolidation(ConsolidationMode::None) +/// .await +/// .unwrap(); +/// let replies = querier +/// .get() +/// .parameters("value>1") +/// .await +/// .unwrap(); +/// while let Ok(reply) = replies.recv_async().await { +/// println!("Received {:?}", reply.result()) +/// } +/// # } +/// ``` +#[zenoh_macros::unstable] +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[derive(Debug)] +pub struct QuerierGetBuilder<'a, 'b, Handler> { + pub(crate) querier: &'a Querier<'a>, + pub(crate) parameters: Parameters<'b>, + pub(crate) handler: Handler, + pub(crate) value: Option<(ZBytes, Encoding)>, + pub(crate) attachment: Option, + #[cfg(feature = "unstable")] + pub(crate) source_info: SourceInfo, +} + +#[zenoh_macros::internal_trait] +impl SampleBuilderTrait for QuerierGetBuilder<'_, '_, Handler> { + #[zenoh_macros::unstable] + fn source_info(self, source_info: SourceInfo) -> Self { + Self { + source_info, + ..self + } + } + + fn attachment>(self, attachment: T) -> Self { + let attachment: OptionZBytes = attachment.into(); + Self { + attachment: attachment.into(), + ..self + } + } +} + +#[zenoh_macros::internal_trait] +impl EncodingBuilderTrait for QuerierGetBuilder<'_, '_, Handler> { + fn encoding>(self, encoding: T) -> Self { + let mut value = self.value.unwrap_or_default(); + value.1 = encoding.into(); + Self { + value: Some(value), + ..self + } + } +} + +impl<'a, 'b> QuerierGetBuilder<'a, 'b, DefaultHandler> { + /// Receive the replies for this query with a callback. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::{query::{ConsolidationMode, QueryTarget}}; + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.declare_querier("key/expression") + /// .target(QueryTarget::All) + /// .consolidation(ConsolidationMode::None) + /// .await + /// .unwrap(); + /// let _ = querier + /// .get() + /// .callback(|reply| {println!("Received {:?}", reply.result());}) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[zenoh_macros::unstable] + #[inline] + pub fn callback(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback> + where + F: Fn(Reply) + Send + Sync + 'static, + { + self.with(Callback::new(Arc::new(callback))) + } + + /// Receive the replies for this query with a mutable callback. + /// + /// Using this guarantees that your callback will never be called concurrently. + /// If your callback is also accepted by the [`callback`](crate::query::QuerierGetBuilder::callback) method, we suggest you use it instead of `callback_mut`. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::{query::{ConsolidationMode, QueryTarget}}; + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.declare_querier("key/expression") + /// .target(QueryTarget::All) + /// .consolidation(ConsolidationMode::None) + /// .await + /// .unwrap(); + /// let mut n = 0; + /// let _ = querier + /// .get() + /// .callback_mut(move |reply| {n += 1;}) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[zenoh_macros::unstable] + #[inline] + pub fn callback_mut(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback> + where + F: FnMut(Reply) + Send + Sync + 'static, + { + self.callback(locked(callback)) + } + + /// Receive the replies for this query with a [`Handler`](crate::handlers::IntoHandler). + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::{query::{ConsolidationMode, QueryTarget}}; + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.declare_querier("key/expression") + /// .target(QueryTarget::All) + /// .consolidation(ConsolidationMode::None) + /// .await + /// .unwrap(); + /// let replies = querier + /// .get() + /// .with(flume::bounded(32)) + /// .await + /// .unwrap(); + /// while let Ok(reply) = replies.recv_async().await { + /// println!("Received {:?}", reply.result()); + /// } + /// # } + /// ``` + #[zenoh_macros::unstable] + #[inline] + pub fn with(self, handler: Handler) -> QuerierGetBuilder<'a, 'b, Handler> + where + Handler: IntoHandler, + { + let QuerierGetBuilder { + querier, + parameters, + value, + attachment, + #[cfg(feature = "unstable")] + source_info, + handler: _, + } = self; + QuerierGetBuilder { + querier, + parameters, + value, + attachment, + #[cfg(feature = "unstable")] + source_info, + handler, + } + } +} +impl<'b, Handler> QuerierGetBuilder<'_, 'b, Handler> { + /// Set the query payload. + #[inline] + #[zenoh_macros::unstable] + pub fn payload(mut self, payload: IntoZBytes) -> Self + where + IntoZBytes: Into, + { + let mut value = self.value.unwrap_or_default(); + value.0 = payload.into(); + self.value = Some(value); + self + } + + /// Set the query selector parameters. + #[inline] + #[zenoh_macros::unstable] + pub fn parameters

(mut self, parameters: P) -> Self + where + P: Into>, + { + self.parameters = parameters.into(); + self + } +} + +impl Resolvable for QuerierGetBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type To = ZResult; +} + +impl Wait for QuerierGetBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + fn wait(self) -> ::To { + let (callback, receiver) = self.handler.into_handler(); + + #[allow(unused_mut)] + // mut is only needed when building with "unstable" feature, which might add extra internal parameters on top of the user-provided ones + let mut parameters = self.parameters.clone(); + #[cfg(feature = "unstable")] + if self.querier.accept_replies() == ReplyKeyExpr::Any { + parameters.set_reply_key_expr_any(); + } + self.querier + .session + .query( + &self.querier.key_expr, + ¶meters, + self.querier.target, + self.querier.consolidation, + self.querier.qos, + self.querier.destination, + self.querier.timeout, + self.value, + self.attachment, + #[cfg(feature = "unstable")] + self.source_info, + callback, + ) + .map(|_| receiver) + } +} + +impl IntoFuture for QuerierGetBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} diff --git a/zenoh/src/api/builders/queryable.rs b/zenoh/src/api/builders/queryable.rs index 8d4befbef2..f35e19e731 100644 --- a/zenoh/src/api/builders/queryable.rs +++ b/zenoh/src/api/builders/queryable.rs @@ -211,12 +211,7 @@ where let (callback, receiver) = self.handler.into_handler(); session .0 - .declare_queryable_inner( - &self.key_expr?.to_wire(&session.0), - self.complete, - self.origin, - callback, - ) + .declare_queryable_inner(&self.key_expr?, self.complete, self.origin, callback) .map(|qable_state| Queryable { inner: QueryableInner { session: self.session.downgrade(), @@ -248,7 +243,7 @@ impl Resolvable for QueryableBuilder<'_, '_, Callback, true> { impl Wait for QueryableBuilder<'_, '_, Callback, true> { fn wait(self) -> ::To { self.session.0.declare_queryable_inner( - &self.key_expr?.to_wire(&self.session.0), + &self.key_expr?, self.complete, self.origin, self.handler, diff --git a/zenoh/src/api/key_expr.rs b/zenoh/src/api/key_expr.rs index e32b90b936..8d7b18a74d 100644 --- a/zenoh/src/api/key_expr.rs +++ b/zenoh/src/api/key_expr.rs @@ -272,6 +272,56 @@ impl<'a> KeyExpr<'a> { Ok(r.into()) } } + + /// Will return false and log a error in case of TryInto failure. + #[inline] + pub(crate) fn keyexpr_intersect<'b, L, R>(left: L, right: R) -> bool + where + L: TryInto>, + R: TryInto>, + L::Error: std::fmt::Display, + R::Error: std::fmt::Display, + { + match left.try_into() { + Ok(l) => match right.try_into() { + Ok(r) => { + return l.intersects(&r); + } + Err(e) => { + tracing::error!("{e}"); + } + }, + Err(e) => { + tracing::error!("{e}"); + } + } + false + } + + /// Will return false and log a error in case of TryInto failure. + #[inline] + pub(crate) fn keyexpr_include<'b, L, R>(left: L, right: R) -> bool + where + L: TryInto>, + R: TryInto>, + L::Error: std::fmt::Display, + R::Error: std::fmt::Display, + { + match left.try_into() { + Ok(l) => match right.try_into() { + Ok(r) => { + return l.includes(&r); + } + Err(e) => { + tracing::error!("{e}"); + } + }, + Err(e) => { + tracing::error!("{e}"); + } + } + false + } } impl FromStr for KeyExpr<'static> { diff --git a/zenoh/src/api/matching.rs b/zenoh/src/api/matching.rs new file mode 100644 index 0000000000..c1dd7556ec --- /dev/null +++ b/zenoh/src/api/matching.rs @@ -0,0 +1,258 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{ + collections::HashSet, + fmt, + future::{IntoFuture, Ready}, + sync::{Arc, Mutex}, +}; + +use tracing::error; +use zenoh_core::{Resolvable, Wait}; +use zenoh_result::ZResult; + +use super::{ + handlers::Callback, + key_expr::KeyExpr, + sample::Locality, + session::{UndeclarableSealed, WeakSession}, + Id, +}; + +/// A struct that indicates if there exist entities matching the key expression. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let publisher = session.declare_publisher("key/expression").await.unwrap(); +/// let matching_status = publisher.matching_status().await.unwrap(); +/// # } +/// ``` +#[zenoh_macros::unstable] +#[derive(Copy, Clone, Debug)] +pub struct MatchingStatus { + pub(crate) matching: bool, +} + +#[cfg(feature = "unstable")] +#[derive(Debug, Copy, Clone, PartialEq)] +pub(crate) enum MatchingStatusType { + Subscribers, + Queryables(bool), +} + +#[zenoh_macros::unstable] +impl MatchingStatus { + /// Return true if there exist entities matching the target (i.e either Subscribers matching Publisher's key expression or Queryables matching Querier's key expression and target). + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// let matching_subscribers: bool = publisher + /// .matching_status() + /// .await + /// .unwrap() + /// .matching(); + /// # } + /// ``` + pub fn matching(&self) -> bool { + self.matching + } +} +#[zenoh_macros::unstable] +pub(crate) struct MatchingListenerState { + pub(crate) id: Id, + pub(crate) current: Mutex, + pub(crate) key_expr: KeyExpr<'static>, + pub(crate) destination: Locality, + pub(crate) match_type: MatchingStatusType, + pub(crate) callback: Callback, +} + +#[cfg(feature = "unstable")] +impl MatchingListenerState { + pub(crate) fn is_matching(&self, key_expr: &KeyExpr, match_type: MatchingStatusType) -> bool { + match match_type { + MatchingStatusType::Subscribers => { + self.match_type == MatchingStatusType::Subscribers + && self.key_expr.intersects(key_expr) + } + MatchingStatusType::Queryables(false) => { + self.match_type == MatchingStatusType::Queryables(false) + && self.key_expr.intersects(key_expr) + } + MatchingStatusType::Queryables(true) => { + (self.match_type == MatchingStatusType::Queryables(false) + && self.key_expr.intersects(key_expr)) + || (self.match_type == MatchingStatusType::Queryables(true) + && key_expr.includes(&self.key_expr)) + } + } + } +} + +#[zenoh_macros::unstable] +impl fmt::Debug for MatchingListenerState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MatchingListener") + .field("id", &self.id) + .field("key_expr", &self.key_expr) + .field("match_type", &self.match_type) + .finish() + } +} + +#[zenoh_macros::unstable] +pub(crate) struct MatchingListenerInner { + pub(crate) session: WeakSession, + pub(crate) matching_listeners: Arc>>, + pub(crate) id: Id, + pub(crate) undeclare_on_drop: bool, +} + +/// A listener that sends notifications when the [`MatchingStatus`] of a +/// corresponding Zenoh entity changes. +/// +/// Callback matching listeners will run in background until the corresponding Zenoh entity is undeclared, +/// or until it is undeclared. +/// On the other hand, matching listener with a handler are automatically undeclared when dropped. +/// +/// # Examples +/// ```no_run +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let publisher = session.declare_publisher("key/expression").await.unwrap(); +/// let matching_listener = publisher.matching_listener().await.unwrap(); +/// while let Ok(matching_status) = matching_listener.recv_async().await { +/// if matching_status.matching() { +/// println!("Publisher has matching subscribers."); +/// } else { +/// println!("Publisher has NO MORE matching subscribers."); +/// } +/// } +/// # } +/// ``` +#[zenoh_macros::unstable] +pub struct MatchingListener { + pub(crate) inner: MatchingListenerInner, + pub(crate) handler: Handler, +} + +#[zenoh_macros::unstable] +impl MatchingListener { + /// Undeclare the [`MatchingListener`]. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// let matching_listener = publisher.matching_listener().await.unwrap(); + /// matching_listener.undeclare().await.unwrap(); + /// # } + /// ``` + #[inline] + pub fn undeclare(self) -> MatchingListenerUndeclaration + where + Handler: Send, + { + self.undeclare_inner(()) + } + + fn undeclare_impl(&mut self) -> ZResult<()> { + // set the flag first to avoid double panic if this function panic + self.inner.undeclare_on_drop = false; + zlock!(self.inner.matching_listeners).remove(&self.inner.id); + self.inner + .session + .undeclare_matches_listener_inner(self.inner.id) + } + + #[zenoh_macros::internal] + pub fn set_background(&mut self, background: bool) { + self.inner.undeclare_on_drop = !background; + } +} + +#[cfg(feature = "unstable")] +impl Drop for MatchingListener { + fn drop(&mut self) { + if self.inner.undeclare_on_drop { + if let Err(error) = self.undeclare_impl() { + error!(error); + } + } + } +} + +#[zenoh_macros::unstable] +impl UndeclarableSealed<()> for MatchingListener { + type Undeclaration = MatchingListenerUndeclaration; + + fn undeclare_inner(self, _: ()) -> Self::Undeclaration { + MatchingListenerUndeclaration(self) + } +} + +#[zenoh_macros::unstable] +impl std::ops::Deref for MatchingListener { + type Target = Handler; + + fn deref(&self) -> &Self::Target { + &self.handler + } +} +#[zenoh_macros::unstable] +impl std::ops::DerefMut for MatchingListener { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.handler + } +} + +#[zenoh_macros::unstable] +pub struct MatchingListenerUndeclaration(MatchingListener); + +#[zenoh_macros::unstable] +impl Resolvable for MatchingListenerUndeclaration { + type To = ZResult<()>; +} + +#[zenoh_macros::unstable] +impl Wait for MatchingListenerUndeclaration { + fn wait(mut self) -> ::To { + self.0.undeclare_impl() + } +} + +#[zenoh_macros::unstable] +impl IntoFuture for MatchingListenerUndeclaration { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} diff --git a/zenoh/src/api/mod.rs b/zenoh/src/api/mod.rs index 30268a3eb9..6981e02f73 100644 --- a/zenoh/src/api/mod.rs +++ b/zenoh/src/api/mod.rs @@ -26,9 +26,13 @@ pub(crate) mod key_expr; pub(crate) mod liveliness; #[cfg(feature = "plugins")] pub(crate) mod loader; +#[cfg(feature = "unstable")] +pub(crate) mod matching; #[cfg(feature = "plugins")] pub(crate) mod plugins; pub(crate) mod publisher; +#[cfg(feature = "unstable")] +pub(crate) mod querier; pub(crate) mod query; pub(crate) mod queryable; pub(crate) mod sample; diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 03191b2d8a..78d44fc794 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -29,7 +29,8 @@ use zenoh_result::{Error, ZResult}; use { crate::api::{ builders::matching_listener::MatchingListenerBuilder, - handlers::{Callback, DefaultHandler}, + handlers::DefaultHandler, + matching::{MatchingStatus, MatchingStatusType}, sample::SourceInfo, }, std::{collections::HashSet, sync::Arc, sync::Mutex}, @@ -222,7 +223,7 @@ impl<'a> Publisher<'a> { /// Return the [`MatchingStatus`] of the publisher. /// - /// [`MatchingStatus::matching_subscribers`] will return true if there exist Subscribers + /// [`MatchingStatus::matching`] will return true if there exist Subscribers /// matching the Publisher's key expression and false otherwise. /// /// # Examples @@ -236,20 +237,23 @@ impl<'a> Publisher<'a> { /// .matching_status() /// .await /// .unwrap() - /// .matching_subscribers(); + /// .matching(); /// # } /// ``` #[zenoh_macros::unstable] pub fn matching_status(&self) -> impl Resolve> + '_ { zenoh_core::ResolveFuture::new(async move { - self.session - .matching_status(self.key_expr(), self.destination) + self.session.matching_status( + self.key_expr(), + self.destination, + MatchingStatusType::Subscribers, + ) }) } - /// Return a [`MatchingListener`] for this Publisher. + /// Return a [`MatchingListener`](crate::api::matching::MatchingListener) for this Publisher. /// - /// The [`MatchingListener`] that will send a notification each time the [`MatchingStatus`] of + /// The [`MatchingListener`](crate::api::matching::MatchingListener) that will send a notification each time the [`MatchingStatus`](crate::api::matching::MatchingStatus) of /// the Publisher changes. /// /// # Examples @@ -261,7 +265,7 @@ impl<'a> Publisher<'a> { /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_listener = publisher.matching_listener().await.unwrap(); /// while let Ok(matching_status) = matching_listener.recv_async().await { - /// if matching_status.matching_subscribers() { + /// if matching_status.matching() { /// println!("Publisher has matching subscribers."); /// } else { /// println!("Publisher has NO MORE matching subscribers."); @@ -270,9 +274,13 @@ impl<'a> Publisher<'a> { /// # } /// ``` #[zenoh_macros::unstable] - pub fn matching_listener(&self) -> MatchingListenerBuilder<'_, 'a, DefaultHandler> { + pub fn matching_listener(&self) -> MatchingListenerBuilder<'_, DefaultHandler> { MatchingListenerBuilder { - publisher: self, + session: &self.session, + key_expr: &self.key_expr, + destination: self.destination, + matching_listeners: &self.matching_listeners, + matching_status_type: MatchingStatusType::Subscribers, handler: DefaultHandler::default(), } } @@ -500,201 +508,6 @@ impl TryFrom for Priority { } } -/// A struct that indicates if there exist Subscribers matching the Publisher's key expression. -/// -/// # Examples -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// -/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); -/// let publisher = session.declare_publisher("key/expression").await.unwrap(); -/// let matching_status = publisher.matching_status().await.unwrap(); -/// # } -/// ``` -#[zenoh_macros::unstable] -#[derive(Copy, Clone, Debug)] -pub struct MatchingStatus { - pub(crate) matching: bool, -} - -#[zenoh_macros::unstable] -impl MatchingStatus { - /// Return true if there exist Subscribers matching the Publisher's key expression. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let publisher = session.declare_publisher("key/expression").await.unwrap(); - /// let matching_subscribers: bool = publisher - /// .matching_status() - /// .await - /// .unwrap() - /// .matching_subscribers(); - /// # } - /// ``` - pub fn matching_subscribers(&self) -> bool { - self.matching - } -} -#[zenoh_macros::unstable] -pub(crate) struct MatchingListenerState { - pub(crate) id: Id, - pub(crate) current: Mutex, - pub(crate) key_expr: KeyExpr<'static>, - pub(crate) destination: Locality, - pub(crate) callback: Callback, -} - -#[zenoh_macros::unstable] -impl fmt::Debug for MatchingListenerState { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("MatchingListener") - .field("id", &self.id) - .field("key_expr", &self.key_expr) - .finish() - } -} - -#[zenoh_macros::unstable] -pub(crate) struct MatchingListenerInner { - pub(crate) session: WeakSession, - pub(crate) matching_listeners: Arc>>, - pub(crate) id: Id, - pub(crate) undeclare_on_drop: bool, -} - -/// A listener that sends notifications when the [`MatchingStatus`] of a -/// publisher changes. -/// -/// Callback matching listeners will run in background until the publisher is undeclared, -/// or until it is undeclared. -/// On the other hand, matching listener with a handler are automatically undeclared when dropped. -/// -/// # Examples -/// ```no_run -/// # #[tokio::main] -/// # async fn main() { -/// -/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); -/// let publisher = session.declare_publisher("key/expression").await.unwrap(); -/// let matching_listener = publisher.matching_listener().await.unwrap(); -/// while let Ok(matching_status) = matching_listener.recv_async().await { -/// if matching_status.matching_subscribers() { -/// println!("Publisher has matching subscribers."); -/// } else { -/// println!("Publisher has NO MORE matching subscribers."); -/// } -/// } -/// # } -/// ``` -#[zenoh_macros::unstable] -pub struct MatchingListener { - pub(crate) inner: MatchingListenerInner, - pub(crate) handler: Handler, -} - -#[zenoh_macros::unstable] -impl MatchingListener { - /// Undeclare the [`MatchingListener`]. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let publisher = session.declare_publisher("key/expression").await.unwrap(); - /// let matching_listener = publisher.matching_listener().await.unwrap(); - /// matching_listener.undeclare().await.unwrap(); - /// # } - /// ``` - #[inline] - pub fn undeclare(self) -> MatchingListenerUndeclaration - where - Handler: Send, - { - self.undeclare_inner(()) - } - - fn undeclare_impl(&mut self) -> ZResult<()> { - // set the flag first to avoid double panic if this function panic - self.inner.undeclare_on_drop = false; - zlock!(self.inner.matching_listeners).remove(&self.inner.id); - self.inner - .session - .undeclare_matches_listener_inner(self.inner.id) - } - - #[zenoh_macros::internal] - pub fn set_background(&mut self, background: bool) { - self.inner.undeclare_on_drop = !background; - } -} - -#[cfg(feature = "unstable")] -impl Drop for MatchingListener { - fn drop(&mut self) { - if self.inner.undeclare_on_drop { - if let Err(error) = self.undeclare_impl() { - error!(error); - } - } - } -} - -#[zenoh_macros::unstable] -impl UndeclarableSealed<()> for MatchingListener { - type Undeclaration = MatchingListenerUndeclaration; - - fn undeclare_inner(self, _: ()) -> Self::Undeclaration { - MatchingListenerUndeclaration(self) - } -} - -#[zenoh_macros::unstable] -impl std::ops::Deref for MatchingListener { - type Target = Handler; - - fn deref(&self) -> &Self::Target { - &self.handler - } -} -#[zenoh_macros::unstable] -impl std::ops::DerefMut for MatchingListener { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.handler - } -} - -#[zenoh_macros::unstable] -pub struct MatchingListenerUndeclaration(MatchingListener); - -#[zenoh_macros::unstable] -impl Resolvable for MatchingListenerUndeclaration { - type To = ZResult<()>; -} - -#[zenoh_macros::unstable] -impl Wait for MatchingListenerUndeclaration { - fn wait(mut self) -> ::To { - self.0.undeclare_impl() - } -} - -#[zenoh_macros::unstable] -impl IntoFuture for MatchingListenerUndeclaration { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - #[cfg(test)] mod tests { use crate::{sample::SampleKind, Config, Wait}; diff --git a/zenoh/src/api/querier.rs b/zenoh/src/api/querier.rs new file mode 100644 index 0000000000..312904617c --- /dev/null +++ b/zenoh/src/api/querier.rs @@ -0,0 +1,322 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use core::fmt; +use std::{ + future::{IntoFuture, Ready}, + time::Duration, +}; + +use tracing::error; +use zenoh_core::{Resolvable, Resolve, Wait}; +use zenoh_protocol::{ + core::{CongestionControl, Parameters}, + network::request::ext::QueryTarget, +}; +use zenoh_result::ZResult; +#[cfg(feature = "unstable")] +use { + crate::api::builders::matching_listener::MatchingListenerBuilder, + crate::api::matching::{MatchingStatus, MatchingStatusType}, + crate::api::sample::SourceInfo, + crate::query::ReplyKeyExpr, + std::collections::HashSet, + std::sync::{Arc, Mutex}, + zenoh_config::wrappers::EntityGlobalId, + zenoh_protocol::core::EntityGlobalIdProto, +}; + +use super::{ + builders::querier::QuerierGetBuilder, + key_expr::KeyExpr, + query::QueryConsolidation, + sample::{Locality, QoS}, + session::{UndeclarableSealed, WeakSession}, + Id, +}; +use crate::{api::handlers::DefaultHandler, qos::Priority}; + +pub(crate) struct QuerierState { + pub(crate) id: Id, + pub(crate) remote_id: Id, + pub(crate) key_expr: KeyExpr<'static>, + pub(crate) destination: Locality, +} + +/// A querier that allows to send queries to a queryable. +/// +/// Queriers are automatically undeclared when dropped. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let querier = session.declare_querier("key/expression").await.unwrap(); +/// let replies = querier.get().await.unwrap(); +/// # } +/// ``` +#[zenoh_macros::unstable] +#[derive(Debug)] +pub struct Querier<'a> { + pub(crate) session: WeakSession, + pub(crate) id: Id, + pub(crate) key_expr: KeyExpr<'a>, + pub(crate) qos: QoS, + pub(crate) destination: Locality, + pub(crate) target: QueryTarget, + pub(crate) consolidation: QueryConsolidation, + pub(crate) timeout: Duration, + #[cfg(feature = "unstable")] + pub(crate) accept_replies: ReplyKeyExpr, + pub(crate) undeclare_on_drop: bool, + #[cfg(feature = "unstable")] + pub(crate) matching_listeners: Arc>>, +} + +impl fmt::Debug for QuerierState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Querier") + .field("id", &self.id) + .field("key_expr", &self.key_expr) + .finish() + } +} + +impl<'a> Querier<'a> { + /// Returns the [`EntityGlobalId`] of this Querier. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.declare_querier("key/expression") + /// .await + /// .unwrap(); + /// let querier_id = querier.id(); + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn id(&self) -> EntityGlobalId { + EntityGlobalIdProto { + zid: self.session.zid().into(), + eid: self.id, + } + .into() + } + + #[inline] + #[zenoh_macros::unstable] + pub fn key_expr(&self) -> &KeyExpr<'a> { + &self.key_expr + } + + /// Get the `congestion_control` applied when routing the data. + #[inline] + #[zenoh_macros::unstable] + pub fn congestion_control(&self) -> CongestionControl { + self.qos.congestion_control() + } + + /// Get the priority of the written data. + #[inline] + #[zenoh_macros::unstable] + pub fn priority(&self) -> Priority { + self.qos.priority() + } + + /// Get type of queryables that can reply to this querier + #[inline] + #[zenoh_macros::unstable] + pub fn accept_replies(&self) -> ReplyKeyExpr { + self.accept_replies + } + + /// Send a query. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.declare_querier("key/expression").await.unwrap(); + /// let replies = querier.get(); + /// # } + /// ``` + #[inline] + #[zenoh_macros::unstable] + pub fn get(&self) -> QuerierGetBuilder<'_, '_, DefaultHandler> { + QuerierGetBuilder { + querier: self, + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + value: None, + attachment: None, + parameters: Parameters::empty(), + handler: DefaultHandler::default(), + } + } + + /// Undeclare the [`Querier`], informing the network that it needn't optimize queries for its key expression anymore. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.declare_querier("key/expression").await.unwrap(); + /// querier.undeclare().await.unwrap(); + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn undeclare(self) -> impl Resolve> + 'a { + UndeclarableSealed::undeclare_inner(self, ()) + } + + fn undeclare_impl(&mut self) -> ZResult<()> { + // set the flag first to avoid double panic if this function panic + self.undeclare_on_drop = false; + #[cfg(feature = "unstable")] + { + let ids: Vec = zlock!(self.matching_listeners).drain().collect(); + for id in ids { + self.session.undeclare_matches_listener_inner(id)? + } + } + self.session.undeclare_querier_inner(self.id) + } + + /// Return the [`MatchingStatus`] of the querier. + /// + /// [`MatchingStatus::matching`] will return true if there exist Queryables + /// matching the Queriers's key expression and target and false otherwise. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.declare_querier("key/expression").await.unwrap(); + /// let matching_queriers: bool = querier + /// .matching_status() + /// .await + /// .unwrap() + /// .matching(); + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn matching_status(&self) -> impl Resolve> + '_ { + zenoh_core::ResolveFuture::new(async move { + self.session.matching_status( + self.key_expr(), + self.destination, + MatchingStatusType::Queryables(self.target == QueryTarget::AllComplete), + ) + }) + } + + /// Return a [`MatchingListener`](crate::api::matching::MatchingListener) for this Querier. + /// + /// The [`MatchingListener`](crate::api::matching::MatchingListener) that will send a notification each time the [`MatchingStatus`](crate::api::matching::MatchingStatus) of + /// the Querier changes. + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.declare_querier("key/expression").await.unwrap(); + /// let matching_listener = querier.matching_listener().await.unwrap(); + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.matching() { + /// println!("Querier has matching queryables."); + /// } else { + /// println!("Querier has NO MORE matching queryables."); + /// } + /// } + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn matching_listener(&self) -> MatchingListenerBuilder<'_, DefaultHandler> { + MatchingListenerBuilder { + session: &self.session, + key_expr: &self.key_expr, + destination: self.destination, + matching_listeners: &self.matching_listeners, + matching_status_type: MatchingStatusType::Queryables( + self.target == QueryTarget::AllComplete, + ), + handler: DefaultHandler::default(), + } + } +} + +impl<'a> UndeclarableSealed<()> for Querier<'a> { + type Undeclaration = QuerierUndeclaration<'a>; + + fn undeclare_inner(self, _: ()) -> Self::Undeclaration { + QuerierUndeclaration(self) + } +} + +/// A [`Resolvable`] returned when undeclaring a publisher. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let querier = session.declare_querier("key/expression").await.unwrap(); +/// querier.undeclare().await.unwrap(); +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +pub struct QuerierUndeclaration<'a>(Querier<'a>); + +impl Resolvable for QuerierUndeclaration<'_> { + type To = ZResult<()>; +} + +impl Wait for QuerierUndeclaration<'_> { + fn wait(mut self) -> ::To { + self.0.undeclare_impl() + } +} + +impl IntoFuture for QuerierUndeclaration<'_> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +impl Drop for Querier<'_> { + fn drop(&mut self) { + if self.undeclare_on_drop { + if let Err(error) = self.undeclare_impl() { + error!(error); + } + } + } +} diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 225a0d16cf..b679a13dbb 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -71,10 +71,11 @@ use zenoh_task::TaskController; use crate::api::selector::ZenohParameters; #[cfg(feature = "unstable")] use crate::api::{ + builders::querier::QuerierBuilder, liveliness::Liveliness, - publisher::Publisher, - publisher::{MatchingListenerState, MatchingStatus}, - query::LivelinessQueryState, + matching::{MatchingListenerState, MatchingStatus, MatchingStatusType}, + querier::QuerierState, + query::{LivelinessQueryState, ReplyKeyExpr}, sample::SourceInfo, }; use crate::{ @@ -131,12 +132,16 @@ pub(crate) struct SessionState { pub(crate) remote_subscribers: HashMap>, pub(crate) publishers: HashMap, #[cfg(feature = "unstable")] + pub(crate) queriers: HashMap, + #[cfg(feature = "unstable")] pub(crate) remote_tokens: HashMap>, //pub(crate) publications: Vec, pub(crate) subscribers: HashMap>, pub(crate) liveliness_subscribers: HashMap>, pub(crate) queryables: HashMap>, #[cfg(feature = "unstable")] + pub(crate) remote_queryables: HashMap, bool)>, + #[cfg(feature = "unstable")] pub(crate) matching_listeners: HashMap>, pub(crate) queries: HashMap, #[cfg(feature = "unstable")] @@ -162,12 +167,16 @@ impl SessionState { remote_subscribers: HashMap::new(), publishers: HashMap::new(), #[cfg(feature = "unstable")] + queriers: HashMap::new(), + #[cfg(feature = "unstable")] remote_tokens: HashMap::new(), //publications: Vec::new(), subscribers: HashMap::new(), liveliness_subscribers: HashMap::new(), queryables: HashMap::new(), #[cfg(feature = "unstable")] + remote_queryables: HashMap::new(), + #[cfg(feature = "unstable")] matching_listeners: HashMap::new(), queries: HashMap::new(), #[cfg(feature = "unstable")] @@ -296,6 +305,119 @@ impl SessionState { SubscriberKind::LivelinessSubscriber => &mut self.liveliness_subscribers, } } + + #[cfg(feature = "unstable")] + fn register_querier<'a>( + &mut self, + id: EntityId, + key_expr: &'a KeyExpr, + destination: Locality, + ) -> Option> { + let mut querier_state = QuerierState { + id, + remote_id: id, + key_expr: key_expr.clone().into_owned(), + destination, + }; + + let declared_querier = + (destination != Locality::SessionLocal) + .then(|| { + if let Some(twin_querier) = self.queriers.values().find(|p| { + p.destination != Locality::SessionLocal && &p.key_expr == key_expr + }) { + querier_state.remote_id = twin_querier.remote_id; + None + } else { + Some(key_expr.clone()) + } + }) + .flatten(); + self.queriers.insert(id, querier_state); + declared_querier + } + + fn register_subscriber<'a>( + &mut self, + id: EntityId, + key_expr: &'a KeyExpr, + origin: Locality, + callback: Callback, + ) -> (Arc, Option>) { + let mut sub_state = SubscriberState { + id, + remote_id: id, + key_expr: key_expr.clone().into_owned(), + origin, + callback, + }; + + let declared_sub = origin != Locality::SessionLocal; + + let declared_sub = declared_sub + .then(|| { + match self + .aggregated_subscribers + .iter() + .find(|s| s.includes(key_expr)) + { + Some(join_sub) => { + if let Some(joined_sub) = self + .subscribers(SubscriberKind::Subscriber) + .values() + .find(|s| { + s.origin != Locality::SessionLocal && join_sub.includes(&s.key_expr) + }) + { + sub_state.remote_id = joined_sub.remote_id; + None + } else { + Some(join_sub.clone().into()) + } + } + None => { + if let Some(twin_sub) = self + .subscribers(SubscriberKind::Subscriber) + .values() + .find(|s| s.origin != Locality::SessionLocal && s.key_expr == *key_expr) + { + sub_state.remote_id = twin_sub.remote_id; + None + } else { + Some(key_expr.clone()) + } + } + } + }) + .flatten(); + + let sub_state = Arc::new(sub_state); + + self.subscribers_mut(SubscriberKind::Subscriber) + .insert(sub_state.id, sub_state.clone()); + for res in self + .local_resources + .values_mut() + .filter_map(Resource::as_node_mut) + { + if key_expr.intersects(&res.key_expr) { + res.subscribers_mut(SubscriberKind::Subscriber) + .push(sub_state.clone()); + } + } + for res in self + .remote_resources + .values_mut() + .filter_map(Resource::as_node_mut) + { + if key_expr.intersects(&res.key_expr) { + res.subscribers_mut(SubscriberKind::Subscriber) + .push(sub_state.clone()); + } + } + + (sub_state, declared_sub) + } } impl fmt::Debug for SessionState { @@ -517,7 +639,7 @@ impl Drop for WeakSession { /// Error indicating the operation cannot proceed because the session is closed. /// -/// It may be returned by operations like [`Session::get`] or [`Publisher::put`] when +/// It may be returned by operations like [`Session::get`] or [`Publisher::put`](crate::api::publisher::Publisher::put) when /// [`Session::close`] has been called before. #[derive(Debug)] pub struct SessionClosedError; @@ -835,6 +957,51 @@ impl Session { } } + /// Create a [`Querier`](crate::query::Querier) for the given key expression. + /// + /// # Arguments + /// + /// * `key_expr` - The key expression matching resources to query + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.declare_querier("key/expression") + /// .await + /// .unwrap(); + /// let replies = querier.get().await.unwrap(); + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn declare_querier<'b, TryIntoKeyExpr>( + &self, + key_expr: TryIntoKeyExpr, + ) -> QuerierBuilder<'_, 'b> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + let timeout = { + let conf = &self.0.runtime.config().lock().0; + Duration::from_millis(unwrap_or_default!(conf.queries_default_timeout())) + }; + let qos: QoS = request::ext::QoSType::REQUEST.into(); + QuerierBuilder { + session: self, + key_expr: key_expr.try_into().map_err(Into::into), + qos: qos.into(), + destination: Locality::default(), + target: QueryTarget::default(), + consolidation: QueryConsolidation::default(), + timeout, + #[cfg(feature = "unstable")] + accept_replies: ReplyKeyExpr::default(), + } + } + /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. /// /// # Examples @@ -1263,6 +1430,65 @@ impl SessionInner { } } + #[cfg(feature = "unstable")] + pub(crate) fn declare_querier_inner( + &self, + key_expr: KeyExpr, + destination: Locality, + ) -> ZResult { + tracing::trace!("declare_querier({:?})", key_expr); + let mut state = zwrite!(self.state); + let id = self.runtime.next_id(); + let declared_querier = state.register_querier(id, &key_expr, destination); + if let Some(res) = declared_querier { + let primitives = state.primitives()?; + drop(state); + primitives.send_interest(Interest { + id, + mode: InterestMode::CurrentFuture, + options: InterestOptions::KEYEXPRS + InterestOptions::QUERYABLES, + wire_expr: Some(res.to_wire(self).to_owned()), + ext_qos: network::ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: network::ext::NodeIdType::DEFAULT, + }); + } + Ok(id) + } + + #[cfg(feature = "unstable")] + pub(crate) fn undeclare_querier_inner(&self, pid: Id) -> ZResult<()> { + let mut state = zwrite!(self.state); + let Ok(primitives) = state.primitives() else { + return Ok(()); + }; + if let Some(querier_state) = state.queriers.remove(&pid) { + trace!("undeclare_querier({:?})", querier_state); + if querier_state.destination != Locality::SessionLocal { + // Note: there might be several queriers on the same KeyExpr. + // Before calling forget_queriers(key_expr), check if this was the last one. + if !state.queriers.values().any(|p| { + p.destination != Locality::SessionLocal + && p.remote_id == querier_state.remote_id + }) { + drop(state); + primitives.send_interest(Interest { + id: querier_state.remote_id, + mode: InterestMode::Final, + options: InterestOptions::empty(), + wire_expr: None, + ext_qos: declare::ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: declare::ext::NodeIdType::DEFAULT, + }); + } + } + Ok(()) + } else { + Err(zerror!("Unable to find querier").into()) + } + } + pub(crate) fn declare_subscriber_inner( self: &Arc, key_expr: &KeyExpr, @@ -1272,80 +1498,7 @@ impl SessionInner { let mut state = zwrite!(self.state); tracing::trace!("declare_subscriber({:?})", key_expr); let id = self.runtime.next_id(); - - let mut sub_state = SubscriberState { - id, - remote_id: id, - key_expr: key_expr.clone().into_owned(), - origin, - callback, - }; - - let declared_sub = origin != Locality::SessionLocal; - - let declared_sub = declared_sub - .then(|| { - match state - .aggregated_subscribers - .iter() - .find(|s| s.includes(key_expr)) - { - Some(join_sub) => { - if let Some(joined_sub) = state - .subscribers(SubscriberKind::Subscriber) - .values() - .find(|s| { - s.origin != Locality::SessionLocal && join_sub.includes(&s.key_expr) - }) - { - sub_state.remote_id = joined_sub.remote_id; - None - } else { - Some(join_sub.clone().into()) - } - } - None => { - if let Some(twin_sub) = state - .subscribers(SubscriberKind::Subscriber) - .values() - .find(|s| s.origin != Locality::SessionLocal && s.key_expr == *key_expr) - { - sub_state.remote_id = twin_sub.remote_id; - None - } else { - Some(key_expr.clone()) - } - } - } - }) - .flatten(); - - let sub_state = Arc::new(sub_state); - - state - .subscribers_mut(SubscriberKind::Subscriber) - .insert(sub_state.id, sub_state.clone()); - for res in state - .local_resources - .values_mut() - .filter_map(Resource::as_node_mut) - { - if key_expr.intersects(&res.key_expr) { - res.subscribers_mut(SubscriberKind::Subscriber) - .push(sub_state.clone()); - } - } - for res in state - .remote_resources - .values_mut() - .filter_map(Resource::as_node_mut) - { - if key_expr.intersects(&res.key_expr) { - res.subscribers_mut(SubscriberKind::Subscriber) - .push(sub_state.clone()); - } - } - + let (sub_state, declared_sub) = state.register_subscriber(id, key_expr, origin, callback); if let Some(key_expr) = declared_sub { let primitives = state.primitives()?; drop(state); @@ -1382,12 +1535,19 @@ impl SessionInner { wire_expr: key_expr.to_wire(self).to_owned(), }), }); - #[cfg(feature = "unstable")] { let state = zread!(self.state); - self.update_status_up(&state, &key_expr) + self.update_matching_status( + &state, + &key_expr, + MatchingStatusType::Subscribers, + true, + ) } + } else if origin == Locality::SessionLocal { + #[cfg(feature = "unstable")] + self.update_matching_status(&state, key_expr, MatchingStatusType::Subscribers, true) } Ok(sub_state) @@ -1421,48 +1581,60 @@ impl SessionInner { .retain(|sub| sub.id != sub_state.id); } - if sub_state.origin != Locality::SessionLocal && kind == SubscriberKind::Subscriber { - // Note: there might be several Subscribers on the same KeyExpr. - // Before calling forget_subscriber(key_expr), check if this was the last one. - if !state.subscribers(kind).values().any(|s| { - s.origin != Locality::SessionLocal && s.remote_id == sub_state.remote_id - }) { - drop(state); - primitives.send_declare(Declare { - interest_id: None, - ext_qos: declare::ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: declare::ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { - id: sub_state.remote_id, - ext_wire_expr: WireExprType { - wire_expr: WireExpr::empty(), - }, - }), - }); + match kind { + SubscriberKind::Subscriber => { + if sub_state.origin != Locality::SessionLocal { + // Note: there might be several Subscribers on the same KeyExpr. + // Before calling forget_subscriber(key_expr), check if this was the last one. + if !state.subscribers(kind).values().any(|s| { + s.origin != Locality::SessionLocal && s.remote_id == sub_state.remote_id + }) { + drop(state); + primitives.send_declare(Declare { + interest_id: None, + ext_qos: declare::ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: declare::ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id: sub_state.remote_id, + ext_wire_expr: WireExprType { + wire_expr: WireExpr::empty(), + }, + }), + }); + } + } else { + drop(state); + } #[cfg(feature = "unstable")] { let state = zread!(self.state); - self.update_status_down(&state, &sub_state.key_expr) + self.update_matching_status( + &state, + &sub_state.key_expr, + MatchingStatusType::Subscribers, + false, + ) } } - } else { - #[cfg(feature = "unstable")] - if kind == SubscriberKind::LivelinessSubscriber { - let primitives = state.primitives()?; - drop(state); + SubscriberKind::LivelinessSubscriber => { + #[cfg(feature = "unstable")] + if kind == SubscriberKind::LivelinessSubscriber { + let primitives = state.primitives()?; + drop(state); - primitives.send_interest(Interest { - id: sub_state.id, - mode: InterestMode::Final, - // Note: InterestMode::Final options are undefined in the current protocol specification, - // they are initialized here for internal use by local egress interceptors. - options: InterestOptions::TOKENS, - wire_expr: None, - ext_qos: declare::ext::QoSType::DEFAULT, - ext_tstamp: None, - ext_nodeid: declare::ext::NodeIdType::DEFAULT, - }); + primitives.send_interest(Interest { + id: sub_state.id, + mode: InterestMode::Final, + // Note: InterestMode::Final options are undefined in the current protocol specification, + // they are initialized here for internal use by local egress interceptors. + options: InterestOptions::TOKENS, + wire_expr: None, + ext_qos: declare::ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: declare::ext::NodeIdType::DEFAULT, + }); + } } } @@ -1473,25 +1645,25 @@ impl SessionInner { } pub(crate) fn declare_queryable_inner( - &self, - key_expr: &WireExpr, + self: &Arc, + key_expr: &KeyExpr, complete: bool, origin: Locality, callback: Callback, ) -> ZResult> { + let wire_expr = key_expr.to_wire(self); let mut state = zwrite!(self.state); tracing::trace!("declare_queryable({:?})", key_expr); let id = self.runtime.next_id(); let qable_state = Arc::new(QueryableState { id, - key_expr: key_expr.to_owned(), + key_expr: wire_expr.to_owned(), complete, origin, callback, }); state.queryables.insert(id, qable_state.clone()); - if origin != Locality::SessionLocal { let primitives = state.primitives()?; drop(state); @@ -1506,15 +1678,29 @@ impl SessionInner { ext_nodeid: declare::ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id, - wire_expr: key_expr.to_owned(), + wire_expr: wire_expr.to_owned(), ext_info: qabl_info, }), }); + } else { + drop(state); + } + + #[cfg(feature = "unstable")] + { + let state = zread!(self.state); + self.update_matching_status( + &state, + key_expr, + MatchingStatusType::Queryables(complete), + true, + ) } + Ok(qable_state) } - pub(crate) fn close_queryable(&self, qid: Id) -> ZResult<()> { + pub(crate) fn close_queryable(self: &Arc, qid: Id) -> ZResult<()> { let mut state = zwrite!(self.state); let Ok(primitives) = state.primitives() else { return Ok(()); @@ -1535,6 +1721,18 @@ impl SessionInner { }, }), }); + } else { + drop(state); + } + #[cfg(feature = "unstable")] + { + let state = zread!(self.state); + self.update_matching_status( + &state, + &state.local_wireexpr_to_expr(&qable_state.key_expr)?, + MatchingStatusType::Queryables(qable_state.complete), + false, + ) } Ok(()) } else { @@ -1683,17 +1881,20 @@ impl SessionInner { #[zenoh_macros::unstable] pub(crate) fn declare_matches_listener_inner( &self, - publisher: &Publisher, + key_expr: &KeyExpr, + destination: Locality, + match_type: MatchingStatusType, callback: Callback, ) -> ZResult> { let mut state = zwrite!(self.state); let id = self.runtime.next_id(); - tracing::trace!("matches_listener({:?}) => {id}", publisher.key_expr); + tracing::trace!("matches_listener({:?}: {:?}) => {id}", match_type, key_expr); let listener_state = Arc::new(MatchingListenerState { id, current: std::sync::Mutex::new(false), - destination: publisher.destination, - key_expr: publisher.key_expr.clone().into_owned(), + destination, + key_expr: key_expr.clone().into_owned(), + match_type, callback, }); state.matching_listeners.insert(id, listener_state.clone()); @@ -1701,8 +1902,8 @@ impl SessionInner { match listener_state.current.lock() { Ok(mut current) => { if self - .matching_status(&publisher.key_expr, listener_state.destination) - .map(|s| s.matching_subscribers()) + .matching_status(key_expr, listener_state.destination, match_type) + .map(|s| s.matching()) .unwrap_or(true) { *current = true; @@ -1717,34 +1918,68 @@ impl SessionInner { } #[zenoh_macros::unstable] - pub(crate) fn matching_status( + fn matching_status_local( + &self, + key_expr: &KeyExpr, + matching_type: MatchingStatusType, + ) -> MatchingStatus { + let state = zread!(self.state); + let matching = match matching_type { + MatchingStatusType::Subscribers => state + .subscribers(SubscriberKind::Subscriber) + .values() + .any(|s| s.key_expr.intersects(key_expr)), + MatchingStatusType::Queryables(false) => state.queryables.values().any(|q| { + state + .local_wireexpr_to_expr(&q.key_expr) + .map_or(false, |ke| ke.intersects(key_expr)) + }), + MatchingStatusType::Queryables(true) => state.queryables.values().any(|q| { + q.complete + && state + .local_wireexpr_to_expr(&q.key_expr) + .map_or(false, |ke| ke.includes(key_expr)) + }), + }; + MatchingStatus { matching } + } + + #[zenoh_macros::unstable] + fn matching_status_remote( &self, key_expr: &KeyExpr, destination: Locality, + matching_type: MatchingStatusType, ) -> ZResult { let router = self.runtime.router(); let tables = zread!(router.tables.tables); - let matching_subscriptions = - crate::net::routing::dispatcher::pubsub::get_matching_subscriptions(&tables, key_expr); + let matches = match matching_type { + MatchingStatusType::Subscribers => { + crate::net::routing::dispatcher::pubsub::get_matching_subscriptions( + &tables, key_expr, + ) + } + MatchingStatusType::Queryables(complete) => { + crate::net::routing::dispatcher::queries::get_matching_queryables( + &tables, key_expr, complete, + ) + } + }; drop(tables); let matching = match destination { - Locality::Any => !matching_subscriptions.is_empty(), + Locality::Any => !matches.is_empty(), Locality::Remote => { if let Some(face) = zread!(self.state).primitives.as_ref() { - matching_subscriptions - .values() - .any(|dir| !Arc::ptr_eq(dir, &face.state)) + matches.values().any(|dir| !Arc::ptr_eq(dir, &face.state)) } else { - !matching_subscriptions.is_empty() + !matches.is_empty() } } Locality::SessionLocal => { if let Some(face) = zread!(self.state).primitives.as_ref() { - matching_subscriptions - .values() - .any(|dir| Arc::ptr_eq(dir, &face.state)) + matches.values().any(|dir| Arc::ptr_eq(dir, &face.state)) } else { false } @@ -1754,47 +1989,36 @@ impl SessionInner { } #[zenoh_macros::unstable] - pub(crate) fn update_status_up(self: &Arc, state: &SessionState, key_expr: &KeyExpr) { - for msub in state.matching_listeners.values() { - if key_expr.intersects(&msub.key_expr) { - // Cannot hold session lock when calling tables (matching_status()) - // TODO: check which ZRuntime should be used - self.task_controller - .spawn_with_rt(zenoh_runtime::ZRuntime::Net, { - let session = WeakSession::new(self); - let msub = msub.clone(); - async move { - match msub.current.lock() { - Ok(mut current) => { - if !*current { - if let Ok(status) = session - .matching_status(&msub.key_expr, msub.destination) - { - if status.matching_subscribers() { - *current = true; - let callback = msub.callback.clone(); - callback.call(status) - } - } - } - } - Err(e) => { - tracing::error!( - "Error trying to acquire MathginListener lock: {}", - e - ); - } - } - } - }); + pub(crate) fn matching_status( + &self, + key_expr: &KeyExpr, + destination: Locality, + matching_type: MatchingStatusType, + ) -> ZResult { + match destination { + Locality::SessionLocal => Ok(self.matching_status_local(key_expr, matching_type)), + Locality::Remote => self.matching_status_remote(key_expr, destination, matching_type), + Locality::Any => { + let local_match = self.matching_status_local(key_expr, matching_type); + if local_match.matching() { + Ok(local_match) + } else { + self.matching_status_remote(key_expr, destination, matching_type) + } } } } #[zenoh_macros::unstable] - pub(crate) fn update_status_down(self: &Arc, state: &SessionState, key_expr: &KeyExpr) { + pub(crate) fn update_matching_status( + self: &Arc, + state: &SessionState, + key_expr: &KeyExpr, + match_type: MatchingStatusType, + status_value: bool, + ) { for msub in state.matching_listeners.values() { - if key_expr.intersects(&msub.key_expr) { + if msub.is_matching(key_expr, match_type) { // Cannot hold session lock when calling tables (matching_status()) // TODO: check which ZRuntime should be used self.task_controller @@ -1804,12 +2028,14 @@ impl SessionInner { async move { match msub.current.lock() { Ok(mut current) => { - if *current { - if let Ok(status) = session - .matching_status(&msub.key_expr, msub.destination) - { - if !status.matching_subscribers() { - *current = false; + if *current != status_value { + if let Ok(status) = session.matching_status( + &msub.key_expr, + msub.destination, + msub.match_type, + ) { + if status.matching() == status_value { + *current = status_value; let callback = msub.callback.clone(); callback.call(status) } @@ -2329,7 +2555,12 @@ impl Primitives for WeakSession { { Ok(expr) => { state.remote_subscribers.insert(m.id, expr.clone()); - self.update_status_up(&state, &expr); + self.update_matching_status( + &state, + &expr, + MatchingStatusType::Subscribers, + true, + ); } Err(err) => { tracing::error!( @@ -2349,7 +2580,12 @@ impl Primitives for WeakSession { return; // Session closing or closed } if let Some(expr) = state.remote_subscribers.remove(&m.id) { - self.update_status_down(&state, &expr); + self.update_matching_status( + &state, + &expr, + MatchingStatusType::Subscribers, + false, + ); } else { tracing::error!("Received Undeclare Subscriber for unknown id: {}", m.id); } @@ -2357,9 +2593,55 @@ impl Primitives for WeakSession { } zenoh_protocol::network::DeclareBody::DeclareQueryable(m) => { trace!("recv DeclareQueryable {} {:?}", m.id, m.wire_expr); + #[cfg(feature = "unstable")] + { + let mut state = zwrite!(self.state); + if state.primitives.is_none() { + return; // Session closing or closed + } + match state + .wireexpr_to_keyexpr(&m.wire_expr, false) + .map(|e| e.into_owned()) + { + Ok(expr) => { + state + .remote_queryables + .insert(m.id, (expr.clone(), m.ext_info.complete)); + self.update_matching_status( + &state, + &expr, + MatchingStatusType::Queryables(m.ext_info.complete), + true, + ); + } + Err(err) => { + tracing::error!( + "Received DeclareQueryable for unknown wire_expr: {}", + err + ) + } + } + } } zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => { trace!("recv UndeclareQueryable {:?}", m.id); + #[cfg(feature = "unstable")] + { + let mut state = zwrite!(self.state); + if state.primitives.is_none() { + return; // Session closing or closed + } + if let Some((expr, complete)) = state.remote_queryables.remove(&m.id) { + self.update_matching_status( + &state, + &expr, + MatchingStatusType::Queryables(complete), + false, + ); + } else { + tracing::error!("Received Undeclare Queryable for unknown id: {}", m.id); + } + } } #[cfg(not(feature = "unstable"))] zenoh_protocol::network::DeclareBody::DeclareToken(_) => {} diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 6684edb193..9ae97d9e7b 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -247,11 +247,6 @@ pub mod bytes { /// declared by a [`Session::declare_subscriber`](crate::Session::declare_subscriber) /// pub mod pubsub { - #[zenoh_macros::unstable] - pub use crate::api::{ - builders::matching_listener::MatchingListenerBuilder, - publisher::{MatchingListener, MatchingListenerUndeclaration, MatchingStatus}, - }; pub use crate::api::{ builders::{ publisher::{ @@ -279,6 +274,13 @@ pub mod query { #[zenoh_macros::internal] pub use crate::api::queryable::ReplySample; + #[zenoh_macros::unstable] + pub use crate::api::{ + builders::querier::{QuerierBuilder, QuerierGetBuilder}, + querier::Querier, + query::ReplyKeyExpr, + selector::ZenohParameters, + }; pub use crate::api::{ builders::{ queryable::QueryableBuilder, @@ -288,8 +290,14 @@ pub mod query { queryable::{Query, Queryable, QueryableUndeclaration}, selector::Selector, }; - #[zenoh_macros::unstable] - pub use crate::api::{query::ReplyKeyExpr, selector::ZenohParameters}; +} + +#[zenoh_macros::unstable] +pub mod matching { + pub use crate::api::{ + builders::matching_listener::MatchingListenerBuilder, + matching::{MatchingListener, MatchingListenerUndeclaration, MatchingStatus}, + }; } /// Callback handler trait. diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index f8a9f1f128..b56c887b65 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -43,6 +43,8 @@ use super::{ resource::{QueryRoute, QueryRoutes, QueryTargetQablSet, Resource}, tables::{NodeId, RoutingExpr, Tables, TablesLock}, }; +#[cfg(feature = "unstable")] +use crate::key_expr::KeyExpr; use crate::net::routing::hat::{HatTrait, SendDeclare}; pub(crate) struct Query { @@ -50,6 +52,18 @@ pub(crate) struct Query { src_qid: RequestId, } +#[zenoh_macros::unstable] +#[inline] +pub(crate) fn get_matching_queryables( + tables: &Tables, + key_expr: &KeyExpr<'_>, + complete: bool, +) -> HashMap> { + tables + .hat_code + .get_matching_queryables(tables, key_expr, complete) +} + #[allow(clippy::too_many_arguments)] pub(crate) fn declare_queryable( hat_code: &(dyn HatTrait + Send + Sync), diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 96a128b75d..cf92614e5f 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -339,37 +339,19 @@ impl HatPubSubTrait for HatCode { .values() .filter(|f| f.whatami != WhatAmI::Client) { - if face.local_interests.values().any(|interest| { + if !face.local_interests.values().any(|interest| { interest.finalized && interest.options.subscribers() && interest .res .as_ref() - .map(|res| { - KeyExpr::try_from(res.expr()) - .and_then(|intres| { - KeyExpr::try_from(expr.full_expr()) - .map(|putres| intres.includes(&putres)) - }) - .unwrap_or(false) - }) + .map(|res| KeyExpr::keyexpr_include(res.expr(), expr.full_expr())) .unwrap_or(true) - }) { - if face_hat!(face).remote_subs.values().any(|sub| { - KeyExpr::try_from(sub.expr()) - .and_then(|subres| { - KeyExpr::try_from(expr.full_expr()) - .map(|putres| subres.intersects(&putres)) - }) - .unwrap_or(false) - }) { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.insert( - face.id, - (face.clone(), key_expr.to_owned(), NodeId::default()), - ); - } - } else { + }) || face_hat!(face) + .remote_subs + .values() + .any(|sub| KeyExpr::keyexpr_intersect(sub.expr(), expr.full_expr())) + { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); route.insert( face.id, @@ -428,17 +410,13 @@ impl HatPubSubTrait for HatCode { && interest .res .as_ref() - .map(|res| { - KeyExpr::try_from(res.expr()) - .map(|intres| intres.includes(key_expr)) - .unwrap_or(false) - }) + .map(|res| KeyExpr::keyexpr_include(res.expr(), key_expr)) .unwrap_or(true) - }) && face_hat!(face).remote_subs.values().any(|sub| { - KeyExpr::try_from(sub.expr()) - .map(|subres| subres.intersects(key_expr)) - .unwrap_or(false) - }) { + }) && face_hat!(face) + .remote_subs + .values() + .any(|sub| KeyExpr::keyexpr_intersect(sub.expr(), key_expr)) + { matching_subscriptions.insert(face.id, face.clone()); } } diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index cd417ef84b..7f2f9cb8d9 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -33,15 +33,18 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace}; -use crate::net::routing::{ - dispatcher::{ - face::FaceState, - resource::{NodeId, Resource, SessionContext}, - tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, +use crate::{ + key_expr::KeyExpr, + net::routing::{ + dispatcher::{ + face::FaceState, + resource::{NodeId, Resource, SessionContext}, + tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, + }, + hat::{HatQueriesTrait, SendDeclare, Sources}, + router::{update_query_routes_from, RoutesIndexes}, + RoutingContext, }, - hat::{HatQueriesTrait, SendDeclare, Sources}, - router::RoutesIndexes, - RoutingContext, }; #[inline] @@ -272,6 +275,8 @@ pub(super) fn queries_new_face( propagate_simple_queryable(tables, qabl, Some(&mut face.clone()), send_declare); } } + // recompute routes + update_query_routes_from(tables, &mut tables.root_res.clone()); } lazy_static::lazy_static! { @@ -349,12 +354,30 @@ impl HatQueriesTrait for HatCode { }; if source_type == WhatAmI::Client { - if let Some(face) = tables.faces.values().find(|f| f.whatami != WhatAmI::Client) { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.push(QueryTargetQabl { - direction: (face.clone(), key_expr.to_owned(), NodeId::default()), - info: None, - }); + for face in tables + .faces + .values() + .filter(|f| f.whatami != WhatAmI::Client) + { + if !face.local_interests.values().any(|interest| { + interest.finalized + && interest.options.queryables() + && interest + .res + .as_ref() + .map(|res| KeyExpr::keyexpr_include(res.expr(), expr.full_expr())) + .unwrap_or(true) + }) || face_hat!(face) + .remote_qabls + .values() + .any(|qbl| KeyExpr::keyexpr_intersect(qbl.expr(), expr.full_expr())) + { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.push(QueryTargetQabl { + direction: (face.clone(), key_expr.to_owned(), NodeId::default()), + info: None, + }); + } } } @@ -388,4 +411,79 @@ impl HatQueriesTrait for HatCode { fn get_query_routes_entries(&self, _tables: &Tables) -> RoutesIndexes { get_routes_entries() } + + #[cfg(feature = "unstable")] + fn get_matching_queryables( + &self, + tables: &Tables, + key_expr: &KeyExpr<'_>, + complete: bool, + ) -> HashMap> { + let mut matching_queryables = HashMap::new(); + if key_expr.ends_with('/') { + return matching_queryables; + } + tracing::trace!( + "get_matching_queryables({}; complete: {})", + key_expr, + complete + ); + for face in tables + .faces + .values() + .filter(|f| f.whatami != WhatAmI::Client) + { + if face.local_interests.values().any(|interest| { + interest.finalized + && interest.options.queryables() + && interest + .res + .as_ref() + .map(|res| KeyExpr::keyexpr_include(res.expr(), key_expr)) + .unwrap_or(true) + }) && face_hat!(face) + .remote_qabls + .values() + .any(|qbl| match complete { + true => { + qbl.session_ctxs + .get(&face.id) + .and_then(|sc| sc.qabl) + .map_or(false, |q| q.complete) + && KeyExpr::keyexpr_include(qbl.expr(), key_expr) + } + false => KeyExpr::keyexpr_intersect(qbl.expr(), key_expr), + }) + { + matching_queryables.insert(face.id, face.clone()); + } + } + + let res = Resource::get_resource(&tables.root_res, key_expr); + let matches = res + .as_ref() + .and_then(|res| res.context.as_ref()) + .map(|ctx| Cow::from(&ctx.matches)) + .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); + + for mres in matches.iter() { + let mres = mres.upgrade().unwrap(); + if complete && !KeyExpr::keyexpr_include(mres.expr(), key_expr) { + continue; + } + for (sid, context) in &mres.session_ctxs { + if context.face.whatami == WhatAmI::Client + && match complete { + true => context.qabl.map_or(false, |q| q.complete), + false => context.qabl.is_some(), + } + { + matching_queryables + .entry(*sid) + .or_insert_with(|| context.face.clone()); + } + } + } + matching_queryables + } } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 8668c01ba2..2f7772193a 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -40,6 +40,8 @@ use super::{ face_hat, face_hat_mut, get_peer, get_routes_entries, hat, hat_mut, network::Network, res_hat, res_hat_mut, HatCode, HatContext, HatFace, HatTables, }; +#[cfg(feature = "unstable")] +use crate::key_expr::KeyExpr; use crate::net::routing::{ dispatcher::{ face::FaceState, @@ -1011,4 +1013,89 @@ impl HatQueriesTrait for HatCode { fn get_query_routes_entries(&self, tables: &Tables) -> RoutesIndexes { get_routes_entries(tables) } + + #[cfg(feature = "unstable")] + fn get_matching_queryables( + &self, + tables: &Tables, + key_expr: &KeyExpr<'_>, + complete: bool, + ) -> HashMap> { + let mut matching_queryables = HashMap::new(); + if key_expr.ends_with('/') { + return matching_queryables; + } + tracing::trace!( + "get_matching_queryables({}; complete: {})", + key_expr, + complete + ); + + let res = Resource::get_resource(&tables.root_res, key_expr); + let matches = res + .as_ref() + .and_then(|res| res.context.as_ref()) + .map(|ctx| Cow::from(&ctx.matches)) + .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); + + for mres in matches.iter() { + let mres = mres.upgrade().unwrap(); + if complete && !KeyExpr::keyexpr_include(mres.expr(), key_expr) { + continue; + } + + let net = hat!(tables).linkstatepeers_net.as_ref().unwrap(); + insert_faces_for_qbls( + &mut matching_queryables, + tables, + net, + &res_hat!(mres).linkstatepeer_qabls, + complete, + ); + + for (sid, context) in &mres.session_ctxs { + if match complete { + true => context.qabl.map_or(false, |q| q.complete), + false => context.qabl.is_some(), + } { + matching_queryables + .entry(*sid) + .or_insert_with(|| context.face.clone()); + } + } + } + matching_queryables + } +} + +#[cfg(feature = "unstable")] +#[inline] +fn insert_faces_for_qbls( + route: &mut HashMap>, + tables: &Tables, + net: &Network, + qbls: &HashMap, + complete: bool, +) { + let source = net.idx.index(); + if net.trees.len() > source { + for qbl in qbls { + if complete && !qbl.1.complete { + continue; + } + if let Some(qbl_idx) = net.get_idx(qbl.0) { + if net.trees[source].directions.len() > qbl_idx.index() { + if let Some(direction) = net.trees[source].directions[qbl_idx.index()] { + if net.graph.contains_node(direction) { + if let Some(face) = tables.get_face(&net.graph[direction].zid) { + route.entry(face.id).or_insert_with(|| face.clone()); + } + } + } + } + } + } + } else { + tracing::trace!("Tree for node sid:{} not yet ready", source); + } } diff --git a/zenoh/src/net/routing/hat/mod.rs b/zenoh/src/net/routing/hat/mod.rs index fb2ae44c3a..92d33115f1 100644 --- a/zenoh/src/net/routing/hat/mod.rs +++ b/zenoh/src/net/routing/hat/mod.rs @@ -232,6 +232,14 @@ pub(crate) trait HatQueriesTrait { ) -> Arc; fn get_query_routes_entries(&self, tables: &Tables) -> RoutesIndexes; + + #[zenoh_macros::unstable] + fn get_matching_queryables( + &self, + tables: &Tables, + key_expr: &KeyExpr<'_>, + complete: bool, + ) -> HashMap>; } pub(crate) fn new_hat(whatami: WhatAmI, config: &Config) -> Box { diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 5722a40d9b..8a050d5c03 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -607,37 +607,19 @@ impl HatPubSubTrait for HatCode { .values() .filter(|f| f.whatami == WhatAmI::Router) { - if face.local_interests.values().any(|interest| { + if !face.local_interests.values().any(|interest| { interest.finalized && interest.options.subscribers() && interest .res .as_ref() - .map(|res| { - KeyExpr::try_from(res.expr()) - .and_then(|intres| { - KeyExpr::try_from(expr.full_expr()) - .map(|putres| intres.includes(&putres)) - }) - .unwrap_or(false) - }) + .map(|res| KeyExpr::keyexpr_include(res.expr(), expr.full_expr())) .unwrap_or(true) - }) { - if face_hat!(face).remote_subs.values().any(|sub| { - KeyExpr::try_from(sub.expr()) - .and_then(|subres| { - KeyExpr::try_from(expr.full_expr()) - .map(|putres| subres.intersects(&putres)) - }) - .unwrap_or(false) - }) { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.insert( - face.id, - (face.clone(), key_expr.to_owned(), NodeId::default()), - ); - } - } else { + }) || face_hat!(face) + .remote_subs + .values() + .any(|sub| KeyExpr::keyexpr_intersect(sub.expr(), expr.full_expr())) + { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); route.insert( face.id, diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 052d401690..db67952745 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -36,15 +36,20 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace}; -use crate::net::routing::{ - dispatcher::{ - face::FaceState, - resource::{NodeId, Resource, SessionContext}, - tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, +use crate::{ + key_expr::KeyExpr, + net::routing::{ + dispatcher::{ + face::FaceState, + resource::{NodeId, Resource, SessionContext}, + tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, + }, + hat::{ + p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources, + }, + router::{update_query_routes_from, RoutesIndexes}, + RoutingContext, }, - hat::{p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources}, - router::{update_query_routes_from, RoutesIndexes}, - RoutingContext, }; #[inline] @@ -589,13 +594,31 @@ impl HatQueriesTrait for HatCode { }; if source_type == WhatAmI::Client { - // TODO: BNestMatching: What if there is a local compete ? - if let Some(face) = tables.faces.values().find(|f| f.whatami == WhatAmI::Router) { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.push(QueryTargetQabl { - direction: (face.clone(), key_expr.to_owned(), NodeId::default()), - info: None, - }); + // TODO: BestMatching: What if there is a local compete ? + for face in tables + .faces + .values() + .filter(|f| f.whatami == WhatAmI::Router) + { + if !face.local_interests.values().any(|interest| { + interest.finalized + && interest.options.queryables() + && interest + .res + .as_ref() + .map(|res| KeyExpr::keyexpr_include(res.expr(), expr.full_expr())) + .unwrap_or(true) + }) || face_hat!(face) + .remote_qabls + .values() + .any(|sub| KeyExpr::keyexpr_intersect(sub.expr(), expr.full_expr())) + { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.push(QueryTargetQabl { + direction: (face.clone(), key_expr.to_owned(), NodeId::default()), + info: None, + }); + } } for face in tables.faces.values().filter(|f| { @@ -646,4 +669,46 @@ impl HatQueriesTrait for HatCode { fn get_query_routes_entries(&self, _tables: &Tables) -> RoutesIndexes { get_routes_entries() } + + #[cfg(feature = "unstable")] + fn get_matching_queryables( + &self, + tables: &Tables, + key_expr: &KeyExpr<'_>, + complete: bool, + ) -> HashMap> { + let mut matching_queryables = HashMap::new(); + if key_expr.ends_with('/') { + return matching_queryables; + } + tracing::trace!( + "get_matching_queryables({}; complete: {})", + key_expr, + complete + ); + let res = Resource::get_resource(&tables.root_res, key_expr); + let matches = res + .as_ref() + .and_then(|res| res.context.as_ref()) + .map(|ctx| Cow::from(&ctx.matches)) + .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); + + for mres in matches.iter() { + let mres = mres.upgrade().unwrap(); + if complete && !KeyExpr::keyexpr_include(mres.expr(), key_expr) { + continue; + } + for (sid, context) in &mres.session_ctxs { + if match complete { + true => context.qabl.map_or(false, |q| q.complete), + false => context.qabl.is_some(), + } { + matching_queryables + .entry(*sid) + .or_insert_with(|| context.face.clone()); + } + } + } + matching_queryables + } } diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 34e05c43c0..6739f22bb9 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -41,6 +41,8 @@ use super::{ interests::push_declaration_profile, network::Network, res_hat, res_hat_mut, HatCode, HatContext, HatFace, HatTables, }; +#[cfg(feature = "unstable")] +use crate::key_expr::KeyExpr; use crate::net::routing::{ dispatcher::{ face::FaceState, @@ -1494,4 +1496,109 @@ impl HatQueriesTrait for HatCode { fn get_query_routes_entries(&self, tables: &Tables) -> RoutesIndexes { get_routes_entries(tables) } + + #[cfg(feature = "unstable")] + fn get_matching_queryables( + &self, + tables: &Tables, + key_expr: &KeyExpr<'_>, + complete: bool, + ) -> HashMap> { + let mut matching_queryables = HashMap::new(); + if key_expr.ends_with('/') { + return matching_queryables; + } + tracing::trace!( + "get_matching_queryables({}; complete: {})", + key_expr, + complete + ); + crate::net::routing::dispatcher::pubsub::get_matching_subscriptions(tables, key_expr); + let res = Resource::get_resource(&tables.root_res, key_expr); + let matches = res + .as_ref() + .and_then(|res| res.context.as_ref()) + .map(|ctx| Cow::from(&ctx.matches)) + .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); + + let master = !hat!(tables).full_net(WhatAmI::Peer) + || *hat!(tables).elect_router(&tables.zid, key_expr, hat!(tables).shared_nodes.iter()) + == tables.zid; + + for mres in matches.iter() { + let mres = mres.upgrade().unwrap(); + if complete && !KeyExpr::keyexpr_include(mres.expr(), key_expr) { + continue; + } + + if master { + let net = hat!(tables).routers_net.as_ref().unwrap(); + insert_faces_for_qbls( + &mut matching_queryables, + tables, + net, + &res_hat!(mres).router_qabls, + complete, + ); + } + + if hat!(tables).full_net(WhatAmI::Peer) { + let net = hat!(tables).linkstatepeers_net.as_ref().unwrap(); + insert_faces_for_qbls( + &mut matching_queryables, + tables, + net, + &res_hat!(mres).linkstatepeer_qabls, + complete, + ); + } + + if master { + for (sid, context) in &mres.session_ctxs { + if match complete { + true => context.qabl.map_or(false, |q| q.complete), + false => context.qabl.is_some(), + } && context.face.whatami != WhatAmI::Router + { + matching_queryables + .entry(*sid) + .or_insert_with(|| context.face.clone()); + } + } + } + } + matching_queryables + } +} + +#[cfg(feature = "unstable")] +#[inline] +fn insert_faces_for_qbls( + route: &mut HashMap>, + tables: &Tables, + net: &Network, + qbls: &HashMap, + complete: bool, +) { + let source = net.idx.index(); + if net.trees.len() > source { + for qbl in qbls { + if complete && !qbl.1.complete { + continue; + } + if let Some(qbl_idx) = net.get_idx(qbl.0) { + if net.trees[source].directions.len() > qbl_idx.index() { + if let Some(direction) = net.trees[source].directions[qbl_idx.index()] { + if net.graph.contains_node(direction) { + if let Some(face) = tables.get_face(&net.graph[direction].zid) { + route.entry(face.id).or_insert_with(|| face.clone()); + } + } + } + } + } + } + } else { + tracing::trace!("Tree for node sid:{} not yet ready", source); + } } diff --git a/zenoh/tests/matching.rs b/zenoh/tests/matching.rs index 5fc3256cf6..503c1f5e4b 100644 --- a/zenoh/tests/matching.rs +++ b/zenoh/tests/matching.rs @@ -15,7 +15,12 @@ use std::time::Duration; -use zenoh::{sample::Locality, Result as ZResult, Session}; +use zenoh::{ + handlers::FifoChannelHandler, + matching::{MatchingListener, MatchingStatus}, + sample::Locality, + Result as ZResult, Session, +}; use zenoh_config::{ModeDependentValue, WhatAmI}; use zenoh_core::ztimeout; @@ -45,196 +50,199 @@ async fn create_session_pair(locator: &str) -> (Session, Session) { (session1, session2) } -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn zenoh_matching_status_any() -> ZResult<()> { - zenoh_util::init_log_from_env_or("error"); - let (session1, session2) = create_session_pair("tcp/127.0.0.1:18001").await; - - let publisher1 = ztimeout!(session1 - .declare_publisher("zenoh_matching_status_any_test") - .allowed_destination(Locality::Any)) - .unwrap(); - - let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap(); - +fn get_matching_listener_status( + matching_listener: &MatchingListener>, +) -> Option { let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.unwrap().is_none()); - - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(!matching_status.matching_subscribers()); - - let sub = ztimeout!(session1.declare_subscriber("zenoh_matching_status_any_test")).unwrap(); + received_status.ok().flatten().map(|s| s.matching()) +} - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status - .ok() - .flatten() - .map(|s| s.matching_subscribers()) - .eq(&Some(true))); +fn is_locality_compatible(locality: Locality, same_session: bool) -> bool { + match locality { + Locality::SessionLocal => same_session, + Locality::Remote => !same_session, + Locality::Any => true, + } +} - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(matching_status.matching_subscribers()); +async fn zenoh_querier_matching_status_inner(querier_locality: Locality, same_session: bool) { + println!( + "Querier origin :{:?}, same session: {same_session}", + querier_locality + ); + zenoh_util::init_log_from_env_or("error"); + let key_expr = match querier_locality { + Locality::SessionLocal => "zenoh_querier_matching_status_local_test", + Locality::Remote => "zenoh_querier_matching_status_remote_test", + Locality::Any => "zenoh_querier_matching_status_any_test", + }; - ztimeout!(sub.undeclare()).unwrap(); + let (session1, session2) = match same_session { + false => create_session_pair("tcp/127.0.0.1:18002").await, + true => { + let s1 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); + let s2 = s1.clone(); + (s1, s2) + } + }; + let locality_compatible = is_locality_compatible(querier_locality, same_session); - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status - .ok() - .flatten() - .map(|s| s.matching_subscribers()) - .eq(&Some(false))); + let querier1 = ztimeout!(session1 + .declare_querier(format!("{key_expr}/value")) + .target(zenoh::query::QueryTarget::BestMatching) + .allowed_destination(querier_locality)) + .unwrap(); - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(!matching_status.matching_subscribers()); + let querier2 = ztimeout!(session1 + .declare_querier(format!("{key_expr}/*")) + .target(zenoh::query::QueryTarget::AllComplete) + .allowed_destination(querier_locality)) + .unwrap(); - let sub = ztimeout!(session2.declare_subscriber("zenoh_matching_status_any_test")).unwrap(); + let matching_listener1 = ztimeout!(querier1.matching_listener()).unwrap(); + let matching_listener2 = ztimeout!(querier2.matching_listener()).unwrap(); - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status - .ok() - .flatten() - .map(|s| s.matching_subscribers()) - .eq(&Some(true))); + assert!(!ztimeout!(querier1.matching_status()).unwrap().matching()); + assert!(!ztimeout!(querier2.matching_status()).unwrap().matching()); - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(matching_status.matching_subscribers()); + assert_eq!(get_matching_listener_status(&matching_listener1), None); + assert_eq!(get_matching_listener_status(&matching_listener2), None); - ztimeout!(sub.undeclare()).unwrap(); + let qbl1 = ztimeout!(session2 + .declare_queryable(format!("{key_expr}/value")) + .complete(false)) + .unwrap(); + // There is a bug that hats do not distinguish whether they register/unregister complete or incomplete queryable, + // if there are more than one with the same keyexpr on the same face + //let qbl2 = ztimeout!(session2 + // .declare_queryable(format!("{key_expr}/*")) + // .complete(false)) + //.unwrap(); + + let _qbl3 = ztimeout!(session2 + .declare_queryable(format!("{key_expr}/value3")) + .complete(true)) + .unwrap(); - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status - .ok() - .flatten() - .map(|s| s.matching_subscribers()) - .eq(&Some(false))); - - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(!matching_status.matching_subscribers()); - Ok(()) + assert_eq!( + get_matching_listener_status(&matching_listener1), + locality_compatible.then_some(true) + ); + assert_eq!(get_matching_listener_status(&matching_listener2), None); + + assert_eq!( + ztimeout!(querier1.matching_status()).unwrap().matching(), + locality_compatible + ); + assert!(!ztimeout!(querier2.matching_status()).unwrap().matching()); + + let qbl4 = ztimeout!(session2 + .declare_queryable(format!("{key_expr}/*")) + .complete(true)) + .unwrap(); + assert_eq!( + get_matching_listener_status(&matching_listener2), + locality_compatible.then_some(true) + ); + assert_eq!( + ztimeout!(querier2.matching_status()).unwrap().matching(), + locality_compatible + ); + + ztimeout!(qbl4.undeclare()).unwrap(); + + assert_eq!(get_matching_listener_status(&matching_listener1), None); + assert_eq!( + get_matching_listener_status(&matching_listener2), + locality_compatible.then_some(false) + ); + assert_eq!( + ztimeout!(querier1.matching_status()).unwrap().matching(), + locality_compatible + ); + assert!(!ztimeout!(querier2.matching_status()).unwrap().matching()); + + ztimeout!(qbl1.undeclare()).unwrap(); + // ztimeout!(qbl2.undeclare()).unwrap(); + assert_eq!( + get_matching_listener_status(&matching_listener1), + locality_compatible.then_some(false) + ); + assert_eq!(get_matching_listener_status(&matching_listener2), None); + assert!(!ztimeout!(querier1.matching_status()).unwrap().matching()); + assert!(!ztimeout!(querier2.matching_status()).unwrap().matching()); } -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn zenoh_matching_status_remote() -> ZResult<()> { +async fn zenoh_publisher_matching_status_inner(publisher_locality: Locality, same_session: bool) { + println!( + "Publisher origin: {:?}, same session: {same_session}", + publisher_locality + ); zenoh_util::init_log_from_env_or("error"); + let key_expr = match publisher_locality { + Locality::SessionLocal => "zenoh_publisher_matching_status_local_test", + Locality::Remote => "zenoh_publisher_matching_status_remote_test", + Locality::Any => "zenoh_publisher_matching_status_any_test", + }; - let session1 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); - let session2 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); + let (session1, session2) = match same_session { + false => create_session_pair("tcp/127.0.0.1:18001").await, + true => { + let s1 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); + let s2 = s1.clone(); + (s1, s2) + } + }; + let locality_compatible = is_locality_compatible(publisher_locality, same_session); - let publisher1 = ztimeout!(session1 - .declare_publisher("zenoh_matching_status_remote_test") - .allowed_destination(Locality::Remote)) + let publisher = ztimeout!(session1 + .declare_publisher(format!("{key_expr}/*")) + .allowed_destination(publisher_locality)) .unwrap(); - let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap(); - - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.unwrap().is_none()); - - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(!matching_status.matching_subscribers()); - - let sub = ztimeout!(session1.declare_subscriber("zenoh_matching_status_remote_test")).unwrap(); - - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.unwrap().is_none()); - - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(!matching_status.matching_subscribers()); - - ztimeout!(sub.undeclare()).unwrap(); - - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.unwrap().is_none()); + let matching_listener = ztimeout!(publisher.matching_listener()).unwrap(); - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(!matching_status.matching_subscribers()); + assert_eq!(get_matching_listener_status(&matching_listener), None); - let sub = ztimeout!(session2.declare_subscriber("zenoh_matching_status_remote_test")).unwrap(); - - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status - .ok() - .flatten() - .map(|s| s.matching_subscribers()) - .eq(&Some(true))); + let sub = ztimeout!(session2.declare_subscriber(format!("{key_expr}/value"))).unwrap(); - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(matching_status.matching_subscribers()); + assert_eq!( + get_matching_listener_status(&matching_listener), + locality_compatible.then_some(true) + ); + assert_eq!( + ztimeout!(publisher.matching_status()).unwrap().matching(), + locality_compatible + ); ztimeout!(sub.undeclare()).unwrap(); - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status - .ok() - .flatten() - .map(|s| s.matching_subscribers()) - .eq(&Some(false))); - - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(!matching_status.matching_subscribers()); + assert_eq!( + get_matching_listener_status(&matching_listener), + locality_compatible.then_some(false) + ); +} +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn zenoh_querier_matching_status() -> ZResult<()> { + zenoh_util::init_log_from_env_or("error"); + zenoh_querier_matching_status_inner(Locality::Any, true).await; + zenoh_querier_matching_status_inner(Locality::Any, false).await; + zenoh_querier_matching_status_inner(Locality::Remote, true).await; + zenoh_querier_matching_status_inner(Locality::Remote, false).await; + zenoh_querier_matching_status_inner(Locality::SessionLocal, true).await; + zenoh_querier_matching_status_inner(Locality::SessionLocal, false).await; Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn zenoh_matching_status_local() -> ZResult<()> { +async fn zenoh_publisher_matching_status() -> ZResult<()> { zenoh_util::init_log_from_env_or("error"); - - let session1 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); - let session2 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); - - let publisher1 = ztimeout!(session1 - .declare_publisher("zenoh_matching_status_local_test") - .allowed_destination(Locality::SessionLocal)) - .unwrap(); - - let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap(); - - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.unwrap().is_none()); - - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(!matching_status.matching_subscribers()); - - let sub = ztimeout!(session1.declare_subscriber("zenoh_matching_status_local_test")).unwrap(); - - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status - .ok() - .flatten() - .map(|s| s.matching_subscribers()) - .eq(&Some(true))); - - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(matching_status.matching_subscribers()); - - ztimeout!(sub.undeclare()).unwrap(); - - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status - .ok() - .flatten() - .map(|s| s.matching_subscribers()) - .eq(&Some(false))); - - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(!matching_status.matching_subscribers()); - - let sub = ztimeout!(session2.declare_subscriber("zenoh_matching_status_local_test")).unwrap(); - - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.unwrap().is_none()); - - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(!matching_status.matching_subscribers()); - - ztimeout!(sub.undeclare()).unwrap(); - - let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.unwrap().is_none()); - - let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); - assert!(!matching_status.matching_subscribers()); - + zenoh_publisher_matching_status_inner(Locality::Any, true).await; + zenoh_publisher_matching_status_inner(Locality::Any, false).await; + zenoh_publisher_matching_status_inner(Locality::Remote, true).await; + zenoh_publisher_matching_status_inner(Locality::Remote, false).await; + zenoh_publisher_matching_status_inner(Locality::SessionLocal, true).await; + zenoh_publisher_matching_status_inner(Locality::SessionLocal, false).await; Ok(()) } diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index c94e681181..a0eb6be130 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -26,6 +26,8 @@ use std::{ use zenoh::internal::runtime::{Runtime, RuntimeBuilder}; #[cfg(feature = "unstable")] use zenoh::qos::Reliability; +#[cfg(feature = "unstable")] +use zenoh::query::Querier; use zenoh::{key_expr::KeyExpr, qos::CongestionControl, sample::SampleKind, Session}; use zenoh_core::ztimeout; #[cfg(not(feature = "unstable"))] @@ -159,30 +161,62 @@ async fn test_session_pubsub(peer01: &Session, peer02: &Session, reliability: Re } } -async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Reliability) { - let key_expr = "test/session"; +trait HasGet { + async fn get(&self, params: &str) -> zenoh::handlers::FifoChannelHandler; +} + +struct SessionGetter<'a, 'b> { + session: &'a Session, + key_expr: &'b str, +} + +impl HasGet for SessionGetter<'_, '_> { + async fn get(&self, params: &str) -> zenoh::handlers::FifoChannelHandler { + let selector = format!("{}?{}", self.key_expr, params); + ztimeout!(self.session.get(selector)).unwrap() + } +} + +#[cfg(feature = "unstable")] +struct QuerierGetter<'a> { + querier: Querier<'a>, +} + +#[cfg(feature = "unstable")] +impl HasGet for QuerierGetter<'_> { + async fn get(&self, params: &str) -> zenoh::handlers::FifoChannelHandler { + ztimeout!(self.querier.get().parameters(params)).unwrap() + } +} + +async fn test_session_query_reply_internal( + peer: &Session, + key_expr: &str, + reliability: Reliability, + log_id: &str, + getter: &Getter, +) { let msg_count = match reliability { Reliability::Reliable => MSG_COUNT, Reliability::BestEffort => 1, }; - let msgs = Arc::new(AtomicUsize::new(0)); + let msgs = Arc::new(AtomicUsize::new(0)); for size in MSG_SIZE { msgs.store(0, Ordering::Relaxed); // Queryable to data - println!("[QR][01c] Queryable on peer01 session"); + println!("[{log_id}][01c] Queryable on peer01 session"); let c_msgs = msgs.clone(); - let qbl = ztimeout!(peer01.declare_queryable(key_expr).callback(move |query| { + let ke = key_expr.to_owned(); + let qbl = ztimeout!(peer.declare_queryable(key_expr).callback(move |query| { c_msgs.fetch_add(1, Ordering::Relaxed); match query.parameters().as_str() { "ok_put" => { tokio::task::block_in_place(|| { tokio::runtime::Handle::current().block_on(async { - ztimeout!(query.reply( - KeyExpr::try_from(key_expr).unwrap(), - vec![0u8; size].to_vec() - )) + ztimeout!(query + .reply(KeyExpr::try_from(&ke).unwrap(), vec![0u8; size].to_vec())) .unwrap() }) }); @@ -190,7 +224,7 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re "ok_del" => { tokio::task::block_in_place(|| { tokio::runtime::Handle::current() - .block_on(async { ztimeout!(query.reply_del(key_expr)).unwrap() }) + .block_on(async { ztimeout!(query.reply_del(&ke)).unwrap() }) }); } "err" => { @@ -209,11 +243,10 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re tokio::time::sleep(SLEEP).await; // Get data - println!("[QR][02c] Getting Ok(Put) on peer02 session. {msg_count} msgs."); + println!("[{log_id}][02c] Getting Ok(Put) on peer02 session. {msg_count} msgs."); let mut cnt = 0; for _ in 0..msg_count { - let selector = format!("{}?ok_put", key_expr); - let rs = ztimeout!(peer02.get(selector)).unwrap(); + let rs = getter.get("ok_put").await; while let Ok(s) = ztimeout!(rs.recv_async()) { let s = s.result().unwrap(); assert_eq!(s.kind(), SampleKind::Put); @@ -221,17 +254,16 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re cnt += 1; } } - println!("[QR][02c] Got on peer02 session. {cnt}/{msg_count} msgs."); + println!("[{log_id}][02c] Got on peer02 session. {cnt}/{msg_count} msgs."); assert_eq!(msgs.load(Ordering::Relaxed), msg_count); assert_eq!(cnt, msg_count); msgs.store(0, Ordering::Relaxed); - println!("[QR][03c] Getting Ok(Delete) on peer02 session. {msg_count} msgs."); + println!("[{log_id}][03c] Getting Ok(Delete) on peer02 session. {msg_count} msgs."); let mut cnt = 0; for _ in 0..msg_count { - let selector = format!("{}?ok_del", key_expr); - let rs = ztimeout!(peer02.get(selector)).unwrap(); + let rs = getter.get("ok_del").await; while let Ok(s) = ztimeout!(rs.recv_async()) { let s = s.result().unwrap(); assert_eq!(s.kind(), SampleKind::Delete); @@ -239,28 +271,27 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re cnt += 1; } } - println!("[QR][03c] Got on peer02 session. {cnt}/{msg_count} msgs."); + println!("[{log_id}][03c] Got on peer02 session. {cnt}/{msg_count} msgs."); assert_eq!(msgs.load(Ordering::Relaxed), msg_count); assert_eq!(cnt, msg_count); msgs.store(0, Ordering::Relaxed); - println!("[QR][04c] Getting Err() on peer02 session. {msg_count} msgs."); + println!("[{log_id}][04c] Getting Err() on peer02 session. {msg_count} msgs."); let mut cnt = 0; for _ in 0..msg_count { - let selector = format!("{}?err", key_expr); - let rs = ztimeout!(peer02.get(selector)).unwrap(); + let rs = getter.get("err").await; while let Ok(s) = ztimeout!(rs.recv_async()) { let e = s.result().unwrap_err(); assert_eq!(e.payload().len(), size); cnt += 1; } } - println!("[QR][04c] Got on peer02 session. {cnt}/{msg_count} msgs."); + println!("[{log_id}][04c] Got on peer02 session. {cnt}/{msg_count} msgs."); assert_eq!(msgs.load(Ordering::Relaxed), msg_count); assert_eq!(cnt, msg_count); - println!("[PS][03c] Unqueryable on peer01 session"); + println!("[{log_id}][03c] Unqueryable on peer01 session"); ztimeout!(qbl.undeclare()).unwrap(); // Wait for the declaration to propagate @@ -268,12 +299,45 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re } } +async fn test_session_getrep(peer01: &Session, peer02: &Session, reliability: Reliability) { + let key_expr = "test/session"; + let querier = SessionGetter { + session: peer02, + key_expr, + }; + ztimeout!(test_session_query_reply_internal( + peer01, + key_expr, + reliability, + "QR", + &querier + )) +} + +#[cfg(feature = "unstable")] +async fn test_session_qrrep(peer01: &Session, peer02: &Session, reliability: Reliability) { + let key_expr = "test/session"; + println!("[QQ][00c] Declaring Querier on peer02 session"); + let querier = QuerierGetter { + querier: ztimeout!(peer02.declare_querier(key_expr)).unwrap(), + }; + ztimeout!(test_session_query_reply_internal( + peer01, + key_expr, + reliability, + "QQ", + &querier + )) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn zenoh_session_unicast() { zenoh::init_log_from_env_or("error"); let (peer01, peer02) = open_session_unicast(&["tcp/127.0.0.1:17447"]).await; test_session_pubsub(&peer01, &peer02, Reliability::Reliable).await; - test_session_qryrep(&peer01, &peer02, Reliability::Reliable).await; + test_session_getrep(&peer01, &peer02, Reliability::Reliable).await; + #[cfg(feature = "unstable")] + test_session_qrrep(&peer01, &peer02, Reliability::Reliable).await; close_session(peer01, peer02).await; }