diff --git a/Cargo.lock b/Cargo.lock index 012994156..ed16d4ad8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5060,6 +5060,7 @@ dependencies = [ "ahash", "async-trait", "bytes", + "clap", "flume", "futures", "git-version", diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 687f4790c..6a909f9c3 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -174,3 +174,7 @@ required-features = ["unstable", "shared-memory"] name = "z_posix_shm_provider" path = "examples/z_posix_shm_provider.rs" required-features = ["unstable", "shared-memory"] + +[[example]] +name = "z_local_pub_sub_thr" +path = "examples/z_local_pub_sub_thr.rs" diff --git a/examples/examples/z_local_pub_sub_thr.rs b/examples/examples/z_local_pub_sub_thr.rs new file mode 100644 index 000000000..1d2c6a4eb --- /dev/null +++ b/examples/examples/z_local_pub_sub_thr.rs @@ -0,0 +1,158 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{convert::TryInto, time::Instant}; + +use clap::Parser; +use zenoh::{ + bytes::ZBytes, qos::{CongestionControl, Priority}, sample::Locality, Wait +}; +use zenoh_examples::CommonArgs; + + +struct Stats { + round_count: usize, + round_size: usize, + finished_rounds: usize, + round_start: Instant, + global_start: Option, +} +impl Stats { + fn new(round_size: usize) -> Self { + Stats { + round_count: 0, + round_size, + finished_rounds: 0, + round_start: Instant::now(), + global_start: None, + } + } + fn increment(&mut self) { + if self.round_count == 0 { + self.round_start = Instant::now(); + if self.global_start.is_none() { + self.global_start = Some(self.round_start) + } + self.round_count += 1; + } else if self.round_count < self.round_size { + self.round_count += 1; + } else { + self.print_round(); + self.finished_rounds += 1; + self.round_count = 0; + } + } + fn print_round(&self) { + let elapsed = self.round_start.elapsed().as_secs_f64(); + let throughput = (self.round_size as f64) / elapsed; + println!("{throughput} msg/s"); + } +} +impl Drop for Stats { + fn drop(&mut self) { + let Some(global_start) = self.global_start else { + return; + }; + let elapsed = global_start.elapsed().as_secs_f64(); + let total = self.round_size * self.finished_rounds + self.round_count; + let throughput = total as f64 / elapsed; + println!("Received {total} messages over {elapsed:.2}s: {throughput}msg/s"); + } +} + +fn main() { + // initiate logging + zenoh::init_log_from_env_or("error"); + let args = Args::parse(); + + let session = zenoh::open(args.common).wait().unwrap(); + + let key_expr = "test/thr"; + + let mut stats = Stats::new(args.number); + session + .declare_subscriber(key_expr) + .callback_mut(move |_sample| { + stats.increment(); + if stats.finished_rounds >= args.samples { + std::process::exit(0) + } + }) + .background() + .wait() + .unwrap(); + + let mut prio = Priority::DEFAULT; + if let Some(p) = args.priority { + prio = p.try_into().unwrap(); + } + + let publisher = session + .declare_publisher(key_expr) + .congestion_control(CongestionControl::Block) + .priority(prio) + .express(args.express) + .allowed_destination(args.allowed_destination) + .wait() + .unwrap(); + + println!("Press CTRL-C to quit..."); + let payload_size = args.payload_size; + let data: ZBytes = (0..payload_size) + .map(|i| (i % 10) as u8) + .collect::>() + .into(); + let mut count: usize = 0; + let mut start = std::time::Instant::now(); + loop { + publisher.put(data.clone()).wait().unwrap(); + + if args.print { + if count < args.number { + count += 1; + } else { + let thpt = count as f64 / start.elapsed().as_secs_f64(); + println!("{thpt} msg/s"); + count = 0; + start = std::time::Instant::now(); + } + } + } +} + +#[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "10")] + /// Number of throughput measurements. + samples: usize, + /// express for sending data + #[arg(long, default_value = "false")] + express: bool, + /// Priority for sending data + #[arg(short, long)] + priority: Option, + /// Print the statistics + #[arg(short = 't', long)] + print: bool, + /// Number of messages in each throughput measurements + #[arg(short, long, default_value = "10000000")] + number: usize, + #[arg(short, long, default_value = "any")] + allowed_destination: Locality, + /// Sets the size of the payload to publish + #[arg(long, default_value = "8")] + payload_size: usize, + #[command(flatten)] + common: CommonArgs, +} diff --git a/examples/examples/z_pub_thr.rs b/examples/examples/z_pub_thr.rs index dc18715e2..bb68e2254 100644 --- a/examples/examples/z_pub_thr.rs +++ b/examples/examples/z_pub_thr.rs @@ -46,6 +46,12 @@ fn main() { .congestion_control(CongestionControl::Block) .priority(prio) .express(args.express) + .allowed_destination({ + match args.remote { + true => zenoh::sample::Locality::Remote, + false => zenoh::sample::Locality::Any, + } + }) .wait() .unwrap(); @@ -70,6 +76,9 @@ fn main() { #[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)] struct Args { + /// remote-only locality for sending data + #[arg(short, long, default_value = "false")] + remote: bool, /// express for sending data #[arg(long, default_value = "false")] express: bool, diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 94f6d6eb4..8ec7d4743 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -73,6 +73,7 @@ tokio-util = { workspace = true } ahash = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } +clap = { workspace = true, features = ["derive"] } flume = { workspace = true } futures = { workspace = true } git-version = { workspace = true } diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 75d6c32b6..d0c7272f0 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -34,7 +34,7 @@ pub type SourceSn = u32; /// The locality of samples to be received by subscribers or targeted by publishers. #[zenoh_macros::unstable] -#[derive(Clone, Copy, Debug, Default, Serialize, PartialEq, Eq)] +#[derive(Clone, Copy, clap::ValueEnum, Hash, Debug, Default, Serialize, PartialEq, Eq)] pub enum Locality { SessionLocal, Remote, @@ -42,7 +42,7 @@ pub enum Locality { Any, } #[cfg(not(feature = "unstable"))] -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, clap::ValueEnum, Hash, Default, PartialEq, Eq)] pub(crate) enum Locality { SessionLocal, Remote, diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 0c01bffdb..529e2dd83 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -104,7 +104,7 @@ use crate::{ Id, }, net::{ - primitives::Primitives, + primitives::{OptPrimitives, Primitives}, routing::dispatcher::face::Face, runtime::{Runtime, RuntimeBuilder}, }, @@ -1928,22 +1928,24 @@ impl SessionInner { } }; drop(state); - let mut sample = info.clone().into_sample( - // SAFETY: the keyexpr is valid - unsafe { KeyExpr::from_str_unchecked("dummy") }, - payload, - #[cfg(feature = "unstable")] - reliability, - attachment, - ); - let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter(); - for (cb, key_expr) in drain { - sample.key_expr = key_expr; - cb.call(sample.clone()); - } - if let Some((cb, key_expr)) = last { - sample.key_expr = key_expr; - cb.call(sample); + if !callbacks.is_empty() { + let mut sample = info.clone().into_sample( + // SAFETY: the keyexpr is valid + unsafe { KeyExpr::from_str_unchecked("dummy") }, + payload, + #[cfg(feature = "unstable")] + reliability, + attachment, + ); + let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter(); + for (cb, key_expr) in drain { + sample.key_expr = key_expr; + cb.call(sample.clone()); + } + if let Some((cb, key_expr)) = last { + sample.key_expr = key_expr; + cb.call(sample); + } } } @@ -1964,12 +1966,44 @@ impl SessionInner { attachment: Option, ) -> ZResult<()> { trace!("write({:?}, [...])", key_expr); - let primitives = zread!(self.state).primitives()?; let timestamp = timestamp.or_else(|| self.runtime.new_timestamp()); let wire_expr = key_expr.to_wire(self); if destination != Locality::SessionLocal { - primitives.send_push( - Push { + let primitives = zread!(self.state).primitives()?; + primitives.opt_send_push( + &wire_expr, + || { ( + push::ext::QoSType::new( + priority.into(), + congestion_control, + is_express, + ), + match kind { + SampleKind::Put => PushBody::Put(Put { + timestamp, + encoding: encoding.clone().into(), + #[cfg(feature = "unstable")] + ext_sinfo: source_info.into(), + #[cfg(not(feature = "unstable"))] + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: attachment.clone().map(|a| a.into()), + ext_unknown: vec![], + payload: payload.clone().into(), + }), + SampleKind::Delete => PushBody::Del(Del { + timestamp, + #[cfg(feature = "unstable")] + ext_sinfo: source_info.into(), + #[cfg(not(feature = "unstable"))] + ext_sinfo: None, + ext_attachment: attachment.clone().map(|a| a.into()), + ext_unknown: vec![], + }), + }, + ) + /*Push { wire_expr: wire_expr.to_owned(), ext_qos: push::ext::QoSType::new( priority.into(), @@ -2002,7 +2036,8 @@ impl SessionInner { ext_unknown: vec![], }), }, - }, + }*/ + }, #[cfg(feature = "unstable")] reliability, #[cfg(not(feature = "unstable"))] diff --git a/zenoh/src/net/primitives/mod.rs b/zenoh/src/net/primitives/mod.rs index 21526466f..5772b8efe 100644 --- a/zenoh/src/net/primitives/mod.rs +++ b/zenoh/src/net/primitives/mod.rs @@ -19,12 +19,16 @@ use std::any::Any; pub use demux::*; pub use mux::*; use zenoh_protocol::{ - core::Reliability, - network::{interest::Interest, Declare, Push, Request, Response, ResponseFinal}, + core::{Reliability, WireExpr}, + network::{interest::Interest, push, Declare, Push, Request, Response, ResponseFinal}, zenoh::PushBody, }; use super::routing::RoutingContext; +pub trait OptPrimitives: Send + Sync { + fn opt_send_push(push::ext::QoSType, PushBody)>(&self,wire_expr: &WireExpr<'_>, fn_msg: F, reliability: Reliability); +} + pub trait Primitives: Send + Sync { fn send_interest(&self, msg: Interest); diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 6e1db6bbf..eb1203f3c 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -23,10 +23,9 @@ use tokio_util::sync::CancellationToken; use zenoh_protocol::{ core::{ExprId, Reliability, WhatAmI, ZenohIdProto}, network::{ - interest::{InterestId, InterestMode, InterestOptions}, - Mapping, Push, Request, RequestId, Response, ResponseFinal, + interest::{InterestId, InterestMode, InterestOptions}, push, Mapping, Push, Request, RequestId, Response, ResponseFinal }, - zenoh::RequestBody, + zenoh::{PushBody, RequestBody}, }; use zenoh_sync::get_mut_unchecked; use zenoh_task::TaskController; @@ -43,7 +42,7 @@ use super::{ use crate::{ api::key_expr::KeyExpr, net::{ - primitives::{McastMux, Mux, Primitives}, + primitives::{McastMux, Mux, OptPrimitives, Primitives}, routing::{ dispatcher::interests::finalize_pending_interests, interceptor::{InterceptorTrait, InterceptorsChain}, @@ -215,6 +214,13 @@ impl Face { } } +impl OptPrimitives for Face { + #[inline] + fn opt_send_push(push::ext::QoSType, PushBody)>(&self,wire_expr: &zenoh_protocol::core::WireExpr<'_>, fn_msg: F, reliability: Reliability) { + opt_route_data(&self.tables, &self.state, wire_expr, fn_msg, reliability); + } +} + impl Primitives for Face { fn send_interest(&self, msg: zenoh_protocol::network::Interest) { let ctrl_lock = zlock!(self.tables.ctrl_lock); diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index 2fef86ba6..52fd4e407 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -17,8 +17,7 @@ use zenoh_core::zread; use zenoh_protocol::{ core::{key_expr::keyexpr, Reliability, WhatAmI, WireExpr}, network::{ - declare::{ext, SubscriberId}, - Push, + declare::{ext, SubscriberId}, push, Push }, zenoh::PushBody, }; @@ -522,3 +521,141 @@ pub fn route_data( } } } + + +pub fn opt_route_data(push::ext::QoSType, PushBody)>( + tables_ref: &Arc, + face: &FaceState, + wire_expr: &WireExpr<'_>, + fn_msg: F, + reliability: Reliability, +) { + let tables = zread!(tables_ref.tables); + match tables + .get_mapping(face, &wire_expr.scope, wire_expr.mapping) + .cloned() + { + Some(prefix) => { + tracing::trace!( + "{} Route data for res {}{}", + face, + prefix.expr(), + wire_expr.suffix.as_ref() + ); + let mut expr = RoutingExpr::new(&prefix, wire_expr.suffix.as_ref()); + + #[cfg(feature = "stats")] + let admin = expr.full_expr().starts_with("@/"); + #[cfg(feature = "stats")] + if !admin { + inc_stats!(face, rx, user, msg.payload) + } else { + inc_stats!(face, rx, admin, msg.payload) + } + + if tables.hat_code.ingress_filter(&tables, face, &mut expr) { + let res = Resource::get_resource(&prefix, expr.suffix); + + let route = get_data_route(&tables, face, &res, &mut expr, push::ext::NodeIdType::DEFAULT.node_id); + + if !route.is_empty() { + let (ext_qos, mut payload) = fn_msg(); + treat_timestamp!(&tables.hlc, payload, tables.drop_future_timestamp); + + if route.len() == 1 { + let (outface, key_expr, context) = route.values().next().unwrap(); + if tables + .hat_code + .egress_filter(&tables, face, outface, &mut expr) + { + drop(tables); + #[cfg(feature = "stats")] + if !admin { + inc_stats!(face, tx, user, msg.payload) + } else { + inc_stats!(face, tx, admin, msg.payload) + } + + outface.primitives.send_push( + Push { + wire_expr: key_expr.into(), + ext_qos, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: *context }, + payload, + }, + reliability, + ) + } + } else if tables.whatami == WhatAmI::Router { + let route = route + .values() + .filter(|(outface, _key_expr, _context)| { + tables + .hat_code + .egress_filter(&tables, face, outface, &mut expr) + }) + .cloned() + .collect::>(); + + drop(tables); + for (outface, key_expr, context) in route { + #[cfg(feature = "stats")] + if !admin { + inc_stats!(face, tx, user, msg.payload) + } else { + inc_stats!(face, tx, admin, msg.payload) + } + + outface.primitives.send_push( + Push { + wire_expr: key_expr, + ext_qos, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: context }, + payload: payload.clone(), + }, + reliability, + ) + } + } else { + drop(tables); + for (outface, key_expr, context) in route.values() { + if face.id != outface.id + && match (face.mcast_group.as_ref(), outface.mcast_group.as_ref()) { + (Some(l), Some(r)) => l != r, + _ => true, + } + { + #[cfg(feature = "stats")] + if !admin { + inc_stats!(face, tx, user, msg.payload) + } else { + inc_stats!(face, tx, admin, msg.payload) + } + + outface.primitives.send_push( + Push { + wire_expr: key_expr.into(), + ext_qos, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: *context }, + payload: payload.clone(), + }, + reliability, + ) + } + } + } + } + } + } + None => { + tracing::error!( + "{} Route data with unknown scope {}!", + face, + wire_expr.scope + ); + } + } +} \ No newline at end of file