diff --git a/Cargo.lock b/Cargo.lock index 53f2600071..fa55ca4acd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4534,6 +4534,7 @@ dependencies = [ "rand 0.8.5", "rustc_version 0.4.0", "zenoh", + "zenoh-collections", "zenoh-ext", ] diff --git a/commons/zenoh-codec/src/zenoh/mod.rs b/commons/zenoh-codec/src/zenoh/mod.rs index 0d7146dc90..dc38e5ee84 100644 --- a/commons/zenoh-codec/src/zenoh/mod.rs +++ b/commons/zenoh-codec/src/zenoh/mod.rs @@ -13,7 +13,6 @@ // pub mod del; pub mod err; -pub mod pull; pub mod put; pub mod query; pub mod reply; @@ -81,9 +80,6 @@ where fn write(self, writer: &mut W, x: &RequestBody) -> Self::Output { match x { RequestBody::Query(b) => self.write(&mut *writer, b), - RequestBody::Put(b) => self.write(&mut *writer, b), - RequestBody::Del(b) => self.write(&mut *writer, b), - RequestBody::Pull(b) => self.write(&mut *writer, b), } } } @@ -100,9 +96,6 @@ where let codec = Zenoh080Header::new(header); let body = match imsg::mid(codec.header) { id::QUERY => RequestBody::Query(codec.read(&mut *reader)?), - id::PUT => RequestBody::Put(codec.read(&mut *reader)?), - id::DEL => RequestBody::Del(codec.read(&mut *reader)?), - id::PULL => RequestBody::Pull(codec.read(&mut *reader)?), _ => return Err(DidntRead), }; @@ -121,7 +114,6 @@ where match x { ResponseBody::Reply(b) => self.write(&mut *writer, b), ResponseBody::Err(b) => self.write(&mut *writer, b), - ResponseBody::Put(b) => self.write(&mut *writer, b), } } } @@ -139,7 +131,6 @@ where let body = match imsg::mid(codec.header) { id::REPLY => ResponseBody::Reply(codec.read(&mut *reader)?), id::ERR => ResponseBody::Err(codec.read(&mut *reader)?), - id::PUT => ResponseBody::Put(codec.read(&mut *reader)?), _ => return Err(DidntRead), }; diff --git a/commons/zenoh-codec/src/zenoh/pull.rs b/commons/zenoh-codec/src/zenoh/pull.rs deleted file mode 100644 index dc71901d58..0000000000 --- a/commons/zenoh-codec/src/zenoh/pull.rs +++ /dev/null @@ -1,93 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header}; -use alloc::vec::Vec; -use zenoh_buffers::{ - reader::{DidntRead, Reader}, - writer::{DidntWrite, Writer}, -}; - -use zenoh_protocol::{ - common::imsg, - zenoh::{ - id, - pull::{flag, Pull}, - }, -}; - -impl WCodec<&Pull, &mut W> for Zenoh080 -where - W: Writer, -{ - type Output = Result<(), DidntWrite>; - - fn write(self, writer: &mut W, x: &Pull) -> Self::Output { - let Pull { ext_unknown } = x; - - // Header - let mut header = id::PULL; - let mut n_exts = ext_unknown.len() as u8; - if n_exts != 0 { - header |= flag::Z; - } - self.write(&mut *writer, header)?; - - // Extensions - for u in ext_unknown.iter() { - n_exts -= 1; - self.write(&mut *writer, (u, n_exts != 0))?; - } - - Ok(()) - } -} - -impl RCodec for Zenoh080 -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - let header: u8 = self.read(&mut *reader)?; - let codec = Zenoh080Header::new(header); - codec.read(reader) - } -} - -impl RCodec for Zenoh080Header -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - if imsg::mid(self.header) != id::PULL { - return Err(DidntRead); - } - - // Extensions - let mut ext_unknown = Vec::new(); - - let mut has_ext = imsg::has_flag(self.header, flag::Z); - while has_ext { - let ext: u8 = self.codec.read(&mut *reader)?; - let (u, ext) = extension::read(reader, "Pull", ext)?; - ext_unknown.push(u); - has_ext = ext; - } - - Ok(Pull { ext_unknown }) - } -} diff --git a/commons/zenoh-codec/tests/codec.rs b/commons/zenoh-codec/tests/codec.rs index 3bca8b7489..2f0e870c4f 100644 --- a/commons/zenoh-codec/tests/codec.rs +++ b/commons/zenoh-codec/tests/codec.rs @@ -600,8 +600,3 @@ fn codec_reply() { fn codec_err() { run!(zenoh::Err, zenoh::Err::rand()); } - -#[test] -fn codec_pull() { - run!(zenoh::Pull, zenoh::Pull::rand()); -} diff --git a/commons/zenoh-collections/src/ring_buffer.rs b/commons/zenoh-collections/src/ring_buffer.rs index fd60030ebc..e9f7909d5f 100644 --- a/commons/zenoh-collections/src/ring_buffer.rs +++ b/commons/zenoh-collections/src/ring_buffer.rs @@ -40,6 +40,15 @@ impl RingBuffer { Some(elem) } + #[inline] + pub fn push_force(&mut self, elem: T) -> Option { + self.push(elem).and_then(|elem| { + let ret = self.buffer.pop_front(); + self.buffer.push_back(elem); + ret + }) + } + #[inline] pub fn pull(&mut self) -> Option { let x = self.buffer.pop_front(); diff --git a/commons/zenoh-protocol/src/network/declare.rs b/commons/zenoh-protocol/src/network/declare.rs index 2dd8de4ef8..187fa87662 100644 --- a/commons/zenoh-protocol/src/network/declare.rs +++ b/commons/zenoh-protocol/src/network/declare.rs @@ -146,31 +146,6 @@ impl Declare { } } -#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] -#[repr(u8)] -pub enum Mode { - #[default] - Push, - Pull, -} - -impl Mode { - pub const DEFAULT: Self = Self::Push; - - #[cfg(feature = "test")] - fn rand() -> Self { - use rand::Rng; - - let mut rng = rand::thread_rng(); - - if rng.gen_bool(0.5) { - Mode::Push - } else { - Mode::Pull - } - } -} - pub mod common { use super::*; @@ -320,9 +295,7 @@ pub mod subscriber { /// ~ [decl_exts] ~ if Z==1 /// +---------------+ /// - /// - if R==1 then the subscription is reliable, else it is best effort - /// - if P==1 then the subscription is pull, else it is push - /// + /// - if R==1 then the subscription is reliable, else it is best effort /// /// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct DeclareSubscriber { @@ -343,34 +316,29 @@ pub mod subscriber { /// +-+-+-+-+-+-+-+-+ /// |Z|0_1| ID | /// +-+-+-+---------+ - /// % reserved |P|R% + /// % reserved |R% /// +---------------+ /// /// - if R==1 then the subscription is reliable, else it is best effort - /// - if P==1 then the subscription is pull, else it is push /// - rsv: Reserved /// ``` #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct SubscriberInfo { pub reliability: Reliability, - pub mode: Mode, } impl SubscriberInfo { pub const R: u64 = 1; - pub const P: u64 = 1 << 1; pub const DEFAULT: Self = Self { reliability: Reliability::DEFAULT, - mode: Mode::DEFAULT, }; #[cfg(feature = "test")] pub fn rand() -> Self { let reliability = Reliability::rand(); - let mode = Mode::rand(); - Self { reliability, mode } + Self { reliability } } } @@ -387,12 +355,7 @@ pub mod subscriber { } else { Reliability::BestEffort }; - let mode = if imsg::has_option(ext.value, SubscriberInfo::P) { - Mode::Pull - } else { - Mode::Push - }; - Self { reliability, mode } + Self { reliability } } } @@ -402,9 +365,6 @@ pub mod subscriber { if ext.reliability == Reliability::Reliable { v |= SubscriberInfo::R; } - if ext.mode == Mode::Pull { - v |= SubscriberInfo::P; - } Info::new(v) } } diff --git a/commons/zenoh-protocol/src/zenoh/mod.rs b/commons/zenoh-protocol/src/zenoh/mod.rs index 3e5d573c43..7bca48f3ba 100644 --- a/commons/zenoh-protocol/src/zenoh/mod.rs +++ b/commons/zenoh-protocol/src/zenoh/mod.rs @@ -13,7 +13,6 @@ // pub mod del; pub mod err; -pub mod pull; pub mod put; pub mod query; pub mod reply; @@ -21,7 +20,6 @@ pub mod reply; use crate::core::Encoding; pub use del::Del; pub use err::Err; -pub use pull::Pull; pub use put::Put; pub use query::{Consolidation, Query}; pub use reply::Reply; @@ -33,7 +31,6 @@ pub mod id { pub const QUERY: u8 = 0x03; pub const REPLY: u8 = 0x04; pub const ERR: u8 = 0x05; - pub const PULL: u8 = 0x06; } // DataInfo @@ -80,9 +77,6 @@ impl From for PushBody { #[derive(Debug, Clone, PartialEq, Eq)] pub enum RequestBody { Query(Query), - Put(Put), - Del(Del), - Pull(Pull), } impl RequestBody { @@ -92,11 +86,8 @@ impl RequestBody { let mut rng = rand::thread_rng(); - match rng.gen_range(0..4) { + match rng.gen_range(0..1) { 0 => RequestBody::Query(Query::rand()), - 1 => RequestBody::Put(Put::rand()), - 2 => RequestBody::Del(Del::rand()), - 3 => RequestBody::Pull(Pull::rand()), _ => unreachable!(), } } @@ -108,24 +99,11 @@ impl From for RequestBody { } } -impl From for RequestBody { - fn from(p: Put) -> RequestBody { - RequestBody::Put(p) - } -} - -impl From for RequestBody { - fn from(d: Del) -> RequestBody { - RequestBody::Del(d) - } -} - // Response #[derive(Debug, Clone, PartialEq, Eq)] pub enum ResponseBody { Reply(Reply), Err(Err), - Put(Put), } impl ResponseBody { @@ -134,10 +112,9 @@ impl ResponseBody { use rand::Rng; let mut rng = rand::thread_rng(); - match rng.gen_range(0..3) { + match rng.gen_range(0..2) { 0 => ResponseBody::Reply(Reply::rand()), 1 => ResponseBody::Err(Err::rand()), - 2 => ResponseBody::Put(Put::rand()), _ => unreachable!(), } } diff --git a/commons/zenoh-protocol/src/zenoh/pull.rs b/commons/zenoh-protocol/src/zenoh/pull.rs deleted file mode 100644 index eb4f7eb55e..0000000000 --- a/commons/zenoh-protocol/src/zenoh/pull.rs +++ /dev/null @@ -1,56 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use crate::common::ZExtUnknown; -use alloc::vec::Vec; - -/// # Pull message -/// -/// ```text -/// Flags: -/// - X: Reserved -/// - X: Reserved -/// - Z: Extension If Z==1 then at least one extension is present -/// -/// 7 6 5 4 3 2 1 0 -/// +-+-+-+-+-+-+-+-+ -/// |Z|X|X| PULL | -/// +-+-+-+---------+ -/// ~ [pull_exts] ~ if Z==1 -/// +---------------+ -/// ``` -pub mod flag { - // pub const X: u8 = 1 << 5; // 0x20 Reserved - // pub const X: u8 = 1 << 6; // 0x40 Reserved - pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Pull { - pub ext_unknown: Vec, -} - -impl Pull { - #[cfg(feature = "test")] - pub fn rand() -> Self { - use rand::Rng; - let mut rng = rand::thread_rng(); - - let mut ext_unknown = Vec::new(); - for _ in 0..rng.gen_range(0..4) { - ext_unknown.push(ZExtUnknown::rand2(1, false)); - } - - Self { ext_unknown } - } -} diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 190894fb18..b827ed2e7f 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -50,6 +50,7 @@ git-version = { workspace = true } json5 = { workspace = true } log = { workspace = true } zenoh = { workspace = true } +zenoh-collections = { workspace = true } zenoh-ext = { workspace = true } [dev-dependencies] diff --git a/examples/examples/z_pull.rs b/examples/examples/z_pull.rs index 910d7614cf..d2c9a5380b 100644 --- a/examples/examples/z_pull.rs +++ b/examples/examples/z_pull.rs @@ -13,9 +13,12 @@ // use async_std::task::sleep; use clap::Parser; -use std::time::Duration; -use zenoh::config::Config; -use zenoh::prelude::r#async::*; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; +use zenoh::{config::Config, prelude::r#async::*}; +use zenoh_collections::RingBuffer; use zenoh_examples::CommonArgs; #[async_std::main] @@ -23,50 +26,67 @@ async fn main() { // initiate logging env_logger::init(); - let (config, key_expr) = parse_args(); + let (config, key_expr, cache, interval) = parse_args(); println!("Opening session..."); let session = zenoh::open(config).res().await.unwrap(); - println!("Declaring Subscriber on '{key_expr}'..."); + println!("Creating a local queue keeping the last {cache} elements..."); + let arb = Arc::new(Mutex::new(RingBuffer::new(cache))); + let arb_c = arb.clone(); - let subscriber = session + println!("Declaring Subscriber on '{key_expr}'..."); + let _subscriber = session .declare_subscriber(&key_expr) - .pull_mode() - .callback(|sample| { - let payload = sample - .payload() - .deserialize::() - .unwrap_or_else(|e| format!("{}", e)); - println!( - ">> [Subscriber] Received {} ('{}': '{}')", - sample.kind(), - sample.key_expr().as_str(), - payload, - ); + .callback(move |sample| { + arb_c.lock().unwrap().push_force(sample); }) .res() .await .unwrap(); - println!("Press CTRL-C to quit..."); - for idx in 0..u32::MAX { - sleep(Duration::from_secs(1)).await; - println!("[{idx:4}] Pulling..."); - subscriber.pull().res().await.unwrap(); + println!("Pulling data every {:#?} seconds", interval); + loop { + let mut res = arb.lock().unwrap().pull(); + print!(">> [Subscriber] Pulling "); + match res.take() { + Some(sample) => { + let payload = sample + .payload() + .deserialize::() + .unwrap_or_else(|e| format!("{}", e)); + println!( + "{} ('{}': '{}')", + sample.kind(), + sample.key_expr().as_str(), + payload, + ); + } + None => { + println!("nothing... sleep for {:#?}", interval); + sleep(interval).await; + } + } } } -#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +#[derive(clap::Parser, Clone, PartialEq, Debug)] struct SubArgs { #[arg(short, long, default_value = "demo/example/**")] /// The Key Expression to subscribe to. key: KeyExpr<'static>, + /// The size of the cache. + #[arg(long, default_value = "3")] + cache: usize, + /// The interval for pulling the cache. + #[arg(long, default_value = "5.0")] + interval: f32, #[command(flatten)] common: CommonArgs, } -fn parse_args() -> (Config, KeyExpr<'static>) { +fn parse_args() -> (Config, KeyExpr<'static>, usize, Duration) { let args = SubArgs::parse(); - (args.common.into(), args.key) + let interval = Duration::from_secs_f32(args.interval); + (args.common.into(), args.key, args.cache, interval) } diff --git a/io/zenoh-transport/src/shm.rs b/io/zenoh-transport/src/shm.rs index 31910f51ae..0dd6662286 100644 --- a/io/zenoh-transport/src/shm.rs +++ b/io/zenoh-transport/src/shm.rs @@ -140,12 +140,9 @@ pub fn map_zmsg_to_shminfo(msg: &mut NetworkMessage) -> ZResult { }, NetworkBody::Request(Request { payload, .. }) => match payload { RequestBody::Query(b) => b.map_to_shminfo(), - RequestBody::Put(b) => b.map_to_shminfo(), - RequestBody::Del(_) | RequestBody::Pull(_) => Ok(false), }, NetworkBody::Response(Response { payload, .. }) => match payload { ResponseBody::Reply(b) => b.map_to_shminfo(), - ResponseBody::Put(b) => b.map_to_shminfo(), ResponseBody::Err(b) => b.map_to_shminfo(), }, NetworkBody::ResponseFinal(_) | NetworkBody::Declare(_) | NetworkBody::OAM(_) => Ok(false), @@ -194,13 +191,10 @@ pub fn map_zmsg_to_shmbuf( }, NetworkBody::Request(Request { payload, .. }) => match payload { RequestBody::Query(b) => b.map_to_shmbuf(shmr), - RequestBody::Put(b) => b.map_to_shmbuf(shmr), - RequestBody::Del(_) | RequestBody::Pull(_) => Ok(false), }, NetworkBody::Response(Response { payload, .. }) => match payload { - ResponseBody::Put(b) => b.map_to_shmbuf(shmr), - ResponseBody::Err(b) => b.map_to_shmbuf(shmr), ResponseBody::Reply(b) => b.map_to_shmbuf(shmr), + ResponseBody::Err(b) => b.map_to_shmbuf(shmr), }, NetworkBody::ResponseFinal(_) | NetworkBody::Declare(_) | NetworkBody::OAM(_) => Ok(false), } diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index 89d3b5f691..6ac796efb1 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -21,7 +21,7 @@ use zenoh::{ liveliness::LivelinessSubscriberBuilder, prelude::Sample, query::{QueryConsolidation, QueryTarget}, - subscriber::{PushMode, Reliability, Subscriber, SubscriberBuilder}, + subscriber::{Reliability, Subscriber, SubscriberBuilder}, }; use crate::ExtractSample; @@ -122,9 +122,7 @@ pub trait SubscriberBuilderExt<'a, 'b, Handler> { fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler>; } -impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> - for SubscriberBuilder<'a, 'b, PushMode, Handler> -{ +impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilder<'a, 'b, Handler> { type KeySpace = crate::UserSpace; /// Create a [`FetchingSubscriber`](super::FetchingSubscriber). diff --git a/zenoh/src/liveliness.rs b/zenoh/src/liveliness.rs index 4103504f13..425aa62592 100644 --- a/zenoh/src/liveliness.rs +++ b/zenoh/src/liveliness.rs @@ -398,7 +398,6 @@ impl Drop for LivelinessToken<'_> { /// let subscriber = session /// .declare_subscriber("key/expression") /// .best_effort() -/// .pull_mode() /// .res() /// .await /// .unwrap(); diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 79c9da9127..cb565053c9 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -244,12 +244,6 @@ impl Primitives for Face { msg.ext_nodeid.node_id, ); } - RequestBody::Pull(_) => { - pull_data(&self.tables.tables, &self.state.clone(), msg.wire_expr); - } - _ => { - log::error!("{} Unsupported request!", self); - } } } diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index c0d1bb4a34..89c6c40206 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -12,17 +12,15 @@ // ZettaScale Zenoh Team, // use super::face::FaceState; -use super::resource::{DataRoutes, Direction, PullCaches, Resource}; +use super::resource::{DataRoutes, Direction, Resource}; use super::tables::{NodeId, Route, RoutingExpr, Tables, TablesLock}; use crate::net::routing::hat::HatTrait; -use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; -use std::sync::RwLock; use zenoh_core::zread; -use zenoh_protocol::core::key_expr::{keyexpr, OwnedKeyExpr}; +use zenoh_protocol::core::key_expr::keyexpr; use zenoh_protocol::network::declare::subscriber::ext::SubscriberInfo; -use zenoh_protocol::network::declare::{Mode, SubscriberId}; +use zenoh_protocol::network::declare::SubscriberId; use zenoh_protocol::{ core::{WhatAmI, WireExpr}, network::{declare::ext, Push}, @@ -83,13 +81,10 @@ pub(crate) fn declare_subscription( drop(rtables); let wtables = zwrite!(tables.tables); - for (mut res, data_routes, matching_pulls) in matches_data_routes { + for (mut res, data_routes) in matches_data_routes { get_mut_unchecked(&mut res) .context_mut() .update_data_routes(data_routes); - get_mut_unchecked(&mut res) - .context_mut() - .update_matching_pulls(matching_pulls); } drop(wtables); } @@ -148,13 +143,10 @@ pub(crate) fn undeclare_subscription( drop(rtables); let wtables = zwrite!(tables.tables); - for (mut res, data_routes, matching_pulls) in matches_data_routes { + for (mut res, data_routes) in matches_data_routes { get_mut_unchecked(&mut res) .context_mut() .update_data_routes(data_routes); - get_mut_unchecked(&mut res) - .context_mut() - .update_matching_pulls(matching_pulls); } Resource::clean(&mut res); drop(wtables); @@ -223,7 +215,6 @@ pub(crate) fn update_data_routes(tables: &Tables, res: &mut Arc) { pub(crate) fn update_data_routes_from(tables: &mut Tables, res: &mut Arc) { update_data_routes(tables, res); - update_matching_pulls(tables, res); let res = get_mut_unchecked(res); for child in res.childs.values_mut() { update_data_routes_from(tables, child); @@ -233,22 +224,17 @@ pub(crate) fn update_data_routes_from(tables: &mut Tables, res: &mut Arc( tables: &'a Tables, res: &'a Arc, -) -> Vec<(Arc, DataRoutes, Arc)> { +) -> Vec<(Arc, DataRoutes)> { let mut routes = vec![]; if res.context.is_some() { let mut expr = RoutingExpr::new(res, ""); - routes.push(( - res.clone(), - compute_data_routes(tables, &mut expr), - compute_matching_pulls(tables, &mut expr), - )); + routes.push((res.clone(), compute_data_routes(tables, &mut expr))); for match_ in &res.context().matches { let match_ = match_.upgrade().unwrap(); if !Arc::ptr_eq(&match_, res) { let mut expr = RoutingExpr::new(&match_, ""); let match_routes = compute_data_routes(tables, &mut expr); - let matching_pulls = compute_matching_pulls(tables, &mut expr); - routes.push((match_, match_routes, matching_pulls)); + routes.push((match_, match_routes)); } } } @@ -258,12 +244,10 @@ pub(crate) fn compute_matches_data_routes<'a>( pub(crate) fn update_matches_data_routes<'a>(tables: &'a mut Tables, res: &'a mut Arc) { if res.context.is_some() { update_data_routes(tables, res); - update_matching_pulls(tables, res); for match_ in &res.context().matches { let mut match_ = match_.upgrade().unwrap(); if !Arc::ptr_eq(&match_, res) { update_data_routes(tables, &mut match_); - update_matching_pulls(tables, &mut match_); } } } @@ -278,9 +262,6 @@ pub(crate) fn disable_matches_data_routes(_tables: &mut Tables, res: &mut Arc Arc { - let mut pull_caches = PullCaches::default(); - compute_matching_pulls_(tables, &mut pull_caches, expr); - Arc::new(pull_caches) -} - -pub(crate) fn update_matching_pulls(tables: &Tables, res: &mut Arc) { - if res.context.is_some() { - let mut res_mut = res.clone(); - let res_mut = get_mut_unchecked(&mut res_mut); - if res_mut.context_mut().matching_pulls.is_none() { - res_mut.context_mut().matching_pulls = Some(Arc::new(PullCaches::default())); - } - compute_matching_pulls_( - tables, - get_mut_unchecked(res_mut.context_mut().matching_pulls.as_mut().unwrap()), - &mut RoutingExpr::new(res, ""), - ); - } -} - -#[inline] -fn get_matching_pulls( - tables: &Tables, - res: &Option>, - expr: &mut RoutingExpr, -) -> Arc { - res.as_ref() - .and_then(|res| res.context.as_ref()) - .and_then(|ctx| ctx.matching_pulls.clone()) - .unwrap_or_else(|| compute_matching_pulls(tables, expr)) -} - -macro_rules! cache_data { - ( - $matching_pulls:expr, - $expr:expr, - $payload:expr - ) => { - for context in $matching_pulls.iter() { - get_mut_unchecked(&mut context.clone()) - .last_values - .insert($expr.full_expr().to_string(), $payload.clone()); - } - }; -} - #[cfg(feature = "stats")] macro_rules! inc_stats { ( @@ -497,12 +406,10 @@ pub fn full_reentrant_route_data( let route = get_data_route(&tables, face, &res, &mut expr, routing_context); - let matching_pulls = get_matching_pulls(&tables, &res, &mut expr); - - if !(route.is_empty() && matching_pulls.is_empty()) { + if !route.is_empty() { treat_timestamp!(&tables.hlc, payload, tables.drop_future_timestamp); - if route.len() == 1 && matching_pulls.len() == 0 { + if route.len() == 1 { let (outface, key_expr, context) = route.values().next().unwrap(); if tables .hat_code @@ -524,26 +431,43 @@ pub fn full_reentrant_route_data( payload, }) } - } else { - if !matching_pulls.is_empty() { - let lock = zlock!(tables.pull_caches_lock); - cache_data!(matching_pulls, expr, payload); - drop(lock); - } + } else if tables.whatami == WhatAmI::Router { + let route = route + .values() + .filter(|(outface, _key_expr, _context)| { + tables + .hat_code + .egress_filter(&tables, face, outface, &mut expr) + }) + .cloned() + .collect::>(); - if tables.whatami == WhatAmI::Router { - let route = route - .values() - .filter(|(outface, _key_expr, _context)| { - tables - .hat_code - .egress_filter(&tables, face, outface, &mut expr) - }) - .cloned() - .collect::>(); + drop(tables); + for (outface, key_expr, context) in route { + #[cfg(feature = "stats")] + if !admin { + inc_stats!(face, tx, user, payload) + } else { + inc_stats!(face, tx, admin, payload) + } - drop(tables); - for (outface, key_expr, context) in route { + outface.primitives.send_push(Push { + wire_expr: key_expr, + ext_qos, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: context }, + payload: payload.clone(), + }) + } + } else { + drop(tables); + for (outface, key_expr, context) in route.values() { + if face.id != outface.id + && match (face.mcast_group.as_ref(), outface.mcast_group.as_ref()) { + (Some(l), Some(r)) => l != r, + _ => true, + } + { #[cfg(feature = "stats")] if !admin { inc_stats!(face, tx, user, payload) @@ -552,41 +476,13 @@ pub fn full_reentrant_route_data( } outface.primitives.send_push(Push { - wire_expr: key_expr, + wire_expr: key_expr.into(), ext_qos, ext_tstamp: None, - ext_nodeid: ext::NodeIdType { node_id: context }, + ext_nodeid: ext::NodeIdType { node_id: *context }, payload: payload.clone(), }) } - } else { - drop(tables); - for (outface, key_expr, context) in route.values() { - if face.id != outface.id - && match ( - face.mcast_group.as_ref(), - outface.mcast_group.as_ref(), - ) { - (Some(l), Some(r)) => l != r, - _ => true, - } - { - #[cfg(feature = "stats")] - if !admin { - inc_stats!(face, tx, user, payload) - } else { - inc_stats!(face, tx, admin, payload) - } - - outface.primitives.send_push(Push { - wire_expr: key_expr.into(), - ext_qos, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType { node_id: *context }, - payload: payload.clone(), - }) - } - } } } } @@ -597,68 +493,3 @@ pub fn full_reentrant_route_data( } } } - -pub fn pull_data(tables_ref: &RwLock, face: &Arc, expr: WireExpr) { - let tables = zread!(tables_ref); - match tables.get_mapping(face, &expr.scope, expr.mapping) { - Some(prefix) => match Resource::get_resource(prefix, expr.suffix.as_ref()) { - Some(mut res) => { - let res = get_mut_unchecked(&mut res); - match res.session_ctxs.get_mut(&face.id) { - Some(ctx) => match &ctx.subs { - Some(_subinfo) => { - // let reliability = subinfo.reliability; - let lock = zlock!(tables.pull_caches_lock); - let route = get_mut_unchecked(ctx) - .last_values - .drain() - .map(|(name, sample)| { - ( - Resource::get_best_key(&tables.root_res, &name, face.id) - .to_owned(), - sample, - ) - }) - .collect::>(); - drop(lock); - drop(tables); - for (key_expr, payload) in route { - face.primitives.send_push(Push { - wire_expr: key_expr, - ext_qos: ext::QoSType::PUSH, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - payload, - }); - } - } - None => { - log::error!( - "{} Pull data for unknown subscriber {} (no info)!", - face, - prefix.expr() + expr.suffix.as_ref() - ); - } - }, - None => { - log::error!( - "{} Pull data for unknown subscriber {} (no context)!", - face, - prefix.expr() + expr.suffix.as_ref() - ); - } - } - } - None => { - log::error!( - "{} Pull data for unknown subscriber {} (no resource)!", - face, - prefix.expr() + expr.suffix.as_ref() - ); - } - }, - None => { - log::error!("{} Pull data with unknown scope {}!", face, expr.scope); - } - }; -} diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 721a98b8c2..04262e555d 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -460,20 +460,12 @@ macro_rules! inc_req_stats { if let Some(stats) = $face.stats.as_ref() { use zenoh_buffers::buffer::Buffer; match &$body { - RequestBody::Put(p) => { - stats.[<$txrx _z_put_msgs>].[](1); - stats.[<$txrx _z_put_pl_bytes>].[](p.payload.len()); - } - RequestBody::Del(_) => { - stats.[<$txrx _z_del_msgs>].[](1); - } RequestBody::Query(q) => { stats.[<$txrx _z_query_msgs>].[](1); stats.[<$txrx _z_query_pl_bytes>].[]( q.ext_body.as_ref().map(|b| b.payload.len()).unwrap_or(0), ); } - RequestBody::Pull(_) => (), } } } @@ -492,14 +484,6 @@ macro_rules! inc_res_stats { if let Some(stats) = $face.stats.as_ref() { use zenoh_buffers::buffer::Buffer; match &$body { - ResponseBody::Put(p) => { - stats.[<$txrx _z_put_msgs>].[](1); - let mut n = p.payload.len(); - if let Some(a) = p.ext_attachment.as_ref() { - n += a.buffer.len(); - } - stats.[<$txrx _z_put_pl_bytes>].[](n); - } ResponseBody::Reply(r) => { stats.[<$txrx _z_reply_msgs>].[](1); let mut n = 0; diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 9f43841025..3e35db14b6 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -24,7 +24,6 @@ use zenoh_config::WhatAmI; #[cfg(feature = "complete_n")] use zenoh_protocol::network::request::ext::TargetType; use zenoh_protocol::network::RequestId; -use zenoh_protocol::zenoh::PushBody; use zenoh_protocol::{ core::{key_expr::keyexpr, ExprId, WireExpr}, network::{ @@ -51,7 +50,6 @@ pub(crate) struct QueryTargetQabl { pub(crate) distance: f64, } pub(crate) type QueryTargetQablSet = Vec; -pub(crate) type PullCaches = Vec>; pub(crate) struct SessionContext { pub(crate) face: Arc, @@ -59,7 +57,6 @@ pub(crate) struct SessionContext { pub(crate) remote_expr_id: Option, pub(crate) subs: Option, pub(crate) qabl: Option, - pub(crate) last_values: HashMap, pub(crate) in_interceptor_cache: Option>, pub(crate) e_interceptor_cache: Option>, } @@ -121,7 +118,6 @@ impl QueryRoutes { pub(crate) struct ResourceContext { pub(crate) matches: Vec>, - pub(crate) matching_pulls: Option>, pub(crate) hat: Box, pub(crate) valid_data_routes: bool, pub(crate) data_routes: DataRoutes, @@ -133,7 +129,6 @@ impl ResourceContext { fn new(hat: Box) -> ResourceContext { ResourceContext { matches: Vec::new(), - matching_pulls: None, hat, valid_data_routes: false, data_routes: DataRoutes::default(), @@ -159,14 +154,6 @@ impl ResourceContext { pub(crate) fn disable_query_routes(&mut self) { self.valid_query_routes = false; } - - pub(crate) fn update_matching_pulls(&mut self, pulls: Arc) { - self.matching_pulls = Some(pulls); - } - - pub(crate) fn disable_matching_pulls(&mut self) { - self.matching_pulls = None; - } } pub struct Resource { @@ -445,7 +432,6 @@ impl Resource { remote_expr_id: None, subs: None, qabl: None, - last_values: HashMap::new(), in_interceptor_cache: None, e_interceptor_cache: None, }) @@ -708,7 +694,6 @@ pub fn register_expr( remote_expr_id: Some(expr_id), subs: None, qabl: None, - last_values: HashMap::new(), in_interceptor_cache: None, e_interceptor_cache: None, }) diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index e239a316a1..4f2fc2ee83 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -71,7 +71,6 @@ pub struct Tables { pub(crate) mcast_groups: Vec>, pub(crate) mcast_faces: Vec>, pub(crate) interceptors: Vec, - pub(crate) pull_caches_lock: Mutex<()>, pub(crate) hat: Box, pub(crate) hat_code: Arc, // @TODO make this a Box } @@ -103,7 +102,6 @@ impl Tables { mcast_groups: vec![], mcast_faces: vec![], interceptors: interceptor_factories(config)?, - pull_caches_lock: Mutex::new(()), hat: hat_code.new_tables(router_peers_failover_brokering), hat_code: hat_code.into(), }) diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index 05210bcaee..a9908f5f58 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -20,9 +20,7 @@ use crate::{ net::routing::{ dispatcher::face::Face, - router::{ - compute_data_routes, compute_matching_pulls, compute_query_routes, RoutesIndexes, - }, + router::{compute_data_routes, compute_query_routes, RoutesIndexes}, }, runtime::Runtime, }; @@ -192,11 +190,7 @@ impl HatBaseTrait for HatCode { let rtables = zread!(tables.tables); for _match in subs_matches.drain(..) { let mut expr = RoutingExpr::new(&_match, ""); - matches_data_routes.push(( - _match.clone(), - compute_data_routes(&rtables, &mut expr), - compute_matching_pulls(&rtables, &mut expr), - )); + matches_data_routes.push((_match.clone(), compute_data_routes(&rtables, &mut expr))); } for _match in qabls_matches.drain(..) { matches_query_routes.push((_match.clone(), compute_query_routes(&rtables, &_match))); @@ -204,13 +198,10 @@ impl HatBaseTrait for HatCode { drop(rtables); let mut wtables = zwrite!(tables.tables); - for (mut res, data_routes, matching_pulls) in matches_data_routes { + for (mut res, data_routes) in matches_data_routes { get_mut_unchecked(&mut res) .context_mut() .update_data_routes(data_routes); - get_mut_unchecked(&mut res) - .context_mut() - .update_matching_pulls(matching_pulls); Resource::clean(&mut res); } for (mut res, query_routes) in matches_query_routes { diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index f9f827ecc5..290f90f95f 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -30,7 +30,7 @@ use zenoh_protocol::{ core::{Reliability, WhatAmI}, network::declare::{ common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, Mode, UndeclareSubscriber, + DeclareSubscriber, UndeclareSubscriber, }, }; use zenoh_sync::get_mut_unchecked; @@ -94,16 +94,11 @@ fn register_client_subscription( { let res = get_mut_unchecked(res); match res.session_ctxs.get_mut(&face.id) { - Some(ctx) => match &ctx.subs { - Some(info) => { - if Mode::Pull == info.mode { - get_mut_unchecked(ctx).subs = Some(*sub_info); - } - } - None => { + Some(ctx) => { + if ctx.subs.is_none() { get_mut_unchecked(ctx).subs = Some(*sub_info); } - }, + } None => { res.session_ctxs.insert( face.id, @@ -113,7 +108,6 @@ fn register_client_subscription( remote_expr_id: None, subs: Some(*sub_info), qabl: None, - last_values: HashMap::new(), in_interceptor_cache: None, e_interceptor_cache: None, }), @@ -132,10 +126,8 @@ fn declare_client_subscription( sub_info: &SubscriberInfo, ) { register_client_subscription(tables, face, id, res, sub_info); - let mut propa_sub_info = *sub_info; - propa_sub_info.mode = Mode::Push; - propagate_simple_subscription(tables, res, &propa_sub_info, face); + propagate_simple_subscription(tables, res, sub_info, face); // This introduced a buffer overflow on windows // @TODO: Let's deactivate this on windows until Fixed #[cfg(not(windows))] @@ -243,7 +235,6 @@ fn forget_client_subscription( pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - mode: Mode::Push, }; for src_face in tables .faces @@ -327,20 +318,19 @@ impl HatPubSubTrait for HatCode { let mres = mres.upgrade().unwrap(); for (sid, context) in &mres.session_ctxs { - if let Some(subinfo) = &context.subs { - if match tables.whatami { + if context.subs.is_some() + && match tables.whatami { WhatAmI::Router => context.face.whatami != WhatAmI::Router, _ => { source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client } - } && subinfo.mode == Mode::Push - { - route.entry(*sid).or_insert_with(|| { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); - (context.face.clone(), key_expr.to_owned(), NodeId::default()) - }); } + { + route.entry(*sid).or_insert_with(|| { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); + (context.face.clone(), key_expr.to_owned(), NodeId::default()) + }); } } } diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 4964a8880a..81e5ba52d9 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -22,7 +22,7 @@ use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use ordered_float::OrderedFloat; use std::borrow::Cow; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::sync::atomic::Ordering; use std::sync::Arc; use zenoh_buffers::ZBuf; @@ -133,7 +133,6 @@ fn register_client_queryable( remote_expr_id: None, subs: None, qabl: None, - last_values: HashMap::new(), in_interceptor_cache: None, e_interceptor_cache: None, }) diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index 5591ea3b3e..3c4e2091f0 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -36,9 +36,7 @@ use crate::{ routing::{ dispatcher::face::Face, hat::TREES_COMPUTATION_DELAY_MS, - router::{ - compute_data_routes, compute_matching_pulls, compute_query_routes, RoutesIndexes, - }, + router::{compute_data_routes, compute_query_routes, RoutesIndexes}, }, }, runtime::Runtime, @@ -311,11 +309,7 @@ impl HatBaseTrait for HatCode { let rtables = zread!(tables.tables); for _match in subs_matches.drain(..) { let mut expr = RoutingExpr::new(&_match, ""); - matches_data_routes.push(( - _match.clone(), - compute_data_routes(&rtables, &mut expr), - compute_matching_pulls(&rtables, &mut expr), - )); + matches_data_routes.push((_match.clone(), compute_data_routes(&rtables, &mut expr))); } for _match in qabls_matches.drain(..) { matches_query_routes.push((_match.clone(), compute_query_routes(&rtables, &_match))); @@ -323,13 +317,10 @@ impl HatBaseTrait for HatCode { drop(rtables); let mut wtables = zwrite!(tables.tables); - for (mut res, data_routes, matching_pulls) in matches_data_routes { + for (mut res, data_routes) in matches_data_routes { get_mut_unchecked(&mut res) .context_mut() .update_data_routes(data_routes); - get_mut_unchecked(&mut res) - .context_mut() - .update_matching_pulls(matching_pulls); Resource::clean(&mut res); } for (mut res, query_routes) in matches_query_routes { diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index 9a41915333..dddb6ae366 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -33,7 +33,7 @@ use zenoh_protocol::{ core::{Reliability, WhatAmI, ZenohId}, network::declare::{ common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, Mode, UndeclareSubscriber, + DeclareSubscriber, UndeclareSubscriber, }, }; use zenoh_sync::get_mut_unchecked; @@ -207,16 +207,11 @@ fn register_client_subscription( { let res = get_mut_unchecked(res); match res.session_ctxs.get_mut(&face.id) { - Some(ctx) => match &ctx.subs { - Some(info) => { - if Mode::Pull == info.mode { - get_mut_unchecked(ctx).subs = Some(*sub_info); - } - } - None => { + Some(ctx) => { + if ctx.subs.is_none() { get_mut_unchecked(ctx).subs = Some(*sub_info); } - }, + } None => { res.session_ctxs.insert( face.id, @@ -226,7 +221,6 @@ fn register_client_subscription( remote_expr_id: None, subs: Some(*sub_info), qabl: None, - last_values: HashMap::new(), in_interceptor_cache: None, e_interceptor_cache: None, }), @@ -245,10 +239,8 @@ fn declare_client_subscription( sub_info: &SubscriberInfo, ) { register_client_subscription(tables, face, id, res, sub_info); - let mut propa_sub_info = *sub_info; - propa_sub_info.mode = Mode::Push; let zid = tables.zid; - register_peer_subscription(tables, face, res, &propa_sub_info, zid); + register_peer_subscription(tables, face, res, sub_info, zid); } #[inline] @@ -454,7 +446,6 @@ fn forget_client_subscription( pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - mode: Mode::Push, }; if face.whatami == WhatAmI::Client { @@ -511,7 +502,6 @@ pub(super) fn pubsub_tree_change(tables: &mut Tables, new_childs: &[Vec context.face.whatami != WhatAmI::Router, _ => { source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client } - } && subinfo.mode == Mode::Push - { - route.entry(*sid).or_insert_with(|| { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); - (context.face.clone(), key_expr.to_owned(), NodeId::default()) - }); } + { + route.entry(*sid).or_insert_with(|| { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); + (context.face.clone(), key_expr.to_owned(), NodeId::default()) + }); } } } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 51aac2175a..fa553e5121 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -285,7 +285,6 @@ fn register_client_queryable( remote_expr_id: None, subs: None, qabl: None, - last_values: HashMap::new(), in_interceptor_cache: None, e_interceptor_cache: None, }) diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 1a6c1ba407..59b39d4284 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -23,9 +23,7 @@ use crate::{ protocol::linkstate::LinkStateList, routing::{ dispatcher::face::Face, - router::{ - compute_data_routes, compute_matching_pulls, compute_query_routes, RoutesIndexes, - }, + router::{compute_data_routes, compute_query_routes, RoutesIndexes}, }, }, runtime::Runtime, @@ -241,11 +239,7 @@ impl HatBaseTrait for HatCode { let rtables = zread!(tables.tables); for _match in subs_matches.drain(..) { let mut expr = RoutingExpr::new(&_match, ""); - matches_data_routes.push(( - _match.clone(), - compute_data_routes(&rtables, &mut expr), - compute_matching_pulls(&rtables, &mut expr), - )); + matches_data_routes.push((_match.clone(), compute_data_routes(&rtables, &mut expr))); } for _match in qabls_matches.drain(..) { matches_query_routes.push((_match.clone(), compute_query_routes(&rtables, &_match))); @@ -253,13 +247,10 @@ impl HatBaseTrait for HatCode { drop(rtables); let mut wtables = zwrite!(tables.tables); - for (mut res, data_routes, matching_pulls) in matches_data_routes { + for (mut res, data_routes) in matches_data_routes { get_mut_unchecked(&mut res) .context_mut() .update_data_routes(data_routes); - get_mut_unchecked(&mut res) - .context_mut() - .update_matching_pulls(matching_pulls); Resource::clean(&mut res); } for (mut res, query_routes) in matches_query_routes { diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 4f6ce5aeca..a722176292 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -30,7 +30,7 @@ use zenoh_protocol::{ core::{Reliability, WhatAmI}, network::declare::{ common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, Mode, UndeclareSubscriber, + DeclareSubscriber, UndeclareSubscriber, }, }; use zenoh_sync::get_mut_unchecked; @@ -94,16 +94,11 @@ fn register_client_subscription( { let res = get_mut_unchecked(res); match res.session_ctxs.get_mut(&face.id) { - Some(ctx) => match &ctx.subs { - Some(info) => { - if Mode::Pull == info.mode { - get_mut_unchecked(ctx).subs = Some(*sub_info); - } - } - None => { + Some(ctx) => { + if ctx.subs.is_none() { get_mut_unchecked(ctx).subs = Some(*sub_info); } - }, + } None => { res.session_ctxs.insert( face.id, @@ -113,7 +108,6 @@ fn register_client_subscription( remote_expr_id: None, subs: Some(*sub_info), qabl: None, - last_values: HashMap::new(), in_interceptor_cache: None, e_interceptor_cache: None, }), @@ -132,10 +126,8 @@ fn declare_client_subscription( sub_info: &SubscriberInfo, ) { register_client_subscription(tables, face, id, res, sub_info); - let mut propa_sub_info = *sub_info; - propa_sub_info.mode = Mode::Push; - propagate_simple_subscription(tables, res, &propa_sub_info, face); + propagate_simple_subscription(tables, res, sub_info, face); // This introduced a buffer overflow on windows // TODO: Let's deactivate this on windows until Fixed #[cfg(not(windows))] @@ -243,7 +235,6 @@ fn forget_client_subscription( pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - mode: Mode::Push, }; for src_face in tables .faces @@ -327,20 +318,19 @@ impl HatPubSubTrait for HatCode { let mres = mres.upgrade().unwrap(); for (sid, context) in &mres.session_ctxs { - if let Some(subinfo) = &context.subs { - if match tables.whatami { + if context.subs.is_some() + && match tables.whatami { WhatAmI::Router => context.face.whatami != WhatAmI::Router, _ => { source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client } - } && subinfo.mode == Mode::Push - { - route.entry(*sid).or_insert_with(|| { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); - (context.face.clone(), key_expr.to_owned(), NodeId::default()) - }); } + { + route.entry(*sid).or_insert_with(|| { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); + (context.face.clone(), key_expr.to_owned(), NodeId::default()) + }); } } } diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 04b31b41ef..caea6fe6b8 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -22,7 +22,7 @@ use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use ordered_float::OrderedFloat; use std::borrow::Cow; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::sync::atomic::Ordering; use std::sync::Arc; use zenoh_buffers::ZBuf; @@ -133,7 +133,6 @@ fn register_client_queryable( remote_expr_id: None, subs: None, qabl: None, - last_values: HashMap::new(), in_interceptor_cache: None, e_interceptor_cache: None, }) diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index ff576ae271..47cf02db46 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -40,9 +40,7 @@ use crate::{ routing::{ dispatcher::face::Face, hat::TREES_COMPUTATION_DELAY_MS, - router::{ - compute_data_routes, compute_matching_pulls, compute_query_routes, RoutesIndexes, - }, + router::{compute_data_routes, compute_query_routes, RoutesIndexes}, }, }, runtime::Runtime, @@ -480,11 +478,7 @@ impl HatBaseTrait for HatCode { let rtables = zread!(tables.tables); for _match in subs_matches.drain(..) { let mut expr = RoutingExpr::new(&_match, ""); - matches_data_routes.push(( - _match.clone(), - compute_data_routes(&rtables, &mut expr), - compute_matching_pulls(&rtables, &mut expr), - )); + matches_data_routes.push((_match.clone(), compute_data_routes(&rtables, &mut expr))); } for _match in qabls_matches.drain(..) { matches_query_routes.push((_match.clone(), compute_query_routes(&rtables, &_match))); @@ -492,13 +486,10 @@ impl HatBaseTrait for HatCode { drop(rtables); let mut wtables = zwrite!(tables.tables); - for (mut res, data_routes, matching_pulls) in matches_data_routes { + for (mut res, data_routes) in matches_data_routes { get_mut_unchecked(&mut res) .context_mut() .update_data_routes(data_routes); - get_mut_unchecked(&mut res) - .context_mut() - .update_matching_pulls(matching_pulls); Resource::clean(&mut res); } for (mut res, query_routes) in matches_query_routes { diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index da1ca66efd..93c4cb7002 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -33,7 +33,7 @@ use zenoh_protocol::{ core::{Reliability, WhatAmI, ZenohId}, network::declare::{ common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, Mode, UndeclareSubscriber, + DeclareSubscriber, UndeclareSubscriber, }, }; use zenoh_sync::get_mut_unchecked; @@ -243,8 +243,7 @@ fn declare_peer_subscription( peer: ZenohId, ) { register_peer_subscription(tables, face, res, sub_info, peer); - let mut propa_sub_info = *sub_info; - propa_sub_info.mode = Mode::Push; + let propa_sub_info = *sub_info; let zid = tables.zid; register_router_subscription(tables, face, res, &propa_sub_info, zid); } @@ -260,16 +259,11 @@ fn register_client_subscription( { let res = get_mut_unchecked(res); match res.session_ctxs.get_mut(&face.id) { - Some(ctx) => match &ctx.subs { - Some(info) => { - if Mode::Pull == info.mode { - get_mut_unchecked(ctx).subs = Some(*sub_info); - } - } - None => { + Some(ctx) => { + if ctx.subs.is_none() { get_mut_unchecked(ctx).subs = Some(*sub_info); } - }, + } None => { res.session_ctxs.insert( face.id, @@ -279,7 +273,6 @@ fn register_client_subscription( remote_expr_id: None, subs: Some(*sub_info), qabl: None, - last_values: HashMap::new(), in_interceptor_cache: None, e_interceptor_cache: None, }), @@ -298,10 +291,8 @@ fn declare_client_subscription( sub_info: &SubscriberInfo, ) { register_client_subscription(tables, face, id, res, sub_info); - let mut propa_sub_info = *sub_info; - propa_sub_info.mode = Mode::Push; let zid = tables.zid; - register_router_subscription(tables, face, res, &propa_sub_info, zid); + register_router_subscription(tables, face, res, sub_info, zid); } #[inline] @@ -600,7 +591,6 @@ fn forget_client_subscription( pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - mode: Mode::Push, }; if face.whatami == WhatAmI::Client { @@ -720,7 +710,6 @@ pub(super) fn pubsub_tree_change( if *sub == tree_id { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - mode: Mode::Push, }; send_sourced_subscription_to_net_childs( tables, @@ -799,7 +788,6 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: let key_expr = Resource::decl_key(res, dst_face); let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - mode: Mode::Push, }; dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -1003,14 +991,11 @@ impl HatPubSubTrait for HatCode { if master || source_type == WhatAmI::Router { for (sid, context) in &mres.session_ctxs { - if let Some(subinfo) = &context.subs { - if context.face.whatami != WhatAmI::Router && subinfo.mode == Mode::Push { - route.entry(*sid).or_insert_with(|| { - let key_expr = - Resource::get_best_key(expr.prefix, expr.suffix, *sid); - (context.face.clone(), key_expr.to_owned(), NodeId::default()) - }); - } + if context.subs.is_some() && context.face.whatami != WhatAmI::Router { + route.entry(*sid).or_insert_with(|| { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); + (context.face.clone(), key_expr.to_owned(), NodeId::default()) + }); } } } diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index b76f0adcc6..aca6f71b3e 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -413,7 +413,6 @@ fn register_client_queryable( remote_expr_id: None, subs: None, qabl: None, - last_values: HashMap::new(), in_interceptor_cache: None, e_interceptor_cache: None, }) diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index b67692e704..29106cb89d 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -388,58 +388,60 @@ impl Primitives for AdminSpace { fn send_request(&self, msg: Request) { trace!("recv Request {:?}", msg); - if let RequestBody::Query(query) = msg.payload { - let primitives = zlock!(self.primitives).as_ref().unwrap().clone(); - { - let conf = self.context.runtime.state.config.lock(); - if !conf.adminspace.permissions().read { - log::error!( + match msg.payload { + RequestBody::Query(query) => { + let primitives = zlock!(self.primitives).as_ref().unwrap().clone(); + { + let conf = self.context.runtime.state.config.lock(); + if !conf.adminspace.permissions().read { + log::error!( "Received GET on '{}' but adminspace.permissions.read=false in configuration", msg.wire_expr ); - primitives.send_response_final(ResponseFinal { - rid: msg.id, - ext_qos: ext::QoSType::RESPONSE_FINAL, - ext_tstamp: None, - }); - return; - } - } - - let key_expr = match self.key_expr_to_string(&msg.wire_expr) { - Ok(key_expr) => key_expr.into_owned(), - Err(e) => { - log::error!("Unknown KeyExpr: {}", e); - primitives.send_response_final(ResponseFinal { - rid: msg.id, - ext_qos: ext::QoSType::RESPONSE_FINAL, - ext_tstamp: None, - }); - return; + primitives.send_response_final(ResponseFinal { + rid: msg.id, + ext_qos: ext::QoSType::RESPONSE_FINAL, + ext_tstamp: None, + }); + return; + } } - }; - - let zid = self.zid; - let parameters = query.parameters.to_owned(); - let query = Query { - inner: Arc::new(QueryInner { - key_expr: key_expr.clone(), - parameters, - value: query - .ext_body - .map(|b| Value::from(b.payload).with_encoding(b.encoding)), - qid: msg.id, - zid, - primitives, - #[cfg(feature = "unstable")] - attachment: query.ext_attachment.map(Into::into), - }), - eid: self.queryable_id, - }; - for (key, handler) in &self.handlers { - if key_expr.intersects(key) { - handler(&self.context, query.clone()); + let key_expr = match self.key_expr_to_string(&msg.wire_expr) { + Ok(key_expr) => key_expr.into_owned(), + Err(e) => { + log::error!("Unknown KeyExpr: {}", e); + primitives.send_response_final(ResponseFinal { + rid: msg.id, + ext_qos: ext::QoSType::RESPONSE_FINAL, + ext_tstamp: None, + }); + return; + } + }; + + let zid = self.zid; + let parameters = query.parameters.to_owned(); + let query = Query { + inner: Arc::new(QueryInner { + key_expr: key_expr.clone(), + parameters, + value: query + .ext_body + .map(|b| Value::from(b.payload).with_encoding(b.encoding)), + qid: msg.id, + zid, + primitives, + #[cfg(feature = "unstable")] + attachment: query.ext_attachment.map(Into::into), + }), + eid: self.queryable_id, + }; + + for (key, handler) in &self.handlers { + if key_expr.intersects(key) { + handler(&self.context, query.clone()); + } } } } diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 4560eefaae..516bcd0109 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -26,7 +26,6 @@ use zenoh_protocol::core::{ key_expr::keyexpr, ExprId, Reliability, WhatAmI, WireExpr, ZenohId, EMPTY_EXPR_ID, }; use zenoh_protocol::network::declare::subscriber::ext::SubscriberInfo; -use zenoh_protocol::network::declare::Mode; use zenoh_protocol::network::{ext, Declare, DeclareBody, DeclareKeyExpr}; use zenoh_protocol::zenoh::{PushBody, Put}; @@ -59,7 +58,6 @@ fn base_test() { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, - mode: Mode::Push, }; declare_subscription( @@ -186,7 +184,6 @@ fn multisub_test() { // -------------- let sub_info = SubscriberInfo { reliability: Reliability::Reliable, - mode: Mode::Push, }; declare_subscription( zlock!(tables.ctrl_lock).as_ref(), @@ -305,7 +302,6 @@ fn clean_test() { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, - mode: Mode::Push, }; declare_subscription( @@ -570,7 +566,6 @@ fn client_test() { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, - mode: Mode::Push, }; let primitives0 = Arc::new(ClientPrimitives::new()); diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 5e706a0da8..496c6879ce 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -81,7 +81,7 @@ use zenoh_protocol::{ }, zenoh::{ query::{self, ext::QueryBodyType, Consolidation}, - Pull, PushBody, RequestBody, ResponseBody, + PushBody, RequestBody, ResponseBody, }, }; use zenoh_result::ZResult; @@ -294,7 +294,7 @@ impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { fn declare_subscriber<'b, TryIntoKeyExpr>( &'s self, key_expr: TryIntoKeyExpr, - ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> + ) -> SubscriberBuilder<'a, 'b, DefaultHandler> where TryIntoKeyExpr: TryInto>, >>::Error: Into, @@ -303,7 +303,6 @@ impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { session: self.clone(), key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), reliability: Reliability::DEFAULT, - mode: PushMode, origin: Locality::default(), handler: DefaultHandler, } @@ -578,7 +577,7 @@ impl<'a> SessionDeclarations<'a, 'a> for Session { fn declare_subscriber<'b, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, - ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> + ) -> SubscriberBuilder<'a, 'b, DefaultHandler> where TryIntoKeyExpr: TryInto>, >>::Error: Into, @@ -1556,29 +1555,6 @@ impl Session { } } - pub(crate) fn pull<'a>(&'a self, key_expr: &'a KeyExpr) -> impl Resolve> + 'a { - ResolveClosure::new(move || { - trace!("pull({:?})", key_expr); - let state = zread!(self.state); - let primitives = state.primitives.as_ref().unwrap().clone(); - drop(state); - primitives.send_request(Request { - id: 0, // @TODO compute a proper request ID - wire_expr: key_expr.to_wire(self).to_owned(), - ext_qos: ext::QoSType::REQUEST, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - ext_target: request::ext::TargetType::DEFAULT, - ext_budget: None, - ext_timeout: None, - payload: RequestBody::Pull(Pull { - ext_unknown: vec![], - }), - }); - Ok(()) - }) - } - #[allow(clippy::too_many_arguments)] pub(crate) fn query( &self, @@ -1819,7 +1795,7 @@ impl<'s> SessionDeclarations<'s, 'static> for Arc { fn declare_subscriber<'b, TryIntoKeyExpr>( &'s self, key_expr: TryIntoKeyExpr, - ) -> SubscriberBuilder<'static, 'b, PushMode, DefaultHandler> + ) -> SubscriberBuilder<'static, 'b, DefaultHandler> where TryIntoKeyExpr: TryInto>, >>::Error: Into, @@ -1828,7 +1804,6 @@ impl<'s> SessionDeclarations<'s, 'static> for Arc { session: SessionRef::Shared(self.clone()), key_expr: key_expr.try_into().map_err(Into::into), reliability: Reliability::DEFAULT, - mode: PushMode, origin: Locality::default(), handler: DefaultHandler, } @@ -2110,20 +2085,12 @@ impl Primitives for Session { #[cfg(feature = "unstable")] m.ext_attachment.map(Into::into), ), - RequestBody::Put(_) => (), - RequestBody::Del(_) => (), - RequestBody::Pull(_) => todo!(), } } fn send_response(&self, msg: Response) { trace!("recv Response {:?}", msg); match msg.payload { - ResponseBody::Put(_) => { - log::warn!( - "Received a ResponseBody::Put, but this isn't supported yet. Dropping message." - ) - } ResponseBody::Err(e) => { let mut state = zwrite!(self.state); match state.queries.get_mut(&msg.rid) { @@ -2453,7 +2420,7 @@ pub trait SessionDeclarations<'s, 'a> { fn declare_subscriber<'b, TryIntoKeyExpr>( &'s self, key_expr: TryIntoKeyExpr, - ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> + ) -> SubscriberBuilder<'a, 'b, DefaultHandler> where TryIntoKeyExpr: TryInto>, >>::Error: Into; diff --git a/zenoh/src/subscriber.rs b/zenoh/src/subscriber.rs index 413c9201f2..4488140610 100644 --- a/zenoh/src/subscriber.rs +++ b/zenoh/src/subscriber.rs @@ -24,10 +24,10 @@ use std::fmt; use std::future::Ready; use std::ops::{Deref, DerefMut}; use std::sync::Arc; -use zenoh_core::{AsyncResolve, Resolvable, Resolve, SyncResolve}; +use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; #[cfg(feature = "unstable")] use zenoh_protocol::core::EntityGlobalId; -use zenoh_protocol::network::declare::{subscriber::ext::SubscriberInfo, Mode}; +use zenoh_protocol::network::declare::subscriber::ext::SubscriberInfo; /// The kind of reliability. pub use zenoh_protocol::core::Reliability; @@ -80,90 +80,6 @@ pub(crate) struct SubscriberInner<'a> { pub(crate) alive: bool, } -/// A [`PullMode`] subscriber that provides data through a callback. -/// -/// CallbackPullSubscribers only provide data when explicitely pulled by the -/// application with the [`pull`](CallbackPullSubscriber::pull) function. -/// CallbackPullSubscribers can be created from a zenoh [`Session`](crate::Session) -/// with the [`declare_subscriber`](crate::SessionDeclarations::declare_subscriber) function, -/// the [`callback`](SubscriberBuilder::callback) function -/// and the [`pull_mode`](SubscriberBuilder::pull_mode) function -/// of the resulting builder. -/// -/// Subscribers are automatically undeclared when dropped. -/// -/// # Examples -/// ``` -/// # async_std::task::block_on(async { -/// use zenoh::prelude::r#async::*; -/// -/// let session = zenoh::open(config::peer()).res().await.unwrap(); -/// let subscriber = session -/// .declare_subscriber("key/expression") -/// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()); }) -/// .pull_mode() -/// .res() -/// .await -/// .unwrap(); -/// subscriber.pull(); -/// # }) -/// ``` -pub(crate) struct PullSubscriberInner<'a> { - inner: SubscriberInner<'a>, -} - -impl<'a> PullSubscriberInner<'a> { - /// Pull available data for a [`CallbackPullSubscriber`]. - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let subscriber = session - /// .declare_subscriber("key/expression") - /// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()); }) - /// .pull_mode() - /// .res() - /// .await - /// .unwrap(); - /// subscriber.pull(); - /// # }) - /// ``` - #[inline] - pub fn pull(&self) -> impl Resolve> + '_ { - self.inner.session.pull(&self.inner.state.key_expr) - } - - /// Close a [`CallbackPullSubscriber`](CallbackPullSubscriber). - /// - /// `CallbackPullSubscribers` are automatically closed when dropped, but you may want to use this function to handle errors or - /// close the `CallbackPullSubscriber` asynchronously. - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// # fn data_handler(_sample: Sample) { }; - /// let subscriber = session - /// .declare_subscriber("key/expression") - /// .callback(data_handler) - /// .pull_mode() - /// .res() - /// .await - /// .unwrap(); - /// subscriber.undeclare().res().await.unwrap(); - /// # }) - /// ``` - #[inline] - pub fn undeclare(self) -> impl Resolve> + 'a { - Undeclarable::undeclare_inner(self.inner, ()) - } -} - impl<'a> SubscriberInner<'a> { /// Close a [`CallbackSubscriber`](CallbackSubscriber). /// @@ -248,28 +164,6 @@ impl Drop for SubscriberInner<'_> { } } -/// The mode for pull subscribers. -#[non_exhaustive] -#[derive(Debug, Clone, Copy)] -pub struct PullMode; - -impl From for Mode { - fn from(_: PullMode) -> Self { - Mode::Pull - } -} - -/// The mode for push subscribers. -#[non_exhaustive] -#[derive(Debug, Clone, Copy)] -pub struct PushMode; - -impl From for Mode { - fn from(_: PushMode) -> Self { - Mode::Push - } -} - /// A builder for initializing a [`FlumeSubscriber`]. /// /// # Examples @@ -281,7 +175,6 @@ impl From for Mode { /// let subscriber = session /// .declare_subscriber("key/expression") /// .best_effort() -/// .pull_mode() /// .res() /// .await /// .unwrap(); @@ -289,7 +182,7 @@ impl From for Mode { /// ``` #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] #[derive(Debug)] -pub struct SubscriberBuilder<'a, 'b, Mode, Handler> { +pub struct SubscriberBuilder<'a, 'b, Handler> { #[cfg(feature = "unstable")] pub session: SessionRef<'a>, #[cfg(not(feature = "unstable"))] @@ -305,8 +198,6 @@ pub struct SubscriberBuilder<'a, 'b, Mode, Handler> { #[cfg(not(feature = "unstable"))] pub(crate) reliability: Reliability, - #[cfg(feature = "unstable")] - pub mode: Mode, #[cfg(not(feature = "unstable"))] pub(crate) mode: Mode, @@ -321,7 +212,7 @@ pub struct SubscriberBuilder<'a, 'b, Mode, Handler> { pub(crate) handler: Handler, } -impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> { +impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { /// Receive the samples for this subscription with a callback. /// /// # Examples @@ -339,7 +230,7 @@ impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> { /// # }) /// ``` #[inline] - pub fn callback(self, callback: Callback) -> SubscriberBuilder<'a, 'b, Mode, Callback> + pub fn callback(self, callback: Callback) -> SubscriberBuilder<'a, 'b, Callback> where Callback: Fn(Sample) + Send + Sync + 'static, { @@ -347,7 +238,7 @@ impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> { session, key_expr, reliability, - mode, + origin, handler: _, } = self; @@ -355,7 +246,7 @@ impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> { session, key_expr, reliability, - mode, + origin, handler: callback, } @@ -385,7 +276,7 @@ impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> { pub fn callback_mut( self, callback: CallbackMut, - ) -> SubscriberBuilder<'a, 'b, Mode, impl Fn(Sample) + Send + Sync + 'static> + ) -> SubscriberBuilder<'a, 'b, impl Fn(Sample) + Send + Sync + 'static> where CallbackMut: FnMut(Sample) + Send + Sync + 'static, { @@ -412,7 +303,7 @@ impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> { /// # }) /// ``` #[inline] - pub fn with(self, handler: Handler) -> SubscriberBuilder<'a, 'b, Mode, Handler> + pub fn with(self, handler: Handler) -> SubscriberBuilder<'a, 'b, Handler> where Handler: crate::prelude::IntoHandler<'static, Sample>, { @@ -420,7 +311,6 @@ impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> { session, key_expr, reliability, - mode, origin, handler: _, } = self; @@ -428,13 +318,13 @@ impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> { session, key_expr, reliability, - mode, origin, handler, } } } -impl<'a, 'b, Mode, Handler> SubscriberBuilder<'a, 'b, Mode, Handler> { + +impl<'a, 'b, Handler> SubscriberBuilder<'a, 'b, Handler> { /// Change the subscription reliability. #[inline] pub fn reliability(mut self, reliability: Reliability) -> Self { @@ -464,52 +354,10 @@ impl<'a, 'b, Mode, Handler> SubscriberBuilder<'a, 'b, Mode, Handler> { self.origin = origin; self } - - /// Change the subscription mode to Pull. - #[inline] - pub fn pull_mode(self) -> SubscriberBuilder<'a, 'b, PullMode, Handler> { - let SubscriberBuilder { - session, - key_expr, - reliability, - mode: _, - origin, - handler, - } = self; - SubscriberBuilder { - session, - key_expr, - reliability, - mode: PullMode, - origin, - handler, - } - } - - /// Change the subscription mode to Push. - #[inline] - pub fn push_mode(self) -> SubscriberBuilder<'a, 'b, PushMode, Handler> { - let SubscriberBuilder { - session, - key_expr, - reliability, - mode: _, - origin, - handler, - } = self; - SubscriberBuilder { - session, - key_expr, - reliability, - mode: PushMode, - origin, - handler, - } - } } // Push mode -impl<'a, Handler> Resolvable for SubscriberBuilder<'a, '_, PushMode, Handler> +impl<'a, Handler> Resolvable for SubscriberBuilder<'a, '_, Handler> where Handler: IntoHandler<'static, Sample> + Send, Handler::Handler: Send, @@ -517,7 +365,7 @@ where type To = ZResult>; } -impl<'a, Handler> SyncResolve for SubscriberBuilder<'a, '_, PushMode, Handler> +impl<'a, Handler> SyncResolve for SubscriberBuilder<'a, '_, Handler> where Handler: IntoHandler<'static, Sample> + Send, Handler::Handler: Send, @@ -534,7 +382,6 @@ where callback, &SubscriberInfo { reliability: self.reliability, - mode: self.mode.into(), }, ) .map(|sub_state| Subscriber { @@ -548,61 +395,7 @@ where } } -impl<'a, Handler> AsyncResolve for SubscriberBuilder<'a, '_, PushMode, Handler> -where - Handler: IntoHandler<'static, Sample> + Send, - Handler::Handler: Send, -{ - type Future = Ready; - - fn res_async(self) -> Self::Future { - std::future::ready(self.res_sync()) - } -} - -// Pull mode -impl<'a, Handler> Resolvable for SubscriberBuilder<'a, '_, PullMode, Handler> -where - Handler: IntoHandler<'static, Sample> + Send, - Handler::Handler: Send, -{ - type To = ZResult>; -} - -impl<'a, Handler> SyncResolve for SubscriberBuilder<'a, '_, PullMode, Handler> -where - Handler: IntoHandler<'static, Sample> + Send, - Handler::Handler: Send, -{ - fn res_sync(self) -> ::To { - let key_expr = self.key_expr?; - let session = self.session; - let (callback, receiver) = self.handler.into_handler(); - session - .declare_subscriber_inner( - &key_expr, - &None, - self.origin, - callback, - &SubscriberInfo { - reliability: self.reliability, - mode: self.mode.into(), - }, - ) - .map(|sub_state| PullSubscriber { - subscriber: PullSubscriberInner { - inner: SubscriberInner { - session, - state: sub_state, - alive: true, - }, - }, - receiver, - }) - } -} - -impl<'a, Handler> AsyncResolve for SubscriberBuilder<'a, '_, PullMode, Handler> +impl<'a, Handler> AsyncResolve for SubscriberBuilder<'a, '_, Handler> where Handler: IntoHandler<'static, Sample> + Send, Handler::Handler: Send, @@ -647,102 +440,6 @@ pub struct Subscriber<'a, Receiver> { pub receiver: Receiver, } -/// A [`PullMode`] subscriber that provides data through a [`Handler`](crate::prelude::IntoHandler). -/// -/// PullSubscribers only provide data when explicitely pulled by the -/// application with the [`pull`](PullSubscriber::pull) function. -/// PullSubscribers can be created from a zenoh [`Session`](crate::Session) -/// with the [`declare_subscriber`](crate::SessionDeclarations::declare_subscriber) function, -/// the [`with`](SubscriberBuilder::with) function -/// and the [`pull_mode`](SubscriberBuilder::pull_mode) function -/// of the resulting builder. -/// -/// Subscribers are automatically undeclared when dropped. -/// -/// # Examples -/// ``` -/// # async_std::task::block_on(async { -/// use zenoh::prelude::r#async::*; -/// -/// let session = zenoh::open(config::peer()).res().await.unwrap(); -/// let subscriber = session -/// .declare_subscriber("key/expression") -/// .with(flume::bounded(32)) -/// .pull_mode() -/// .res() -/// .await -/// .unwrap(); -/// subscriber.pull(); -/// # }) -/// ``` -#[non_exhaustive] -pub struct PullSubscriber<'a, Receiver> { - pub(crate) subscriber: PullSubscriberInner<'a>, - pub receiver: Receiver, -} - -impl<'a, Receiver> Deref for PullSubscriber<'a, Receiver> { - type Target = Receiver; - fn deref(&self) -> &Self::Target { - &self.receiver - } -} - -impl<'a, Receiver> DerefMut for PullSubscriber<'a, Receiver> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.receiver - } -} - -impl<'a, Receiver> PullSubscriber<'a, Receiver> { - /// Pull available data for a [`PullSubscriber`]. - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let subscriber = session - /// .declare_subscriber("key/expression") - /// .with(flume::bounded(32)) - /// .pull_mode() - /// .res() - /// .await - /// .unwrap(); - /// subscriber.pull(); - /// # }) - /// ``` - #[inline] - pub fn pull(&self) -> impl Resolve> + '_ { - self.subscriber.pull() - } - - /// Close a [`PullSubscriber`]. - /// - /// Subscribers are automatically closed when dropped, but you may want to use this function to handle errors or - /// close the Subscriber asynchronously. - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let subscriber = session.declare_subscriber("key/expression") - /// .pull_mode() - /// .res() - /// .await - /// .unwrap(); - /// subscriber.undeclare().res().await.unwrap(); - /// # }) - /// ``` - #[inline] - pub fn undeclare(self) -> impl Resolve> + 'a { - self.subscriber.undeclare() - } -} - impl<'a, Receiver> Subscriber<'a, Receiver> { /// Returns the [`EntityGlobalId`] of this Subscriber. ///