Skip to content

Commit

Permalink
Add regression test for issue #1470
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzzypixelz committed Sep 30, 2024
1 parent e79c800 commit 0a74ded
Showing 1 changed file with 181 additions and 0 deletions.
181 changes: 181 additions & 0 deletions zenoh/tests/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4544,3 +4544,184 @@ async fn test_liveliness_regression_3() {
peer_sub.close().await.unwrap();
router.close().await.unwrap();
}

#[cfg(feature = "unstable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_liveliness_issue_1470() {
// https://github.com/eclipse-zenoh/zenoh/issues/1470
use std::{collections::HashSet, str::FromStr, time::Duration};

use zenoh::sample::SampleKind;
use zenoh_config::{WhatAmI, ZenohId};
use zenoh_link::EndPoint;

const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);
const ROUTER0_ENDPOINT: &str = "tcp/localhost:47600";
const ROUTER1_ENDPOINT: &str = "tcp/localhost:47601";
const PEER_ENDPOINT: &str = "tcp/localhost:47602";
const LIVELINESS_KEYEXPR_PREFIX: &str = "test/liveliness/issue/1470/*";
const LIVELINESS_KEYEXPR_ROUTER0: &str = "test/liveliness/issue/1470/a0";
const LIVELINESS_KEYEXPR_ROUTER1: &str = "test/liveliness/issue/1470/a1";
const LIVELINESS_KEYEXPR_PEER: &str = "test/liveliness/issue/1470/b";

zenoh_util::init_log_from_env_or("error");

let router0 = {
let mut c = zenoh::Config::default();
c.set_id(ZenohId::from_str("a0").unwrap()).unwrap();
c.listen
.endpoints
.set(vec![ROUTER0_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Router));
ztimeout!(zenoh::open(c)).unwrap()
};

let _token_a0 = ztimeout!(router0
.liveliness()
.declare_token(LIVELINESS_KEYEXPR_ROUTER0))
.unwrap();
tokio::time::sleep(SLEEP).await;

let router1 = {
let mut c = zenoh::Config::default();
c.set_id(ZenohId::from_str("a1").unwrap()).unwrap();
c.listen
.endpoints
.set(vec![ROUTER1_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.connect
.endpoints
.set(vec![ROUTER0_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Router));
ztimeout!(zenoh::open(c)).unwrap()
};

let _token_a1 = ztimeout!(router1
.liveliness()
.declare_token(LIVELINESS_KEYEXPR_ROUTER1))
.unwrap();
tokio::time::sleep(SLEEP).await;

let peer = {
let mut c = zenoh::Config::default();
c.set_id(ZenohId::from_str("b").unwrap()).unwrap();
c.listen
.endpoints
.set(vec![PEER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.connect
.endpoints
.set(vec![
ROUTER0_ENDPOINT.parse::<EndPoint>().unwrap(),
ROUTER1_ENDPOINT.parse::<EndPoint>().unwrap(),
])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Peer));
ztimeout!(zenoh::open(c)).unwrap()
};

let _token_b = ztimeout!(peer.liveliness().declare_token(LIVELINESS_KEYEXPR_PEER)).unwrap();
tokio::time::sleep(SLEEP).await;

let client0 = {
let mut c = zenoh::Config::default();
c.set_id(ZenohId::from_str("c0").unwrap()).unwrap();
c.connect
.endpoints
.set(vec![PEER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Client));
ztimeout!(zenoh::open(c)).unwrap()
};

let sub0 = ztimeout!(client0
.liveliness()
.declare_subscriber(LIVELINESS_KEYEXPR_PREFIX)
.history(true))
.unwrap();
tokio::time::sleep(SLEEP).await;

let mut puts0 = HashSet::new();

let sample = ztimeout!(sub0.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Put);
puts0.insert(sample.key_expr().to_string());

let sample = ztimeout!(sub0.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Put);
puts0.insert(sample.key_expr().to_string());

let sample = ztimeout!(sub0.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Put);
puts0.insert(sample.key_expr().to_string());

assert!(sub0.try_recv().is_err());

assert_eq!(
puts0,
HashSet::from([
LIVELINESS_KEYEXPR_ROUTER0.to_string(),
LIVELINESS_KEYEXPR_ROUTER1.to_string(),
LIVELINESS_KEYEXPR_PEER.to_string(),
])
);

client0.close().await.unwrap();

let client1 = {
let mut c = zenoh::Config::default();
c.set_id(ZenohId::from_str("c1").unwrap()).unwrap();
c.connect
.endpoints
.set(vec![PEER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Client));
ztimeout!(zenoh::open(c)).unwrap()
};

let sub1 = ztimeout!(client1
.liveliness()
.declare_subscriber(LIVELINESS_KEYEXPR_PREFIX)
.history(true))
.unwrap();
tokio::time::sleep(SLEEP).await;

let mut puts1 = HashSet::new();

let sample = ztimeout!(sub1.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Put);
puts1.insert(sample.key_expr().to_string());

let sample = ztimeout!(sub1.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Put);
puts1.insert(sample.key_expr().to_string());

let sample = ztimeout!(sub1.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Put);
puts1.insert(sample.key_expr().to_string());

assert!(sub1.try_recv().is_err());

assert_eq!(
puts1,
HashSet::from([
LIVELINESS_KEYEXPR_ROUTER0.to_string(),
LIVELINESS_KEYEXPR_ROUTER1.to_string(),
LIVELINESS_KEYEXPR_PEER.to_string(),
])
);

router0.close().await.unwrap();
router1.close().await.unwrap();
peer.close().await.unwrap();
client0.close().await.unwrap();
client1.close().await.unwrap();
}

0 comments on commit 0a74ded

Please sign in to comment.