Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC for session-local optimization #1599

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,7 @@ required-features = ["unstable", "shared-memory"]
name = "z_posix_shm_provider"
path = "examples/z_posix_shm_provider.rs"
required-features = ["unstable", "shared-memory"]

[[example]]
name = "z_local_pub_sub_thr"
path = "examples/z_local_pub_sub_thr.rs"
158 changes: 158 additions & 0 deletions examples/examples/z_local_pub_sub_thr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::{convert::TryInto, time::Instant};

use clap::Parser;
use zenoh::{
bytes::ZBytes, qos::{CongestionControl, Priority}, sample::Locality, Wait
};
use zenoh_examples::CommonArgs;


struct Stats {
round_count: usize,
round_size: usize,
finished_rounds: usize,
round_start: Instant,
global_start: Option<Instant>,
}
impl Stats {
fn new(round_size: usize) -> Self {
Stats {
round_count: 0,
round_size,
finished_rounds: 0,
round_start: Instant::now(),
global_start: None,
}
}
fn increment(&mut self) {
if self.round_count == 0 {
self.round_start = Instant::now();
if self.global_start.is_none() {
self.global_start = Some(self.round_start)
}
self.round_count += 1;
} else if self.round_count < self.round_size {
self.round_count += 1;
} else {
self.print_round();
self.finished_rounds += 1;
self.round_count = 0;
}
}
fn print_round(&self) {
let elapsed = self.round_start.elapsed().as_secs_f64();
let throughput = (self.round_size as f64) / elapsed;
println!("{throughput} msg/s");
}
}
impl Drop for Stats {
fn drop(&mut self) {
let Some(global_start) = self.global_start else {
return;
};
let elapsed = global_start.elapsed().as_secs_f64();
let total = self.round_size * self.finished_rounds + self.round_count;
let throughput = total as f64 / elapsed;
println!("Received {total} messages over {elapsed:.2}s: {throughput}msg/s");
}
}

fn main() {
// initiate logging
zenoh::init_log_from_env_or("error");
let args = Args::parse();

let session = zenoh::open(args.common).wait().unwrap();

let key_expr = "test/thr";

let mut stats = Stats::new(args.number);
session
.declare_subscriber(key_expr)
.callback_mut(move |_sample| {
stats.increment();
if stats.finished_rounds >= args.samples {
std::process::exit(0)
}
})
.background()
.wait()
.unwrap();

let mut prio = Priority::DEFAULT;
if let Some(p) = args.priority {
prio = p.try_into().unwrap();
}

let publisher = session
.declare_publisher(key_expr)
.congestion_control(CongestionControl::Block)
.priority(prio)
.express(args.express)
.allowed_destination(args.allowed_destination)
.wait()
.unwrap();

println!("Press CTRL-C to quit...");
let payload_size = args.payload_size;
let data: ZBytes = (0..payload_size)
.map(|i| (i % 10) as u8)
.collect::<Vec<u8>>()
.into();
let mut count: usize = 0;
let mut start = std::time::Instant::now();
loop {
publisher.put(data.clone()).wait().unwrap();

if args.print {
if count < args.number {
count += 1;
} else {
let thpt = count as f64 / start.elapsed().as_secs_f64();
println!("{thpt} msg/s");
count = 0;
start = std::time::Instant::now();
}
}
}
}

