From 9f02992cf15b3f23c86a5e384b50effa81955d81 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 20 Nov 2024 12:10:33 +0300 Subject: [PATCH] POC for session-local optimization --- Cargo.lock | 1 + examples/Cargo.toml | 4 + examples/examples/z_local_pub_sub_thr.rs | 158 +++++++++++++++++++++ zenoh/Cargo.toml | 1 + zenoh/src/api/sample.rs | 4 +- zenoh/src/api/session.rs | 43 +++--- zenoh/src/net/primitives/mod.rs | 6 +- zenoh/src/net/routing/dispatcher/face.rs | 9 +- zenoh/src/net/routing/dispatcher/pubsub.rs | 138 ++++++++++++++++++ 9 files changed, 340 insertions(+), 24 deletions(-) create mode 100644 examples/examples/z_local_pub_sub_thr.rs diff --git a/Cargo.lock b/Cargo.lock index 317af86fff..5e1809a02e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5453,6 +5453,7 @@ dependencies = [ "ahash", "async-trait", "bytes", + "clap", "flume", "futures", "git-version", diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 687f4790c1..6a909f9c32 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -174,3 +174,7 @@ required-features = ["unstable", "shared-memory"] name = "z_posix_shm_provider" path = "examples/z_posix_shm_provider.rs" required-features = ["unstable", "shared-memory"] + +[[example]] +name = "z_local_pub_sub_thr" +path = "examples/z_local_pub_sub_thr.rs" diff --git a/examples/examples/z_local_pub_sub_thr.rs b/examples/examples/z_local_pub_sub_thr.rs new file mode 100644 index 0000000000..1d2c6a4ebe --- /dev/null +++ b/examples/examples/z_local_pub_sub_thr.rs @@ -0,0 +1,158 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{convert::TryInto, time::Instant}; + +use clap::Parser; +use zenoh::{ + bytes::ZBytes, qos::{CongestionControl, Priority}, sample::Locality, Wait +}; +use zenoh_examples::CommonArgs; + + +struct Stats { + round_count: usize, + round_size: usize, + finished_rounds: usize, + round_start: Instant, + global_start: Option, +} +impl Stats { + fn new(round_size: usize) -> Self { + Stats { + round_count: 0, + round_size, + finished_rounds: 0, + round_start: Instant::now(), + global_start: None, + } + } + fn increment(&mut self) { + if self.round_count == 0 { + self.round_start = Instant::now(); + if self.global_start.is_none() { + self.global_start = Some(self.round_start) + } + self.round_count += 1; + } else if self.round_count < self.round_size { + self.round_count += 1; + } else { + self.print_round(); + self.finished_rounds += 1; + self.round_count = 0; + } + } + fn print_round(&self) { + let elapsed = self.round_start.elapsed().as_secs_f64(); + let throughput = (self.round_size as f64) / elapsed; + println!("{throughput} msg/s"); + } +} +impl Drop for Stats { + fn drop(&mut self) { + let Some(global_start) = self.global_start else { + return; + }; + let elapsed = global_start.elapsed().as_secs_f64(); + let total = self.round_size * self.finished_rounds + self.round_count; + let throughput = total as f64 / elapsed; + println!("Received {total} messages over {elapsed:.2}s: {throughput}msg/s"); + } +} + +fn main() { + // initiate logging + zenoh::init_log_from_env_or("error"); + let args = Args::parse(); + + let session = zenoh::open(args.common).wait().unwrap(); + + let key_expr = "test/thr"; + + let mut stats = Stats::new(args.number); + session + .declare_subscriber(key_expr) + .callback_mut(move |_sample| { + stats.increment(); + if stats.finished_rounds >= args.samples { + std::process::exit(0) + } + }) + .background() + .wait() + .unwrap(); + + let mut prio = Priority::DEFAULT; + if let Some(p) = args.priority { + prio = p.try_into().unwrap(); + } + + let publisher = session + .declare_publisher(key_expr) + .congestion_control(CongestionControl::Block) + .priority(prio) + .express(args.express) + .allowed_destination(args.allowed_destination) + .wait() + .unwrap(); + + println!("Press CTRL-C to quit..."); + let payload_size = args.payload_size; + let data: ZBytes = (0..payload_size) + .map(|i| (i % 10) as u8) + .collect::>() + .into(); + let mut count: usize = 0; + let mut start = std::time::Instant::now(); + loop { + publisher.put(data.clone()).wait().unwrap(); + + if args.print { + if count < args.number { + count += 1; + } else { + let thpt = count as f64 / start.elapsed().as_secs_f64(); + println!("{thpt} msg/s"); + count = 0; + start = std::time::Instant::now(); + } + } + } +} + +#[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "10")] + /// Number of throughput measurements. + samples: usize, + /// express for sending data + #[arg(long, default_value = "false")] + express: bool, + /// Priority for sending data + #[arg(short, long)] + priority: Option, + /// Print the statistics + #[arg(short = 't', long)] + print: bool, + /// Number of messages in each throughput measurements + #[arg(short, long, default_value = "10000000")] + number: usize, + #[arg(short, long, default_value = "any")] + allowed_destination: Locality, + /// Sets the size of the payload to publish + #[arg(long, default_value = "8")] + payload_size: usize, + #[command(flatten)] + common: CommonArgs, +} diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 94f6d6eb48..8ec7d4743c 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -73,6 +73,7 @@ tokio-util = { workspace = true } ahash = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } +clap = { workspace = true, features = ["derive"] } flume = { workspace = true } futures = { workspace = true } git-version = { workspace = true } diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 75d6c32b63..d0c7272f07 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -34,7 +34,7 @@ pub type SourceSn = u32; /// The locality of samples to be received by subscribers or targeted by publishers. #[zenoh_macros::unstable] -#[derive(Clone, Copy, Debug, Default, Serialize, PartialEq, Eq)] +#[derive(Clone, Copy, clap::ValueEnum, Hash, Debug, Default, Serialize, PartialEq, Eq)] pub enum Locality { SessionLocal, Remote, @@ -42,7 +42,7 @@ pub enum Locality { Any, } #[cfg(not(feature = "unstable"))] -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, clap::ValueEnum, Hash, Default, PartialEq, Eq)] pub(crate) enum Locality { SessionLocal, Remote, diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 0c01bffdbd..6a193829e4 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -104,7 +104,7 @@ use crate::{ Id, }, net::{ - primitives::Primitives, + primitives::{OptPrimitives, Primitives}, routing::dispatcher::face::Face, runtime::{Runtime, RuntimeBuilder}, }, @@ -1928,22 +1928,24 @@ impl SessionInner { } }; drop(state); - let mut sample = info.clone().into_sample( - // SAFETY: the keyexpr is valid - unsafe { KeyExpr::from_str_unchecked("dummy") }, - payload, - #[cfg(feature = "unstable")] - reliability, - attachment, - ); - let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter(); - for (cb, key_expr) in drain { - sample.key_expr = key_expr; - cb.call(sample.clone()); - } - if let Some((cb, key_expr)) = last { - sample.key_expr = key_expr; - cb.call(sample); + if !callbacks.is_empty() { + let mut sample = info.clone().into_sample( + // SAFETY: the keyexpr is valid + unsafe { KeyExpr::from_str_unchecked("dummy") }, + payload, + #[cfg(feature = "unstable")] + reliability, + attachment, + ); + let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter(); + for (cb, key_expr) in drain { + sample.key_expr = key_expr; + cb.call(sample.clone()); + } + if let Some((cb, key_expr)) = last { + sample.key_expr = key_expr; + cb.call(sample); + } } } @@ -1964,12 +1966,13 @@ impl SessionInner { attachment: Option, ) -> ZResult<()> { trace!("write({:?}, [...])", key_expr); - let primitives = zread!(self.state).primitives()?; let timestamp = timestamp.or_else(|| self.runtime.new_timestamp()); let wire_expr = key_expr.to_wire(self); if destination != Locality::SessionLocal { - primitives.send_push( - Push { + let primitives = zread!(self.state).primitives()?; + primitives.opt_send_push( + &wire_expr, + || Push { wire_expr: wire_expr.to_owned(), ext_qos: push::ext::QoSType::new( priority.into(), diff --git a/zenoh/src/net/primitives/mod.rs b/zenoh/src/net/primitives/mod.rs index 21526466f5..d5fb50ebda 100644 --- a/zenoh/src/net/primitives/mod.rs +++ b/zenoh/src/net/primitives/mod.rs @@ -19,12 +19,16 @@ use std::any::Any; pub use demux::*; pub use mux::*; use zenoh_protocol::{ - core::Reliability, + core::{Reliability, WireExpr}, network::{interest::Interest, Declare, Push, Request, Response, ResponseFinal}, }; use super::routing::RoutingContext; +pub trait OptPrimitives: Send + Sync { + fn opt_send_pushPush>(&self,wire_expr: &WireExpr<'_>, fn_msg: F, reliability: Reliability); +} + pub trait Primitives: Send + Sync { fn send_interest(&self, msg: Interest); diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 6e1db6bbf1..044f4de2d8 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -43,7 +43,7 @@ use super::{ use crate::{ api::key_expr::KeyExpr, net::{ - primitives::{McastMux, Mux, Primitives}, + primitives::{McastMux, Mux, OptPrimitives, Primitives}, routing::{ dispatcher::interests::finalize_pending_interests, interceptor::{InterceptorTrait, InterceptorsChain}, @@ -215,6 +215,13 @@ impl Face { } } +impl OptPrimitives for Face { + #[inline] + fn opt_send_pushPush>(&self,wire_expr: &zenoh_protocol::core::WireExpr<'_>, fn_msg: F, reliability: Reliability) { + opt_route_data(&self.tables, &self.state, wire_expr, fn_msg, reliability); + } +} + impl Primitives for Face { fn send_interest(&self, msg: zenoh_protocol::network::Interest) { let ctrl_lock = zlock!(self.tables.ctrl_lock); diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index 2fef86ba66..cea956d692 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -522,3 +522,141 @@ pub fn route_data( } } } + + +pub fn opt_route_dataPush>( + tables_ref: &Arc, + face: &FaceState, + wire_expr: &WireExpr<'_>, + fn_msg: F, + reliability: Reliability, +) { + let tables = zread!(tables_ref.tables); + match tables + .get_mapping(face, &wire_expr.scope, wire_expr.mapping) + .cloned() + { + Some(prefix) => { + let mut msg = fn_msg(); + tracing::trace!( + "{} Route data for res {}{}", + face, + prefix.expr(), + msg.wire_expr.suffix.as_ref() + ); + let mut expr = RoutingExpr::new(&prefix, msg.wire_expr.suffix.as_ref()); + + #[cfg(feature = "stats")] + let admin = expr.full_expr().starts_with("@/"); + #[cfg(feature = "stats")] + if !admin { + inc_stats!(face, rx, user, msg.payload) + } else { + inc_stats!(face, rx, admin, msg.payload) + } + + if tables.hat_code.ingress_filter(&tables, face, &mut expr) { + let res = Resource::get_resource(&prefix, expr.suffix); + + let route = get_data_route(&tables, face, &res, &mut expr, msg.ext_nodeid.node_id); + + if !route.is_empty() { + treat_timestamp!(&tables.hlc, msg.payload, tables.drop_future_timestamp); + + if route.len() == 1 { + let (outface, key_expr, context) = route.values().next().unwrap(); + if tables + .hat_code + .egress_filter(&tables, face, outface, &mut expr) + { + drop(tables); + #[cfg(feature = "stats")] + if !admin { + inc_stats!(face, tx, user, msg.payload) + } else { + inc_stats!(face, tx, admin, msg.payload) + } + + outface.primitives.send_push( + Push { + wire_expr: key_expr.into(), + ext_qos: msg.ext_qos, + ext_tstamp: msg.ext_tstamp, + ext_nodeid: ext::NodeIdType { node_id: *context }, + payload: msg.payload, + }, + reliability, + ) + } + } else if tables.whatami == WhatAmI::Router { + let route = route + .values() + .filter(|(outface, _key_expr, _context)| { + tables + .hat_code + .egress_filter(&tables, face, outface, &mut expr) + }) + .cloned() + .collect::>(); + + drop(tables); + for (outface, key_expr, context) in route { + #[cfg(feature = "stats")] + if !admin { + inc_stats!(face, tx, user, msg.payload) + } else { + inc_stats!(face, tx, admin, msg.payload) + } + + outface.primitives.send_push( + Push { + wire_expr: key_expr, + ext_qos: msg.ext_qos, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: context }, + payload: msg.payload.clone(), + }, + reliability, + ) + } + } else { + drop(tables); + for (outface, key_expr, context) in route.values() { + if face.id != outface.id + && match (face.mcast_group.as_ref(), outface.mcast_group.as_ref()) { + (Some(l), Some(r)) => l != r, + _ => true, + } + { + #[cfg(feature = "stats")] + if !admin { + inc_stats!(face, tx, user, msg.payload) + } else { + inc_stats!(face, tx, admin, msg.payload) + } + + outface.primitives.send_push( + Push { + wire_expr: key_expr.into(), + ext_qos: msg.ext_qos, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: *context }, + payload: msg.payload.clone(), + }, + reliability, + ) + } + } + } + } + } + } + None => { + tracing::error!( + "{} Route data with unknown scope {}!", + face, + wire_expr.scope + ); + } + } +} \ No newline at end of file