Skip to content

Commit

Permalink
Add downsampling query/reply tests
Browse files Browse the repository at this point in the history
  • Loading branch information
oteffahi committed Feb 27, 2025
1 parent 256379b commit 7d11b72
Showing 1 changed file with 131 additions and 13 deletions.
144 changes: 131 additions & 13 deletions zenoh/tests/interceptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
},
};

use zenoh::{key_expr::KeyExpr, Config, Wait};
use zenoh::{key_expr::KeyExpr, query::ConsolidationMode, Config, Wait};
use zenoh_config::{
DownsamplingItemConf, DownsamplingMessage, DownsamplingRuleConf, InterceptorFlow,
};
Expand All @@ -42,40 +42,40 @@ fn build_config(
ds_config: Vec<DownsamplingItemConf>,
flow: InterceptorFlow,
) -> (Config, Config) {
let mut pub_config = Config::default();
pub_config
let mut sender_config = Config::default();
sender_config
.scouting
.multicast
.set_enabled(Some(false))
.unwrap();

let mut sub_config = Config::default();
sub_config
let mut receiver_config = Config::default();
receiver_config
.scouting
.multicast
.set_enabled(Some(false))
.unwrap();

sub_config
receiver_config
.listen
.endpoints
.set(vec![locator.parse().unwrap()])
.unwrap();
pub_config
sender_config
.connect
.endpoints
.set(vec![locator.parse().unwrap()])
.unwrap();

match flow {
InterceptorFlow::Egress => pub_config.set_downsampling(ds_config).unwrap(),
InterceptorFlow::Ingress => sub_config.set_downsampling(ds_config).unwrap(),
InterceptorFlow::Egress => sender_config.set_downsampling(ds_config).unwrap(),
InterceptorFlow::Ingress => receiver_config.set_downsampling(ds_config).unwrap(),
};

(pub_config, sub_config)
(sender_config, receiver_config)
}

fn downsampling_test<F>(
fn downsampling_pub_sub_test<F>(
pub_config: Config,
sub_config: Config,
ke_prefix: &str,
Expand Down Expand Up @@ -189,7 +189,7 @@ fn downsampling_by_keyexpr_impl(flow: InterceptorFlow) {

let (pub_config, sub_config) = build_config(locator, vec![ds_config], flow);

downsampling_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check);
downsampling_pub_sub_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check);
}

#[test]
Expand Down Expand Up @@ -245,7 +245,7 @@ fn downsampling_by_interface_impl(flow: InterceptorFlow) {

let (pub_config, sub_config) = build_config(locator, ds_config, flow);

downsampling_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check);
downsampling_pub_sub_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check);
}

#[cfg(unix)]
Expand Down Expand Up @@ -317,3 +317,121 @@ fn downsampling_config_error_repeated_id() {

zenoh::open(config).wait().unwrap();
}

fn downsampling_query_reply_test(
query_config: Config,
queryable_config: Config,
queryable_ke: &str,
reply_counter: Arc<AtomicUsize>,
nb_queries: usize,
) {
let query_session = zenoh::open(query_config).wait().unwrap();
let queryable_session = zenoh::open(queryable_config).wait().unwrap();

let response_ke = queryable_ke.to_owned();
queryable_session
.declare_queryable(queryable_ke)
.callback(move |query| {
query.reply(&response_ke, "".as_bytes()).wait().unwrap();
})
.background()
.wait()
.unwrap();

std::thread::sleep(std::time::Duration::from_secs(1));

let mut handles = Vec::new();
for _ in 0..nb_queries {
let queryable_ke = queryable_ke.to_owned();
let query_session = query_session.clone();
let reply_counter = reply_counter.clone();
let handle = std::thread::spawn(move || {
let query = query_session
.get(&queryable_ke)
.consolidation(ConsolidationMode::None)
.timeout(std::time::Duration::from_secs(5))
.wait()
.unwrap();
while let Ok(reply) = query.recv() {
if let Ok(_) = reply.into_result() {
reply_counter.fetch_add(1, Ordering::SeqCst);
}
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
query_session.close().wait().unwrap();
queryable_session.close().wait().unwrap();
}

fn downsampling_query_rate_test(flow: InterceptorFlow) {
let queryable_ke = "test/downsamples_query";
let locator = "tcp/127.0.0.1:31448";

let ds_config = DownsamplingItemConf {
id: None,
flows: Some(vec![flow]),
interfaces: None,
messages: vec![DownsamplingMessage::Query],
rules: vec![DownsamplingRuleConf {
key_expr: queryable_ke.try_into().unwrap(),
freq: 0.01,
}],
};

let (query_config, queryable_config) = build_config(locator, vec![ds_config], flow);
let reply_counter = Arc::new(AtomicUsize::new(0));
downsampling_query_reply_test(
query_config,
queryable_config,
queryable_ke,
reply_counter.clone(),
2,
);
assert!(reply_counter.load(Ordering::SeqCst) == 1);
}

fn downsampling_reply_rate_test(flow: InterceptorFlow) {
let queryable_ke = "test/downsamples_reply";
let locator = "tcp/127.0.0.1:31449";

let ds_config = DownsamplingItemConf {
id: None,
flows: Some(vec![flow]),
interfaces: None,
messages: vec![DownsamplingMessage::Reply],
rules: vec![DownsamplingRuleConf {
key_expr: queryable_ke.try_into().unwrap(),
freq: 0.01,
}],
};

let (queryable_config, query_config) = build_config(locator, vec![ds_config], flow);
let reply_counter = Arc::new(AtomicUsize::new(0));
downsampling_query_reply_test(
query_config,
queryable_config,
queryable_ke,
reply_counter.clone(),
2,
);

assert!(reply_counter.load(Ordering::SeqCst) == 1);
}

#[test]
fn downsampling_query_test() {
zenoh::init_log_from_env_or("error");
downsampling_query_rate_test(InterceptorFlow::Ingress);
downsampling_query_rate_test(InterceptorFlow::Egress);
}

#[test]
fn downsampling_reply_test() {
zenoh::init_log_from_env_or("error");
downsampling_reply_rate_test(InterceptorFlow::Ingress);
downsampling_reply_rate_test(InterceptorFlow::Egress);
}

0 comments on commit 7d11b72

Please sign in to comment.