#[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "10")]
/// Number of throughput measurements.
samples: usize,
/// express for sending data
#[arg(long, default_value = "false")]
express: bool,
/// Priority for sending data
#[arg(short, long)]
priority: Option<u8>,
/// Print the statistics
#[arg(short = 't', long)]
print: bool,
/// Number of messages in each throughput measurements
#[arg(short, long, default_value = "10000000")]
number: usize,
#[arg(short, long, default_value = "any")]
allowed_destination: Locality,
/// Sets the size of the payload to publish
#[arg(long, default_value = "8")]
payload_size: usize,
#[command(flatten)]
common: CommonArgs,
}
9 changes: 9 additions & 0 deletions examples/examples/z_pub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ fn main() {
.congestion_control(CongestionControl::Block)
.priority(prio)
.express(args.express)
.allowed_destination({
match args.remote {
true => zenoh::sample::Locality::Remote,
false => zenoh::sample::Locality::Any,
}
})
.wait()
.unwrap();

Expand All @@ -70,6 +76,9 @@ fn main() {

#[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
/// remote-only locality for sending data
#[arg(short, long, default_value = "false")]
remote: bool,
/// express for sending data
#[arg(long, default_value = "false")]
express: bool,
Expand Down
1 change: 1 addition & 0 deletions zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ tokio-util = { workspace = true }
ahash = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true, features = ["derive"] }
flume = { workspace = true }
futures = { workspace = true }
git-version = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/api/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ pub type SourceSn = u32;

/// The locality of samples to be received by subscribers or targeted by publishers.
#[zenoh_macros::unstable]
#[derive(Clone, Copy, Debug, Default, Serialize, PartialEq, Eq)]
#[derive(Clone, Copy, clap::ValueEnum, Hash, Debug, Default, Serialize, PartialEq, Eq)]
pub enum Locality {
SessionLocal,
Remote,
#[default]
Any,
}
#[cfg(not(feature = "unstable"))]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, clap::ValueEnum, Hash, Default, PartialEq, Eq)]
pub(crate) enum Locality {
SessionLocal,
Remote,
Expand Down
77 changes: 56 additions & 21 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ use crate::{
Id,
},
net::{
primitives::Primitives,
primitives::{OptPrimitives, Primitives},
routing::dispatcher::face::Face,
runtime::{Runtime, RuntimeBuilder},
},
Expand Down Expand Up @@ -1928,22 +1928,24 @@ impl SessionInner {
}
};
drop(state);
let mut sample = info.clone().into_sample(
// SAFETY: the keyexpr is valid
unsafe { KeyExpr::from_str_unchecked("dummy") },
payload,
#[cfg(feature = "unstable")]
reliability,
attachment,
);
let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter();
for (cb, key_expr) in drain {
sample.key_expr = key_expr;
cb.call(sample.clone());
}
if let Some((cb, key_expr)) = last {
sample.key_expr = key_expr;
cb.call(sample);
if !callbacks.is_empty() {
let mut sample = info.clone().into_sample(
// SAFETY: the keyexpr is valid
unsafe { KeyExpr::from_str_unchecked("dummy") },
payload,
#[cfg(feature = "unstable")]
reliability,
attachment,
);
let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter();
for (cb, key_expr) in drain {
sample.key_expr = key_expr;
cb.call(sample.clone());
}
if let Some((cb, key_expr)) = last {
sample.key_expr = key_expr;
cb.call(sample);
}
}
}

Expand All @@ -1964,12 +1966,44 @@ impl SessionInner {
attachment: Option<ZBytes>,
) -> ZResult<()> {
trace!("write({:?}, [...])", key_expr);
let primitives = zread!(self.state).primitives()?;
let timestamp = timestamp.or_else(|| self.runtime.new_timestamp());
let wire_expr = key_expr.to_wire(self);
if destination != Locality::SessionLocal {
primitives.send_push(
Push {
let primitives = zread!(self.state).primitives()?;
primitives.opt_send_push(
&wire_expr,
|| { (
push::ext::QoSType::new(
priority.into(),
congestion_control,
is_express,
),
match kind {
SampleKind::Put => PushBody::Put(Put {
timestamp,
encoding: encoding.clone().into(),
#[cfg(feature = "unstable")]
ext_sinfo: source_info.into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
payload: payload.clone().into(),
}),
SampleKind::Delete => PushBody::Del(Del {
timestamp,
#[cfg(feature = "unstable")]
ext_sinfo: source_info.into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
}),
},
)
/*Push {
wire_expr: wire_expr.to_owned(),
ext_qos: push::ext::QoSType::new(
priority.into(),
Expand Down Expand Up @@ -2002,7 +2036,8 @@ impl SessionInner {
ext_unknown: vec![],
}),
},
},
}*/
},
#[cfg(feature = "unstable")]
reliability,
#[cfg(not(feature = "unstable"))]
Expand Down
8 changes: 6 additions & 2 deletions zenoh/src/net/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ use std::any::Any;
pub use demux::*;
pub use mux::*;
use zenoh_protocol::{
core::Reliability,
network::{interest::Interest, Declare, Push, Request, Response, ResponseFinal},
core::{Reliability, WireExpr},
network::{interest::Interest, push, Declare, Push, Request, Response, ResponseFinal}, zenoh::PushBody,
};

use super::routing::RoutingContext;

pub trait OptPrimitives: Send + Sync {
fn opt_send_push<F: FnOnce()->(push::ext::QoSType, PushBody)>(&self,wire_expr: &WireExpr<'_>, fn_msg: F, reliability: Reliability);
}

pub trait Primitives: Send + Sync {
fn send_interest(&self, msg: Interest);

Expand Down
14 changes: 10 additions & 4 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ use tokio_util::sync::CancellationToken;
use zenoh_protocol::{
core::{ExprId, Reliability, WhatAmI, ZenohIdProto},
network::{
interest::{InterestId, InterestMode, InterestOptions},
Mapping, Push, Request, RequestId, Response, ResponseFinal,
interest::{InterestId, InterestMode, InterestOptions}, push, Mapping, Push, Request, RequestId, Response, ResponseFinal
},
zenoh::RequestBody,
zenoh::{PushBody, RequestBody},
};
use zenoh_sync::get_mut_unchecked;
use zenoh_task::TaskController;
Expand All @@ -43,7 +42,7 @@ use super::{
use crate::{
api::key_expr::KeyExpr,
net::{
primitives::{McastMux, Mux, Primitives},
primitives::{McastMux, Mux, OptPrimitives, Primitives},
routing::{
dispatcher::interests::finalize_pending_interests,
interceptor::{InterceptorTrait, InterceptorsChain},
Expand Down Expand Up @@ -215,6 +214,13 @@ impl Face {
}
}

impl OptPrimitives for Face {
#[inline]
fn opt_send_push<F: FnOnce()->(push::ext::QoSType, PushBody)>(&self,wire_expr: &zenoh_protocol::core::WireExpr<'_>, fn_msg: F, reliability: Reliability) {
opt_route_data(&self.tables, &self.state, wire_expr, fn_msg, reliability);
}
}

impl Primitives for Face {
fn send_interest(&self, msg: zenoh_protocol::network::Interest) {
let ctrl_lock = zlock!(self.tables.ctrl_lock);
Expand Down
Loading
Loading