Skip to content

Commit

Permalink
POC for session-local optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Nov 20, 2024
1 parent 3404e05 commit 45039c9
Show file tree
Hide file tree
Showing 9 changed files with 340 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,7 @@ required-features = ["unstable", "shared-memory"]
name = "z_posix_shm_provider"
path = "examples/z_posix_shm_provider.rs"
required-features = ["unstable", "shared-memory"]

[[example]]
name = "z_local_pub_sub_thr"
path = "examples/z_local_pub_sub_thr.rs"
158 changes: 158 additions & 0 deletions examples/examples/z_local_pub_sub_thr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::{convert::TryInto, time::Instant};

use clap::Parser;
use zenoh::{
bytes::ZBytes, qos::{CongestionControl, Priority}, sample::Locality, Wait
};
use zenoh_examples::CommonArgs;


struct Stats {
round_count: usize,
round_size: usize,
finished_rounds: usize,
round_start: Instant,
global_start: Option<Instant>,
}
impl Stats {
fn new(round_size: usize) -> Self {
Stats {
round_count: 0,
round_size,
finished_rounds: 0,
round_start: Instant::now(),
global_start: None,
}
}
fn increment(&mut self) {
if self.round_count == 0 {
self.round_start = Instant::now();
if self.global_start.is_none() {
self.global_start = Some(self.round_start)
}
self.round_count += 1;
} else if self.round_count < self.round_size {
self.round_count += 1;
} else {
self.print_round();
self.finished_rounds += 1;
self.round_count = 0;
}
}
fn print_round(&self) {
let elapsed = self.round_start.elapsed().as_secs_f64();
let throughput = (self.round_size as f64) / elapsed;
println!("{throughput} msg/s");
}
}
impl Drop for Stats {
fn drop(&mut self) {
let Some(global_start) = self.global_start else {
return;
};
let elapsed = global_start.elapsed().as_secs_f64();
let total = self.round_size * self.finished_rounds + self.round_count;
let throughput = total as f64 / elapsed;
println!("Received {total} messages over {elapsed:.2}s: {throughput}msg/s");
}
}

fn main() {
// initiate logging
zenoh::init_log_from_env_or("error");
let args = Args::parse();

let session = zenoh::open(args.common).wait().unwrap();

let key_expr = "test/thr";

let mut stats = Stats::new(args.number);
session
.declare_subscriber(key_expr)
.callback_mut(move |_sample| {
stats.increment();
if stats.finished_rounds >= args.samples {
std::process::exit(0)
}
})
.background()
.wait()
.unwrap();

let mut prio = Priority::DEFAULT;
if let Some(p) = args.priority {
prio = p.try_into().unwrap();
}

let publisher = session
.declare_publisher(key_expr)
.congestion_control(CongestionControl::Block)
.priority(prio)
.express(args.express)
.allowed_destination(args.allowed_destination)
.wait()
.unwrap();

println!("Press CTRL-C to quit...");
let payload_size = args.payload_size;
let data: ZBytes = (0..payload_size)
.map(|i| (i % 10) as u8)
.collect::<Vec<u8>>()
.into();
let mut count: usize = 0;
let mut start = std::time::Instant::now();
loop {
publisher.put(data.clone()).wait().unwrap();

if args.print {
if count < args.number {
count += 1;
} else {
let thpt = count as f64 / start.elapsed().as_secs_f64();
println!("{thpt} msg/s");
count = 0;
start = std::time::Instant::now();
}
}
}
}

#[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "10")]
/// Number of throughput measurements.
samples: usize,
/// express for sending data
#[arg(long, default_value = "false")]
express: bool,
/// Priority for sending data
#[arg(short, long)]
priority: Option<u8>,
/// Print the statistics
#[arg(short = 't', long)]
print: bool,
/// Number of messages in each throughput measurements
#[arg(short, long, default_value = "10000000")]
number: usize,
#[arg(short, long, default_value = "any")]
allowed_destination: Locality,
/// Sets the size of the payload to publish
#[arg(long, default_value = "8")]
payload_size: usize,
#[command(flatten)]
common: CommonArgs,
}
1 change: 1 addition & 0 deletions zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ tokio-util = { workspace = true }
ahash = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true, features = ["derive"] }
flume = { workspace = true }
futures = { workspace = true }
git-version = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/api/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ pub type SourceSn = u32;

