diff --git a/examples/examples/z_local_pub_sub_thr.rs b/examples/examples/z_local_pub_sub_thr.rs new file mode 100644 index 0000000000..666fd89f8e --- /dev/null +++ b/examples/examples/z_local_pub_sub_thr.rs @@ -0,0 +1,155 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{convert::TryInto, time::Instant}; + +use clap::Parser; +use zenoh::{ + bytes::ZBytes, + qos::{CongestionControl, Priority}, + Wait, +}; +use zenoh_examples::CommonArgs; + +struct Stats { + round_count: usize, + round_size: usize, + finished_rounds: usize, + round_start: Instant, + global_start: Option, +} +impl Stats { + fn new(round_size: usize) -> Self { + Stats { + round_count: 0, + round_size, + finished_rounds: 0, + round_start: Instant::now(), + global_start: None, + } + } + fn increment(&mut self) { + if self.round_count == 0 { + self.round_start = Instant::now(); + if self.global_start.is_none() { + self.global_start = Some(self.round_start) + } + self.round_count += 1; + } else if self.round_count < self.round_size { + self.round_count += 1; + } else { + self.print_round(); + self.finished_rounds += 1; + self.round_count = 0; + } + } + fn print_round(&self) { + let elapsed = self.round_start.elapsed().as_secs_f64(); + let throughput = (self.round_size as f64) / elapsed; + println!("{throughput} msg/s"); + } +} +impl Drop for Stats { + fn drop(&mut self) { + let Some(global_start) = self.global_start else { + return; + }; + let elapsed = global_start.elapsed().as_secs_f64(); + let total = self.round_size * self.finished_rounds + self.round_count; + let throughput = total as f64 / elapsed; + println!("Received {total} messages over {elapsed:.2}s: {throughput}msg/s"); + } +} + +fn main() { + // initiate logging + zenoh::init_log_from_env_or("error"); + let args = Args::parse(); + + let session = zenoh::open(args.common).wait().unwrap(); + + let key_expr = "test/thr"; + + let mut stats = Stats::new(args.number); + session + .declare_subscriber(key_expr) + .callback_mut(move |_sample| { + stats.increment(); + if stats.finished_rounds >= args.samples { + std::process::exit(0) + } + }) + .background() + .wait() + .unwrap(); + + let mut prio = Priority::DEFAULT; + if let Some(p) = args.priority { + prio = p.try_into().unwrap(); + } + + let publisher = session + .declare_publisher(key_expr) + .congestion_control(CongestionControl::Block) + .priority(prio) + .express(args.express) + .wait() + .unwrap(); + + println!("Press CTRL-C to quit..."); + let payload_size = args.payload_size; + let data: ZBytes = (0..payload_size) + .map(|i| (i % 10) as u8) + .collect::>() + .into(); + let mut count: usize = 0; + let mut start = std::time::Instant::now(); + loop { + publisher.put(data.clone()).wait().unwrap(); + + if args.print { + if count < args.number { + count += 1; + } else { + let thpt = count as f64 / start.elapsed().as_secs_f64(); + println!("{thpt} msg/s"); + count = 0; + start = std::time::Instant::now(); + } + } + } +} + +#[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "10")] + /// Number of throughput measurements. + samples: usize, + /// express for sending data + #[arg(long, default_value = "false")] + express: bool, + /// Priority for sending data + #[arg(short, long)] + priority: Option, + /// Print the statistics + #[arg(short = 't', long)] + print: bool, + /// Number of messages in each throughput measurements + #[arg(short, long, default_value = "10000000")] + number: usize, + /// Sets the size of the payload to publish + payload_size: usize, + #[command(flatten)] + common: CommonArgs, +} diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index f72548bfc2..b2af207859 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -31,8 +31,8 @@ use crate::{ bytes::{OptionZBytes, ZBytes}, encoding::Encoding, key_expr::KeyExpr, - publisher::{Priority, Publisher}, - sample::{Locality, SampleKind}, + publisher::{Priority, Publisher, PublisherCache, PublisherCacheValue}, + sample::Locality, }, Session, }; @@ -209,11 +209,10 @@ impl Wait for PublicationBuilder, PublicationBuilderPut #[inline] fn wait(mut self) -> ::To { self.publisher = self.publisher.apply_qos_overwrites(); - self.publisher.session.0.resolve_put( + self.publisher.session.0.resolve_push( + &mut PublisherCacheValue::default(), &self.publisher.key_expr?, - self.kind.payload, - SampleKind::Put, - self.kind.encoding, + Some(self.kind), self.publisher.congestion_control, self.publisher.priority, self.publisher.is_express, @@ -232,11 +231,10 @@ impl Wait for PublicationBuilder, PublicationBuilderDel #[inline] fn wait(mut self) -> ::To { self.publisher = self.publisher.apply_qos_overwrites(); - self.publisher.session.0.resolve_put( + self.publisher.session.0.resolve_push( + &mut PublisherCacheValue::default(), &self.publisher.key_expr?, - ZBytes::new(), - SampleKind::Delete, - Encoding::ZENOH_BYTES, + None, self.publisher.congestion_control, self.publisher.priority, self.publisher.is_express, @@ -468,6 +466,7 @@ impl Wait for PublisherBuilder<'_, '_> { .declare_publisher_inner(key_expr.clone(), self.destination)?; Ok(Publisher { session: self.session.downgrade(), + cache: PublisherCache::default(), id, key_expr, encoding: self.encoding, @@ -495,43 +494,45 @@ impl IntoFuture for PublisherBuilder<'_, '_> { impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> { fn wait(self) -> ::To { - self.publisher.session.resolve_put( - &self.publisher.key_expr, - self.kind.payload, - SampleKind::Put, - self.kind.encoding, - self.publisher.congestion_control, - self.publisher.priority, - self.publisher.is_express, - self.publisher.destination, - #[cfg(feature = "unstable")] - self.publisher.reliability, - self.timestamp, - #[cfg(feature = "unstable")] - self.source_info, - self.attachment, - ) + self.publisher.cache.with_cache(|cached| { + self.publisher.session.resolve_push( + cached, + &self.publisher.key_expr, + Some(self.kind), + self.publisher.congestion_control, + self.publisher.priority, + self.publisher.is_express, + self.publisher.destination, + #[cfg(feature = "unstable")] + self.publisher.reliability, + self.timestamp, + #[cfg(feature = "unstable")] + self.source_info, + self.attachment, + ) + }) } } impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> { fn wait(self) -> ::To { - self.publisher.session.resolve_put( - &self.publisher.key_expr, - ZBytes::new(), - SampleKind::Delete, - Encoding::ZENOH_BYTES, - self.publisher.congestion_control, - self.publisher.priority, - self.publisher.is_express, - self.publisher.destination, - #[cfg(feature = "unstable")] - self.publisher.reliability, - self.timestamp, - #[cfg(feature = "unstable")] - self.source_info, - self.attachment, - ) + self.publisher.cache.with_cache(|cached| { + self.publisher.session.resolve_push( + cached, + &self.publisher.key_expr, + None, + self.publisher.congestion_control, + self.publisher.priority, + self.publisher.is_express, + self.publisher.destination, + #[cfg(feature = "unstable")] + self.publisher.reliability, + self.timestamp, + #[cfg(feature = "unstable")] + self.source_info, + self.attachment, + ) + }) } } diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index eb0858fe21..c5be4ce459 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -17,6 +17,7 @@ use std::{ fmt, future::{IntoFuture, Ready}, pin::Pin, + sync::atomic::{AtomicU64, Ordering}, task::{Context, Poll}, }; @@ -40,17 +41,20 @@ use { zenoh_protocol::core::Reliability, }; -use crate::api::{ - builders::publisher::{ - PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut, - PublisherDeleteBuilder, PublisherPutBuilder, +use crate::{ + api::{ + builders::publisher::{ + PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut, + PublisherDeleteBuilder, PublisherPutBuilder, + }, + bytes::ZBytes, + encoding::Encoding, + key_expr::KeyExpr, + sample::{Locality, Sample, SampleFields}, + session::{UndeclarableSealed, WeakSession}, + Id, }, - bytes::ZBytes, - encoding::Encoding, - key_expr::KeyExpr, - sample::{Locality, Sample, SampleFields}, - session::{UndeclarableSealed, WeakSession}, - Id, + sample::SampleKind, }; pub(crate) struct PublisherState { @@ -69,6 +73,80 @@ impl fmt::Debug for PublisherState { } } +#[derive(Default)] +pub(crate) struct PublisherCache(AtomicU64); + +impl PublisherCache { + #[inline(always)] + pub(crate) fn with_cache(&self, f: impl FnOnce(&mut PublisherCacheValue) -> R) -> R { + let cached = self.0.load(Ordering::Relaxed); + let mut to_cache = PublisherCacheValue(cached); + let res = f(&mut to_cache); + if to_cache.0 != cached { + let _ = self.0.compare_exchange_weak( + cached, + to_cache.0, + Ordering::Relaxed, + Ordering::Relaxed, + ); + } + res + } +} + +impl fmt::Debug for PublisherCache { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("PublisherCache") + .field(&PublisherCacheValue(self.0.load(Ordering::Relaxed))) + .finish() + } +} +#[derive(Default, PartialEq, Eq)] +pub(crate) struct PublisherCacheValue(u64); + +impl PublisherCacheValue { + const VERSION_SHIFT: usize = 2; + const NO_REMOTE: u64 = 0b01; + const NO_LOCAL: u64 = 0b10; + + #[inline(always)] + pub(crate) fn match_subscription_version(&mut self, version: u64) { + if self.0 >> Self::VERSION_SHIFT != version { + self.0 = version << Self::VERSION_SHIFT; + } + } + + #[inline(always)] + pub(crate) fn has_remote_sub(&self) -> bool { + self.0 & Self::NO_REMOTE == 0 + } + + #[inline(always)] + pub(crate) fn set_no_remote_sub(&mut self) { + self.0 |= Self::NO_REMOTE; + } + + #[inline(always)] + pub(crate) fn has_local_sub(&self) -> bool { + self.0 & Self::NO_LOCAL == 0 + } + + #[inline(always)] + pub(crate) fn set_no_local_sub(&mut self) { + self.0 |= Self::NO_LOCAL; + } +} + +impl fmt::Debug for PublisherCacheValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PublisherCacheValue") + .field("subscription_version", &(self.0 >> Self::VERSION_SHIFT)) + .field("has_remote_sub", &self.has_remote_sub()) + .field("has_local_sub", &self.has_local_sub()) + .finish() + } +} + /// A publisher that allows to send data through a stream. /// /// Publishers are automatically undeclared when dropped. @@ -101,6 +179,7 @@ impl fmt::Debug for PublisherState { #[derive(Debug)] pub struct Publisher<'a> { pub(crate) session: WeakSession, + pub(crate) cache: PublisherCache, pub(crate) id: Id, pub(crate) key_expr: KeyExpr<'a>, pub(crate) encoding: Encoding, @@ -390,22 +469,14 @@ impl Sink for Publisher<'_> { attachment, .. } = item.into(); - self.session.resolve_put( - &self.key_expr, - payload, - kind, - encoding, - self.congestion_control, - self.priority, - self.is_express, - self.destination, - #[cfg(feature = "unstable")] - self.reliability, - None, - #[cfg(feature = "unstable")] - SourceInfo::empty(), - attachment, - ) + match kind { + SampleKind::Put => self + .put(payload) + .encoding(encoding) + .attachment(attachment) + .wait(), + SampleKind::Delete => self.delete().attachment(attachment).wait(), + } } #[inline] diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index e6455cab23..3df4b07257 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -97,7 +97,7 @@ use crate::{ info::SessionInfo, key_expr::{KeyExpr, KeyExprInner}, liveliness::Liveliness, - publisher::{Priority, PublisherState}, + publisher::{Priority, PublisherCacheValue, PublisherState}, query::{ ConsolidationMode, LivelinessQueryState, QueryConsolidation, QueryState, QueryTarget, Reply, @@ -125,6 +125,7 @@ zconfigurable! { } pub(crate) struct SessionState { + pub(crate) subscription_version: u64, pub(crate) primitives: Option>, // @TODO replace with MaybeUninit ?? pub(crate) expr_id_counter: AtomicExprId, // @TODO: manage rollover and uniqueness pub(crate) qid_counter: AtomicRequestId, @@ -159,6 +160,7 @@ impl SessionState { publisher_qos_tree: KeBoxTree, ) -> SessionState { SessionState { + subscription_version: 0, primitives: None, expr_id_counter: AtomicExprId::new(1), // Note: start at 1 because 0 is reserved for NO_RESOURCE qid_counter: AtomicRequestId::new(0), @@ -1473,6 +1475,7 @@ impl SessionInner { callback: Callback, ) -> ZResult> { let mut state = zwrite!(self.state); + state.subscription_version += 1; tracing::trace!("declare_subscriber({:?})", key_expr); let id = self.runtime.next_id(); let (sub_state, declared_sub) = state.register_subscriber(id, key_expr, origin, callback); @@ -2061,11 +2064,11 @@ impl SessionInner { kind: SubscriberKind, #[cfg(feature = "unstable")] reliability: Reliability, attachment: Option, - ) { + ) -> bool { let mut callbacks = SingleOrVec::default(); let state = zread!(self.state); if state.primitives.is_none() { - return; // Session closing or closed + return false; // Session closing or closed } if key_expr.suffix.is_empty() { match state.get_res(&key_expr.scope, key_expr.mapping, local) { @@ -2083,11 +2086,11 @@ impl SessionInner { "Received Data for `{}`, which isn't a key expression", prefix ); - return; + return false; } None => { tracing::error!("Received Data for unknown expr_id: {}", key_expr.scope); - return; + return false; } } } else { @@ -2104,11 +2107,14 @@ impl SessionInner { } Err(err) => { tracing::error!("Received Data for unknown key_expr: {}", err); - return; + return false; } } }; drop(state); + if callbacks.is_empty() { + return false; + } let mut sample = info.clone().into_sample( // SAFETY: the keyexpr is valid unsafe { KeyExpr::from_str_unchecked("dummy") }, @@ -2126,15 +2132,16 @@ impl SessionInner { sample.key_expr = key_expr; cb.call(sample); } + true } #[allow(clippy::too_many_arguments)] // TODO fixme - pub(crate) fn resolve_put( + #[inline(always)] + pub(crate) fn resolve_push( &self, + cache: &mut PublisherCacheValue, key_expr: &KeyExpr, - payload: ZBytes, - kind: SampleKind, - encoding: Encoding, + mut put: Option, congestion_control: CongestionControl, priority: Priority, is_express: bool, @@ -2145,11 +2152,21 @@ impl SessionInner { attachment: Option, ) -> ZResult<()> { trace!("write({:?}, [...])", key_expr); - let primitives = zread!(self.state).primitives()?; + let state = zread!(self.state); + let primitives = state + .primitives + .as_ref() + .cloned() + .ok_or(SessionClosedError)?; + cache.match_subscription_version(state.subscription_version); + drop(state); let timestamp = timestamp.or_else(|| self.runtime.new_timestamp()); let wire_expr = key_expr.to_wire(self); - if destination != Locality::SessionLocal { - primitives.send_push( + let push_remote = cache.has_remote_sub() && destination != Locality::SessionLocal; + let push_local = cache.has_local_sub() && destination != Locality::Remote; + if push_remote { + let put = if push_local { put.clone() } else { put.take() }; + let remote = primitives.route_data( Push { wire_expr: wire_expr.to_owned(), ext_qos: push::ext::QoSType::new( @@ -2159,10 +2176,10 @@ impl SessionInner { ), ext_tstamp: None, ext_nodeid: push::ext::NodeIdType::DEFAULT, - payload: match kind { - SampleKind::Put => PushBody::Put(Put { + payload: match put { + Some(put) => PushBody::Put(Put { timestamp, - encoding: encoding.clone().into(), + encoding: put.encoding.into(), #[cfg(feature = "unstable")] ext_sinfo: source_info.clone().into(), #[cfg(not(feature = "unstable"))] @@ -2171,9 +2188,9 @@ impl SessionInner { ext_shm: None, ext_attachment: attachment.clone().map(|a| a.into()), ext_unknown: vec![], - payload: payload.clone().into(), + payload: put.payload.into(), }), - SampleKind::Delete => PushBody::Del(Del { + None => PushBody::Del(Del { timestamp, #[cfg(feature = "unstable")] ext_sinfo: source_info.clone().into(), @@ -2189,8 +2206,15 @@ impl SessionInner { #[cfg(not(feature = "unstable"))] Reliability::DEFAULT, ); + if !remote { + cache.set_no_remote_sub(); + } } - if destination != Locality::Remote { + if push_local { + let (kind, payload, encoding) = match put { + Some(put) => (SampleKind::Put, put.payload, put.encoding), + None => (SampleKind::Delete, ZBytes::default(), Encoding::default()), + }; let data_info = DataInfo { kind, encoding: Some(encoding), @@ -2210,7 +2234,7 @@ impl SessionInner { )), }; - self.execute_subscriber_callbacks( + let local = self.execute_subscriber_callbacks( true, &wire_expr, Some(data_info), @@ -2220,6 +2244,9 @@ impl SessionInner { reliability, attachment, ); + if !local { + cache.set_no_local_sub(); + } } Ok(()) } @@ -2527,9 +2554,10 @@ impl Primitives for WeakSession { } zenoh_protocol::network::DeclareBody::DeclareSubscriber(m) => { trace!("recv DeclareSubscriber {} {:?}", m.id, m.wire_expr); + let mut state = zwrite!(self.state); + state.subscription_version += 1; #[cfg(feature = "unstable")] { - let mut state = zwrite!(self.state); if state.primitives.is_none() { return; // Session closing or closed } @@ -2772,7 +2800,7 @@ impl Primitives for WeakSession { #[cfg(feature = "unstable")] _reliability, m.ext_attachment.map(Into::into), - ) + ); } PushBody::Del(m) => { let info = DataInfo { @@ -2792,7 +2820,7 @@ impl Primitives for WeakSession { #[cfg(feature = "unstable")] _reliability, m.ext_attachment.map(Into::into), - ) + ); } } } diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 6e1db6bbf1..8ad41ba98b 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -213,6 +213,11 @@ impl Face { state: Arc::downgrade(&self.state), } } + + #[inline] + pub fn route_data(&self, msg: Push, reliability: Reliability) -> bool { + route_data(&self.tables, &self.state, msg, reliability) + } } impl Primitives for Face { diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index c755e26a4e..7573853413 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -305,7 +305,7 @@ macro_rules! treat_timestamp { "Error treating timestamp for received Data ({}). Drop it!", e ); - return; + return false; } else { data.timestamp = Some(hlc.new_timestamp()); tracing::error!( @@ -393,104 +393,104 @@ pub fn route_data( face: &FaceState, mut msg: Push, reliability: Reliability, -) { +) -> bool { let tables = zread!(tables_ref.tables); - match tables + let Some(prefix) = tables .get_mapping(face, &msg.wire_expr.scope, msg.wire_expr.mapping) .cloned() - { - Some(prefix) => { - tracing::trace!( - "{} Route data for res {}{}", - face, - prefix.expr(), - msg.wire_expr.suffix.as_ref() - ); - let mut expr = RoutingExpr::new(&prefix, msg.wire_expr.suffix.as_ref()); + else { + tracing::error!( + "{} Route data with unknown scope {}!", + face, + msg.wire_expr.scope + ); + return false; + }; + tracing::trace!( + "{} Route data for res {}{}", + face, + prefix.expr(), + msg.wire_expr.suffix.as_ref() + ); + let mut expr = RoutingExpr::new(&prefix, msg.wire_expr.suffix.as_ref()); + + #[cfg(feature = "stats")] + let admin = expr.full_expr().starts_with("@/"); + #[cfg(feature = "stats")] + if !admin { + inc_stats!(face, rx, user, msg.payload) + } else { + inc_stats!(face, rx, admin, msg.payload) + } + let mut routed = false; + if tables.hat_code.ingress_filter(&tables, face, &mut expr) { + let res = Resource::get_resource(&prefix, expr.suffix); + + let route = get_data_route(&tables, face, &res, &mut expr, msg.ext_nodeid.node_id); + + if !route.is_empty() { + treat_timestamp!(&tables.hlc, msg.payload, tables.drop_future_timestamp); + + if route.len() == 1 { + let (outface, key_expr, context) = route.values().next().unwrap(); + if tables + .hat_code + .egress_filter(&tables, face, outface, &mut expr) + { + drop(tables); + #[cfg(feature = "stats")] + if !admin { + inc_stats!(face, tx, user, msg.payload) + } else { + inc_stats!(face, tx, admin, msg.payload) + } - #[cfg(feature = "stats")] - let admin = expr.full_expr().starts_with("@/"); - #[cfg(feature = "stats")] - if !admin { - inc_stats!(face, rx, user, msg.payload) + outface.primitives.send_push( + Push { + wire_expr: key_expr.into(), + ext_qos: msg.ext_qos, + ext_tstamp: msg.ext_tstamp, + ext_nodeid: ext::NodeIdType { node_id: *context }, + payload: msg.payload, + }, + reliability, + ); + routed = true; + } } else { - inc_stats!(face, rx, admin, msg.payload) - } - - if tables.hat_code.ingress_filter(&tables, face, &mut expr) { - let res = Resource::get_resource(&prefix, expr.suffix); - - let route = get_data_route(&tables, face, &res, &mut expr, msg.ext_nodeid.node_id); - - if !route.is_empty() { - treat_timestamp!(&tables.hlc, msg.payload, tables.drop_future_timestamp); - - if route.len() == 1 { - let (outface, key_expr, context) = route.values().next().unwrap(); - if tables + let route = route + .values() + .filter(|(outface, _key_expr, _context)| { + tables .hat_code .egress_filter(&tables, face, outface, &mut expr) - { - drop(tables); - #[cfg(feature = "stats")] - if !admin { - inc_stats!(face, tx, user, msg.payload) - } else { - inc_stats!(face, tx, admin, msg.payload) - } - - outface.primitives.send_push( - Push { - wire_expr: key_expr.into(), - ext_qos: msg.ext_qos, - ext_tstamp: msg.ext_tstamp, - ext_nodeid: ext::NodeIdType { node_id: *context }, - payload: msg.payload, - }, - reliability, - ) - } + }) + .cloned() + .collect::>(); + + drop(tables); + for (outface, key_expr, context) in route { + #[cfg(feature = "stats")] + if !admin { + inc_stats!(face, tx, user, msg.payload) } else { - let route = route - .values() - .filter(|(outface, _key_expr, _context)| { - tables - .hat_code - .egress_filter(&tables, face, outface, &mut expr) - }) - .cloned() - .collect::>(); - - drop(tables); - for (outface, key_expr, context) in route { - #[cfg(feature = "stats")] - if !admin { - inc_stats!(face, tx, user, msg.payload) - } else { - inc_stats!(face, tx, admin, msg.payload) - } - - outface.primitives.send_push( - Push { - wire_expr: key_expr, - ext_qos: msg.ext_qos, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType { node_id: context }, - payload: msg.payload.clone(), - }, - reliability, - ) - } + inc_stats!(face, tx, admin, msg.payload) } + + outface.primitives.send_push( + Push { + wire_expr: key_expr, + ext_qos: msg.ext_qos, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: context }, + payload: msg.payload.clone(), + }, + reliability, + ); + routed = true; } } } - None => { - tracing::error!( - "{} Route data with unknown scope {}!", - face, - msg.wire_expr.scope - ); - } } + routed }