/// The locality of samples to be received by subscribers or targeted by publishers.
#[zenoh_macros::unstable]
#[derive(Clone, Copy, Debug, Default, Serialize, PartialEq, Eq)]
#[derive(Clone, Copy, clap::ValueEnum, Hash, Debug, Default, Serialize, PartialEq, Eq)]
pub enum Locality {
SessionLocal,
Remote,
#[default]
Any,
}
#[cfg(not(feature = "unstable"))]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, clap::ValueEnum, Hash, Default, PartialEq, Eq)]
pub(crate) enum Locality {
SessionLocal,
Remote,
Expand Down
43 changes: 23 additions & 20 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ use crate::{
Id,
},
net::{
primitives::Primitives,
primitives::{OptPrimitives, Primitives},
routing::dispatcher::face::Face,
runtime::{Runtime, RuntimeBuilder},
},
Expand Down Expand Up @@ -1928,22 +1928,24 @@ impl SessionInner {
}
};
drop(state);
let mut sample = info.clone().into_sample(
// SAFETY: the keyexpr is valid
unsafe { KeyExpr::from_str_unchecked("dummy") },
payload,
#[cfg(feature = "unstable")]
reliability,
attachment,
);
let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter();
for (cb, key_expr) in drain {
sample.key_expr = key_expr;
cb.call(sample.clone());
}
if let Some((cb, key_expr)) = last {
sample.key_expr = key_expr;
cb.call(sample);
if !callbacks.is_empty() {
let mut sample = info.clone().into_sample(
// SAFETY: the keyexpr is valid
unsafe { KeyExpr::from_str_unchecked("dummy") },
payload,
#[cfg(feature = "unstable")]
reliability,
attachment,
);
let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter();
for (cb, key_expr) in drain {
sample.key_expr = key_expr;
cb.call(sample.clone());
}
if let Some((cb, key_expr)) = last {
sample.key_expr = key_expr;
cb.call(sample);
}
}
}

Expand All @@ -1964,12 +1966,13 @@ impl SessionInner {
attachment: Option<ZBytes>,
) -> ZResult<()> {
trace!("write({:?}, [...])", key_expr);
let primitives = zread!(self.state).primitives()?;
let timestamp = timestamp.or_else(|| self.runtime.new_timestamp());
let wire_expr = key_expr.to_wire(self);
if destination != Locality::SessionLocal {
primitives.send_push(
Push {
let primitives = zread!(self.state).primitives()?;
primitives.opt_send_push(
&wire_expr,
|| Push {
wire_expr: wire_expr.to_owned(),
ext_qos: push::ext::QoSType::new(
priority.into(),
Expand Down
6 changes: 5 additions & 1 deletion zenoh/src/net/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ use std::any::Any;
pub use demux::*;
pub use mux::*;
use zenoh_protocol::{
core::Reliability,
core::{Reliability, WireExpr},
network::{interest::Interest, Declare, Push, Request, Response, ResponseFinal},
};

use super::routing::RoutingContext;

pub trait OptPrimitives: Send + Sync {
fn opt_send_push<F: FnOnce()->Push>(&self,wire_expr: &WireExpr<'_>, fn_msg: F, reliability: Reliability);
}

pub trait Primitives: Send + Sync {
fn send_interest(&self, msg: Interest);

Expand Down
9 changes: 8 additions & 1 deletion zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use super::{
use crate::{
api::key_expr::KeyExpr,
net::{
primitives::{McastMux, Mux, Primitives},
primitives::{McastMux, Mux, OptPrimitives, Primitives},
routing::{
dispatcher::interests::finalize_pending_interests,
interceptor::{InterceptorTrait, InterceptorsChain},
Expand Down Expand Up @@ -215,6 +215,13 @@ impl Face {
}
}

impl OptPrimitives for Face {
#[inline]
fn opt_send_push<F: FnOnce()->Push>(&self,wire_expr: &zenoh_protocol::core::WireExpr<'_>, fn_msg: F, reliability: Reliability) {
opt_route_data(&self.tables, &self.state, wire_expr, fn_msg, reliability);
}
}

impl Primitives for Face {
fn send_interest(&self, msg: zenoh_protocol::network::Interest) {
let ctrl_lock = zlock!(self.tables.ctrl_lock);
Expand Down
Loading

0 comments on commit 45039c9

Please sign in to comment.