diff --git a/commons/zenoh-shm/src/api/provider/shm_provider.rs b/commons/zenoh-shm/src/api/provider/shm_provider.rs index fec329776..70081d0b9 100644 --- a/commons/zenoh-shm/src/api/provider/shm_provider.rs +++ b/commons/zenoh-shm/src/api/provider/shm_provider.rs @@ -363,7 +363,7 @@ where } /// Deallocating policy. -/// Forcely deallocate up to N buffers until allocation succeeds. +/// Forcibly deallocate up to N buffers until allocation succeeds. #[zenoh_macros::unstable_doc] pub struct Deallocate< const N: usize, diff --git a/commons/zenoh-shm/src/lib.rs b/commons/zenoh-shm/src/lib.rs index ec17f463f..36f2068db 100644 --- a/commons/zenoh-shm/src/lib.rs +++ b/commons/zenoh-shm/src/lib.rs @@ -151,7 +151,7 @@ impl ShmBufInner { /// # Safety /// You should understand what you are doing, as overestimation /// of the reference counter can lead to memory being stalled until - /// recovered by watchdog subsystem or forcely deallocated + /// recovered by watchdog subsystem or forcibly deallocated pub unsafe fn inc_ref_count(&self) { self.header.header().refcount.fetch_add(1, Ordering::SeqCst); } diff --git a/zenoh/tests/acl.rs b/zenoh/tests/acl.rs index 68293f4ad..8987e683e 100644 --- a/zenoh/tests/acl.rs +++ b/zenoh/tests/acl.rs @@ -12,238 +12,237 @@ // ZettaScale Zenoh Team, // -#![cfg(feature = "internal_config")] +#![cfg(all(feature = "unstable", feature = "internal_config"))] #![cfg(target_family = "unix")] -mod test { - use std::{ - sync::{atomic::AtomicBool, Arc, Mutex}, - time::Duration, - }; - - use tokio::runtime::Handle; - use zenoh::{config::WhatAmI, sample::SampleKind, Config, Session}; - use zenoh_config::{EndPoint, ModeDependentValue}; - use zenoh_core::{zlock, ztimeout}; - - const TIMEOUT: Duration = Duration::from_secs(60); - const SLEEP: Duration = Duration::from_secs(1); - const KEY_EXPR: &str = "test/demo"; - const VALUE: &str = "zenoh"; - - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn test_acl_pub_sub() { - zenoh::init_log_from_env_or("error"); - test_pub_sub_deny(27447).await; - test_pub_sub_allow(27447).await; - test_pub_sub_deny_then_allow(27447).await; - test_pub_sub_allow_then_deny(27447).await; - } +use std::{ + sync::{atomic::AtomicBool, Arc, Mutex}, + time::Duration, +}; + +use tokio::runtime::Handle; +use zenoh::{config::WhatAmI, sample::SampleKind, Config, Session}; +use zenoh_config::{EndPoint, ModeDependentValue}; +use zenoh_core::{zlock, ztimeout}; + +const TIMEOUT: Duration = Duration::from_secs(60); +const SLEEP: Duration = Duration::from_secs(1); +const KEY_EXPR: &str = "test/demo"; +const VALUE: &str = "zenoh"; + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_acl_pub_sub() { + zenoh::init_log_from_env_or("error"); + test_pub_sub_deny(27447).await; + test_pub_sub_allow(27447).await; + test_pub_sub_deny_then_allow(27447).await; + test_pub_sub_allow_then_deny(27447).await; +} - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn test_acl_get_queryable() { - zenoh::init_log_from_env_or("error"); - test_get_qbl_deny(27448).await; - test_get_qbl_allow(27448).await; - test_get_qbl_allow_then_deny(27448).await; - test_get_qbl_deny_then_allow(27448).await; - } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_acl_get_queryable() { + zenoh::init_log_from_env_or("error"); + test_get_qbl_deny(27448).await; + test_get_qbl_allow(27448).await; + test_get_qbl_allow_then_deny(27448).await; + test_get_qbl_deny_then_allow(27448).await; +} - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn test_acl_queryable_reply() { - zenoh::init_log_from_env_or("error"); - // Only test cases not covered by `test_acl_get_queryable` - test_reply_deny(27449).await; - test_reply_allow_then_deny(27449).await; - } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_acl_queryable_reply() { + zenoh::init_log_from_env_or("error"); + // Only test cases not covered by `test_acl_get_queryable` + test_reply_deny(27449).await; + test_reply_allow_then_deny(27449).await; +} - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn test_acl_liveliness() { - zenoh::init_log_from_env_or("error"); +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_acl_liveliness() { + zenoh::init_log_from_env_or("error"); - test_liveliness_allow(27450).await; - test_liveliness_deny(27450).await; + test_liveliness_allow(27450).await; + test_liveliness_deny(27450).await; - test_liveliness_allow_deny_token(27450).await; - test_liveliness_deny_allow_token(27450).await; + test_liveliness_allow_deny_token(27450).await; + test_liveliness_deny_allow_token(27450).await; - test_liveliness_allow_deny_sub(27450).await; - test_liveliness_deny_allow_sub(27450).await; + test_liveliness_allow_deny_sub(27450).await; + test_liveliness_deny_allow_sub(27450).await; - test_liveliness_allow_deny_query(27450).await; - test_liveliness_deny_allow_query(27450).await; - } - - async fn get_basic_router_config(port: u16) -> Config { - let mut config = Config::default(); - config.set_mode(Some(WhatAmI::Router)).unwrap(); - config - .listen - .endpoints - .set(vec![format!("tcp/127.0.0.1:{port}").parse().unwrap()]) - .unwrap(); - config.scouting.multicast.set_enabled(Some(false)).unwrap(); - config - } + test_liveliness_allow_deny_query(27450).await; + test_liveliness_deny_allow_query(27450).await; +} - async fn close_router_session(s: Session) { - println!("Closing router session"); - ztimeout!(s.close()).unwrap(); - } +async fn get_basic_router_config(port: u16) -> Config { + let mut config = Config::default(); + config.set_mode(Some(WhatAmI::Router)).unwrap(); + config + .listen + .endpoints + .set(vec![format!("tcp/127.0.0.1:{port}").parse().unwrap()]) + .unwrap(); + config.scouting.multicast.set_enabled(Some(false)).unwrap(); + config +} - async fn get_client_sessions(port: u16) -> (Session, Session) { - println!("Opening client sessions"); - let mut config = zenoh::Config::default(); - config.set_mode(Some(WhatAmI::Client)).unwrap(); - config - .connect - .set_endpoints(ModeDependentValue::Unique(vec![format!( - "tcp/127.0.0.1:{port}" - ) - .parse::() - .unwrap()])) - .unwrap(); +async fn close_router_session(s: Session) { + println!("Closing router session"); + ztimeout!(s.close()).unwrap(); +} - let s01 = ztimeout!(zenoh::open(config)).unwrap(); - - let mut config = zenoh::Config::default(); - config.set_mode(Some(WhatAmI::Client)).unwrap(); - config - .connect - .set_endpoints(ModeDependentValue::Unique(vec![format!( - "tcp/127.0.0.1:{port}" - ) - .parse::() - .unwrap()])) - .unwrap(); - let s02 = ztimeout!(zenoh::open(config)).unwrap(); - (s01, s02) - } +async fn get_client_sessions(port: u16) -> (Session, Session) { + println!("Opening client sessions"); + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec![format!( + "tcp/127.0.0.1:{port}" + ) + .parse::() + .unwrap()])) + .unwrap(); + + let s01 = ztimeout!(zenoh::open(config)).unwrap(); + + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec![format!( + "tcp/127.0.0.1:{port}" + ) + .parse::() + .unwrap()])) + .unwrap(); + let s02 = ztimeout!(zenoh::open(config)).unwrap(); + (s01, s02) +} - async fn close_sessions(s01: Session, s02: Session) { - println!("Closing client sessions"); - ztimeout!(s01.close()).unwrap(); - ztimeout!(s02.close()).unwrap(); - } +async fn close_sessions(s01: Session, s02: Session) { + println!("Closing client sessions"); + ztimeout!(s01.close()).unwrap(); + ztimeout!(s02.close()).unwrap(); +} - async fn test_pub_sub_deny(port: u16) { - println!("test_pub_sub_deny"); +async fn test_pub_sub_deny(port: u16) { + println!("test_pub_sub_deny"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "deny", "rules": [], "subjects": [], "policies": [], }"#, - ) + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (sub_session, pub_session) = get_client_sessions(port).await; + { + let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let deleted = Arc::new(Mutex::new(false)); + + let temp_recv_value = received_value.clone(); + let deleted_clone = deleted.clone(); + let subscriber = sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().try_to_string().unwrap().into_owned(); + } else if sample.kind() == SampleKind::Delete { + let mut deleted = zlock!(deleted_clone); + *deleted = true; + } + }) + .await .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (sub_session, pub_session) = get_client_sessions(port).await; - { - let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); - let received_value = Arc::new(Mutex::new(String::new())); - let deleted = Arc::new(Mutex::new(false)); - - let temp_recv_value = received_value.clone(); - let deleted_clone = deleted.clone(); - let subscriber = sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - if sample.kind() == SampleKind::Put { - let mut temp_value = zlock!(temp_recv_value); - *temp_value = sample.payload().try_to_string().unwrap().into_owned(); - } else if sample.kind() == SampleKind::Delete { - let mut deleted = zlock!(deleted_clone); - *deleted = true; - } - }) - .await - .unwrap(); - - tokio::time::sleep(SLEEP).await; - publisher.put(VALUE).await.unwrap(); - tokio::time::sleep(SLEEP).await; - assert_ne!(*zlock!(received_value), VALUE); - - publisher.delete().await.unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!(*zlock!(deleted))); - ztimeout!(subscriber.undeclare()).unwrap(); - } - close_sessions(sub_session, pub_session).await; - close_router_session(session).await; + + tokio::time::sleep(SLEEP).await; + publisher.put(VALUE).await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert_ne!(*zlock!(received_value), VALUE); + + publisher.delete().await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!(*zlock!(deleted))); + ztimeout!(subscriber.undeclare()).unwrap(); } + close_sessions(sub_session, pub_session).await; + close_router_session(session).await; +} - async fn test_pub_sub_allow(port: u16) { - println!("test_pub_sub_allow"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ +async fn test_pub_sub_allow(port: u16) { + println!("test_pub_sub_allow"); + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "allow", "rules": [], "subjects": [], "policies": [], }"#, - ) + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (sub_session, pub_session) = get_client_sessions(port).await; + { + let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let deleted = Arc::new(Mutex::new(false)); + + let temp_recv_value = received_value.clone(); + let deleted_clone = deleted.clone(); + let subscriber = sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().try_to_string().unwrap().into_owned(); + } else if sample.kind() == SampleKind::Delete { + let mut deleted = zlock!(deleted_clone); + *deleted = true; + } + }) + .await .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions(port).await; - { - let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); - let received_value = Arc::new(Mutex::new(String::new())); - let deleted = Arc::new(Mutex::new(false)); - - let temp_recv_value = received_value.clone(); - let deleted_clone = deleted.clone(); - let subscriber = sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - if sample.kind() == SampleKind::Put { - let mut temp_value = zlock!(temp_recv_value); - *temp_value = sample.payload().try_to_string().unwrap().into_owned(); - } else if sample.kind() == SampleKind::Delete { - let mut deleted = zlock!(deleted_clone); - *deleted = true; - } - }) - .await - .unwrap(); - - tokio::time::sleep(SLEEP).await; - publisher.put(VALUE).await.unwrap(); - tokio::time::sleep(SLEEP).await; - assert_eq!(*zlock!(received_value), VALUE); - - publisher.delete().await.unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(*zlock!(deleted)); - ztimeout!(subscriber.undeclare()).unwrap(); - } - close_sessions(sub_session, pub_session).await; - close_router_session(session).await; + tokio::time::sleep(SLEEP).await; + publisher.put(VALUE).await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert_eq!(*zlock!(received_value), VALUE); + + publisher.delete().await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(*zlock!(deleted)); + ztimeout!(subscriber.undeclare()).unwrap(); } - async fn test_pub_sub_allow_then_deny(port: u16) { - println!("test_pub_sub_allow_then_deny"); + close_sessions(sub_session, pub_session).await; + close_router_session(session).await; +} + +async fn test_pub_sub_allow_then_deny(port: u16) { + println!("test_pub_sub_allow_then_deny"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "allow", "rules": [ @@ -276,55 +275,55 @@ mod test { } ] }"#, - ) + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (sub_session, pub_session) = get_client_sessions(port).await; + { + let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let deleted = Arc::new(Mutex::new(false)); + + let temp_recv_value = received_value.clone(); + let deleted_clone = deleted.clone(); + let subscriber = sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().try_to_string().unwrap().into_owned(); + } else if sample.kind() == SampleKind::Delete { + let mut deleted = zlock!(deleted_clone); + *deleted = true; + } + }) + .await .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions(port).await; - { - let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); - let received_value = Arc::new(Mutex::new(String::new())); - let deleted = Arc::new(Mutex::new(false)); - - let temp_recv_value = received_value.clone(); - let deleted_clone = deleted.clone(); - let subscriber = sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - if sample.kind() == SampleKind::Put { - let mut temp_value = zlock!(temp_recv_value); - *temp_value = sample.payload().try_to_string().unwrap().into_owned(); - } else if sample.kind() == SampleKind::Delete { - let mut deleted = zlock!(deleted_clone); - *deleted = true; - } - }) - .await - .unwrap(); - - tokio::time::sleep(SLEEP).await; - publisher.put(VALUE).await.unwrap(); - tokio::time::sleep(SLEEP).await; - assert_ne!(*zlock!(received_value), VALUE); - - publisher.delete().await.unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!(*zlock!(deleted))); - ztimeout!(subscriber.undeclare()).unwrap(); - } - close_sessions(sub_session, pub_session).await; - close_router_session(session).await; + + tokio::time::sleep(SLEEP).await; + publisher.put(VALUE).await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert_ne!(*zlock!(received_value), VALUE); + + publisher.delete().await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!(*zlock!(deleted))); + ztimeout!(subscriber.undeclare()).unwrap(); } + close_sessions(sub_session, pub_session).await; + close_router_session(session).await; +} - async fn test_pub_sub_deny_then_allow(port: u16) { - println!("test_pub_sub_deny_then_allow"); +async fn test_pub_sub_deny_then_allow(port: u16) { + println!("test_pub_sub_deny_then_allow"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "deny", "rules": [ @@ -357,55 +356,55 @@ mod test { } ] }"#, - ) + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (sub_session, pub_session) = get_client_sessions(port).await; + { + let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let deleted = Arc::new(Mutex::new(false)); + + let temp_recv_value = received_value.clone(); + let deleted_clone = deleted.clone(); + let subscriber = sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().try_to_string().unwrap().into_owned(); + } else if sample.kind() == SampleKind::Delete { + let mut deleted = zlock!(deleted_clone); + *deleted = true; + } + }) + .await .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions(port).await; - { - let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); - let received_value = Arc::new(Mutex::new(String::new())); - let deleted = Arc::new(Mutex::new(false)); - - let temp_recv_value = received_value.clone(); - let deleted_clone = deleted.clone(); - let subscriber = sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - if sample.kind() == SampleKind::Put { - let mut temp_value = zlock!(temp_recv_value); - *temp_value = sample.payload().try_to_string().unwrap().into_owned(); - } else if sample.kind() == SampleKind::Delete { - let mut deleted = zlock!(deleted_clone); - *deleted = true; - } - }) - .await - .unwrap(); - - tokio::time::sleep(SLEEP).await; - publisher.put(VALUE).await.unwrap(); - tokio::time::sleep(SLEEP).await; - assert_eq!(*zlock!(received_value), VALUE); - - publisher.delete().await.unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(*zlock!(deleted)); - ztimeout!(subscriber.undeclare()).unwrap(); - } - close_sessions(sub_session, pub_session).await; - close_router_session(session).await; + + tokio::time::sleep(SLEEP).await; + publisher.put(VALUE).await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert_eq!(*zlock!(received_value), VALUE); + + publisher.delete().await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(*zlock!(deleted)); + ztimeout!(subscriber.undeclare()).unwrap(); } + close_sessions(sub_session, pub_session).await; + close_router_session(session).await; +} - async fn test_get_qbl_deny(port: u16) { - println!("test_get_qbl_deny"); +async fn test_get_qbl_deny(port: u16) { + println!("test_get_qbl_deny"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "deny", "rules": [ @@ -427,108 +426,106 @@ mod test { } ], }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (get_session, qbl_session) = get_client_sessions(port).await; - { - let mut received_value = String::new(); - - let qbl = ztimeout!(qbl_session - .declare_queryable(KEY_EXPR) - .callback(move |sample| { - tokio::task::block_in_place(move || { - Handle::current().block_on(async move { - ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() - }); - }); - })) - .unwrap(); + ) + .unwrap(); + println!("Opening router session"); - tokio::time::sleep(SLEEP).await; - let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); - while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { - match reply.result() { - Ok(sample) => { - received_value = sample.payload().try_to_string().unwrap().into_owned(); - break; - } - Err(e) => println!("Error : {:?}", e), + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (get_session, qbl_session) = get_client_sessions(port).await; + { + let mut received_value = String::new(); + + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current() + .block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); + }); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().try_to_string().unwrap().into_owned(); + break; } + Err(e) => println!("Error : {:?}", e), } - tokio::time::sleep(SLEEP).await; - assert_ne!(received_value, VALUE); - ztimeout!(qbl.undeclare()).unwrap(); } - close_sessions(get_session, qbl_session).await; - close_router_session(session).await; + tokio::time::sleep(SLEEP).await; + assert_ne!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; +} - async fn test_get_qbl_allow(port: u16) { - println!("test_get_qbl_allow"); +async fn test_get_qbl_allow(port: u16) { + println!("test_get_qbl_allow"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "allow", "rules": [], "subjects": [], "policies": [], }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (get_session, qbl_session) = get_client_sessions(port).await; - { - let mut received_value = String::new(); - - let qbl = ztimeout!(qbl_session - .declare_queryable(KEY_EXPR) - .callback(move |sample| { - tokio::task::block_in_place(move || { - Handle::current().block_on(async move { - ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() - }); - }); - })) - .unwrap(); + ) + .unwrap(); + println!("Opening router session"); - tokio::time::sleep(SLEEP).await; - let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); - while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { - match reply.result() { - Ok(sample) => { - received_value = sample.payload().try_to_string().unwrap().into_owned(); - break; - } - Err(e) => println!("Error : {:?}", e), + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (get_session, qbl_session) = get_client_sessions(port).await; + { + let mut received_value = String::new(); + + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current() + .block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); + }); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().try_to_string().unwrap().into_owned(); + break; } + Err(e) => println!("Error : {:?}", e), } - tokio::time::sleep(SLEEP).await; - assert_eq!(received_value, VALUE); - ztimeout!(qbl.undeclare()).unwrap(); } - close_sessions(get_session, qbl_session).await; - close_router_session(session).await; + tokio::time::sleep(SLEEP).await; + assert_eq!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; +} - async fn test_get_qbl_deny_then_allow(port: u16) { - println!("test_get_qbl_deny_then_allow"); +async fn test_get_qbl_deny_then_allow(port: u16) { + println!("test_get_qbl_deny_then_allow"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "deny", "rules": [ @@ -561,55 +558,54 @@ mod test { } ] }"#, - ) - .unwrap(); + ) + .unwrap(); - println!("Opening router session"); + println!("Opening router session"); - let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (get_session, qbl_session) = get_client_sessions(port).await; - { - let mut received_value = String::new(); + let (get_session, qbl_session) = get_client_sessions(port).await; + { + let mut received_value = String::new(); - let qbl = ztimeout!(qbl_session - .declare_queryable(KEY_EXPR) - .callback(move |sample| { - tokio::task::block_in_place(move || { - Handle::current().block_on(async move { - ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() - }); - }); - })) - .unwrap(); + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current() + .block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); + }); + })) + .unwrap(); - tokio::time::sleep(SLEEP).await; - let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); - while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { - match reply.result() { - Ok(sample) => { - received_value = sample.payload().try_to_string().unwrap().into_owned(); - break; - } - Err(e) => println!("Error : {:?}", e), + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().try_to_string().unwrap().into_owned(); + break; } + Err(e) => println!("Error : {:?}", e), } - tokio::time::sleep(SLEEP).await; - assert_eq!(received_value, VALUE); - ztimeout!(qbl.undeclare()).unwrap(); } - close_sessions(get_session, qbl_session).await; - close_router_session(session).await; + tokio::time::sleep(SLEEP).await; + assert_eq!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; +} - async fn test_get_qbl_allow_then_deny(port: u16) { - println!("test_get_qbl_allow_then_deny"); +async fn test_get_qbl_allow_then_deny(port: u16) { + println!("test_get_qbl_allow_then_deny"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "allow", "rules": [ @@ -641,54 +637,53 @@ mod test { } ] }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (get_session, qbl_session) = get_client_sessions(port).await; - { - let mut received_value = String::new(); - - let qbl = ztimeout!(qbl_session - .declare_queryable(KEY_EXPR) - .callback(move |sample| { - tokio::task::block_in_place(move || { - Handle::current().block_on(async move { - ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() - }); - }); - })) - .unwrap(); + ) + .unwrap(); + println!("Opening router session"); - tokio::time::sleep(SLEEP).await; - let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); - while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { - match reply.result() { - Ok(sample) => { - received_value = sample.payload().try_to_string().unwrap().into_owned(); - break; - } - Err(e) => println!("Error : {:?}", e), + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (get_session, qbl_session) = get_client_sessions(port).await; + { + let mut received_value = String::new(); + + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current() + .block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); + }); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().try_to_string().unwrap().into_owned(); + break; } + Err(e) => println!("Error : {:?}", e), } - tokio::time::sleep(SLEEP).await; - assert_ne!(received_value, VALUE); - ztimeout!(qbl.undeclare()).unwrap(); } - close_sessions(get_session, qbl_session).await; - close_router_session(session).await; + tokio::time::sleep(SLEEP).await; + assert_ne!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; +} - async fn test_reply_deny(port: u16) { - println!("test_reply_deny"); +async fn test_reply_deny(port: u16) { + println!("test_reply_deny"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "deny", "rules": [ @@ -709,54 +704,53 @@ mod test { } ], }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (get_session, qbl_session) = get_client_sessions(port).await; - { - let mut received_value = String::new(); - - let qbl = ztimeout!(qbl_session - .declare_queryable(KEY_EXPR) - .callback(move |sample| { - tokio::task::block_in_place(move || { - Handle::current().block_on(async move { - ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() - }); - }); - })) - .unwrap(); + ) + .unwrap(); + println!("Opening router session"); - tokio::time::sleep(SLEEP).await; - let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); - while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { - match reply.result() { - Ok(sample) => { - received_value = sample.payload().try_to_string().unwrap().into_owned(); - break; - } - Err(e) => println!("Error : {:?}", e), + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (get_session, qbl_session) = get_client_sessions(port).await; + { + let mut received_value = String::new(); + + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current() + .block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); + }); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().try_to_string().unwrap().into_owned(); + break; } + Err(e) => println!("Error : {:?}", e), } - tokio::time::sleep(SLEEP).await; - assert_ne!(received_value, VALUE); - ztimeout!(qbl.undeclare()).unwrap(); } - close_sessions(get_session, qbl_session).await; - close_router_session(session).await; + tokio::time::sleep(SLEEP).await; + assert_ne!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; +} - async fn test_reply_allow_then_deny(port: u16) { - println!("test_reply_allow_then_deny"); +async fn test_reply_allow_then_deny(port: u16) { + println!("test_reply_allow_then_deny"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "allow", "rules": [ @@ -783,210 +777,209 @@ mod test { } ] }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (get_session, qbl_session) = get_client_sessions(port).await; - { - let mut received_value = String::new(); - - let qbl = ztimeout!(qbl_session - .declare_queryable(KEY_EXPR) - .callback(move |sample| { - tokio::task::block_in_place(move || { - Handle::current().block_on(async move { - ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() - }); - }); - })) - .unwrap(); + ) + .unwrap(); + println!("Opening router session"); - tokio::time::sleep(SLEEP).await; - let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); - while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { - match reply.result() { - Ok(sample) => { - received_value = sample.payload().try_to_string().unwrap().into_owned(); - break; - } - Err(e) => println!("Error : {:?}", e), + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (get_session, qbl_session) = get_client_sessions(port).await; + { + let mut received_value = String::new(); + + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current() + .block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); + }); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().try_to_string().unwrap().into_owned(); + break; } + Err(e) => println!("Error : {:?}", e), } - tokio::time::sleep(SLEEP).await; - assert_ne!(received_value, VALUE); - ztimeout!(qbl.undeclare()).unwrap(); } - close_sessions(get_session, qbl_session).await; - close_router_session(session).await; + tokio::time::sleep(SLEEP).await; + assert_ne!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; +} - async fn test_liveliness_deny(port: u16) { - println!("test_liveliness_deny"); +async fn test_liveliness_deny(port: u16) { + println!("test_liveliness_deny"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "deny", "rules": [], "subjects": [], "policies": [], }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (reader_session, writer_session) = get_client_sessions(port).await; - - let received_token = Arc::new(AtomicBool::new(false)); - let dropped_token = Arc::new(AtomicBool::new(false)); - let received_token_reply = Arc::new(AtomicBool::new(false)); - - let cloned_received_token = received_token.clone(); - let cloned_dropped_token = dropped_token.clone(); - let cloned_received_token_reply = received_token_reply.clone(); - - let subscriber = reader_session - .liveliness() - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - if sample.kind() == SampleKind::Put { - cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); - } else if sample.kind() == SampleKind::Delete { - cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); - } - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - - // test if sub receives token declaration - let liveliness = writer_session - .liveliness() - .declare_token(KEY_EXPR) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); - - // test if query receives token reply - reader_session - .liveliness() - .get(KEY_EXPR) - .timeout(TIMEOUT) - .callback(move |reply| match reply.result() { - Ok(_) => { - cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); - } - Err(e) => println!("Error : {:?}", e), - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); - - // test if sub receives token undeclaration - ztimeout!(liveliness.undeclare()).unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); - - ztimeout!(subscriber.undeclare()).unwrap(); - close_sessions(reader_session, writer_session).await; - close_router_session(session).await; - } + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; +} - async fn test_liveliness_allow(port: u16) { - println!("test_liveliness_allow"); +async fn test_liveliness_allow(port: u16) { + println!("test_liveliness_allow"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "allow", "rules": [], "subjects": [], "policies": [], }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (reader_session, writer_session) = get_client_sessions(port).await; - - let received_token = Arc::new(AtomicBool::new(false)); - let dropped_token = Arc::new(AtomicBool::new(false)); - let received_token_reply = Arc::new(AtomicBool::new(false)); - - let cloned_received_token = received_token.clone(); - let cloned_dropped_token = dropped_token.clone(); - let cloned_received_token_reply = received_token_reply.clone(); - - let subscriber = reader_session - .liveliness() - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - if sample.kind() == SampleKind::Put { - cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); - } else if sample.kind() == SampleKind::Delete { - cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); - } - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - - // test if sub receives token declaration - let liveliness = writer_session - .liveliness() - .declare_token(KEY_EXPR) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(received_token.load(std::sync::atomic::Ordering::Relaxed)); - - // test if query receives token reply - reader_session - .liveliness() - .get(KEY_EXPR) - .timeout(TIMEOUT) - .callback(move |reply| match reply.result() { - Ok(_) => { - cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); - } - Err(e) => println!("Error : {:?}", e), - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); - - // test if sub receives token undeclaration - ztimeout!(liveliness.undeclare()).unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(dropped_token.load(std::sync::atomic::Ordering::Relaxed)); - - ztimeout!(subscriber.undeclare()).unwrap(); - close_sessions(reader_session, writer_session).await; - close_router_session(session).await; - } + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; +} - async fn test_liveliness_allow_deny_token(port: u16) { - println!("test_liveliness_allow_deny_token"); +async fn test_liveliness_allow_deny_token(port: u16) { + println!("test_liveliness_allow_deny_token"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "allow", "rules": [ @@ -1008,78 +1001,78 @@ mod test { }, ], }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (reader_session, writer_session) = get_client_sessions(port).await; - - let received_token = Arc::new(AtomicBool::new(false)); - let dropped_token = Arc::new(AtomicBool::new(false)); - let received_token_reply = Arc::new(AtomicBool::new(false)); - - let cloned_received_token = received_token.clone(); - let cloned_dropped_token = dropped_token.clone(); - let cloned_received_token_reply = received_token_reply.clone(); - - let subscriber = reader_session - .liveliness() - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - if sample.kind() == SampleKind::Put { - cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); - } else if sample.kind() == SampleKind::Delete { - cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); - } - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - - // test if sub receives token declaration - let liveliness = writer_session - .liveliness() - .declare_token(KEY_EXPR) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); - - // test if query receives token reply - reader_session - .liveliness() - .get(KEY_EXPR) - .timeout(TIMEOUT) - .callback(move |reply| match reply.result() { - Ok(_) => { - cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); - } - Err(e) => println!("Error : {:?}", e), - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); - - // test if sub receives token undeclaration - ztimeout!(liveliness.undeclare()).unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); - - ztimeout!(subscriber.undeclare()).unwrap(); - close_sessions(reader_session, writer_session).await; - close_router_session(session).await; - } + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; +} - async fn test_liveliness_deny_allow_token(port: u16) { - println!("test_liveliness_deny_allow_token"); +async fn test_liveliness_deny_allow_token(port: u16) { + println!("test_liveliness_deny_allow_token"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "deny", "rules": [ @@ -1101,78 +1094,78 @@ mod test { }, ], }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (reader_session, writer_session) = get_client_sessions(port).await; - - let received_token = Arc::new(AtomicBool::new(false)); - let dropped_token = Arc::new(AtomicBool::new(false)); - let received_token_reply = Arc::new(AtomicBool::new(false)); - - let cloned_received_token = received_token.clone(); - let cloned_dropped_token = dropped_token.clone(); - let cloned_received_token_reply = received_token_reply.clone(); - - let subscriber = reader_session - .liveliness() - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - if sample.kind() == SampleKind::Put { - cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); - } else if sample.kind() == SampleKind::Delete { - cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); - } - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - - // test if sub receives token declaration - let liveliness = writer_session - .liveliness() - .declare_token(KEY_EXPR) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); - - // test if query receives token reply - reader_session - .liveliness() - .get(KEY_EXPR) - .timeout(TIMEOUT) - .callback(move |reply| match reply.result() { - Ok(_) => { - cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); - } - Err(e) => println!("Error : {:?}", e), - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); - - // test if sub receives token undeclaration - ztimeout!(liveliness.undeclare()).unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); - - ztimeout!(subscriber.undeclare()).unwrap(); - close_sessions(reader_session, writer_session).await; - close_router_session(session).await; - } + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; +} - async fn test_liveliness_allow_deny_sub(port: u16) { - println!("test_liveliness_allow_deny_sub"); +async fn test_liveliness_allow_deny_sub(port: u16) { + println!("test_liveliness_allow_deny_sub"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "allow", "rules": [ @@ -1194,78 +1187,78 @@ mod test { }, ], }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (reader_session, writer_session) = get_client_sessions(port).await; - - let received_token = Arc::new(AtomicBool::new(false)); - let dropped_token = Arc::new(AtomicBool::new(false)); - let received_token_reply = Arc::new(AtomicBool::new(false)); - - let cloned_received_token = received_token.clone(); - let cloned_dropped_token = dropped_token.clone(); - let cloned_received_token_reply = received_token_reply.clone(); - - let subscriber = reader_session - .liveliness() - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - if sample.kind() == SampleKind::Put { - cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); - } else if sample.kind() == SampleKind::Delete { - cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); - } - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - - // test if sub receives token declaration - let liveliness = writer_session - .liveliness() - .declare_token(KEY_EXPR) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); - - // test if query receives token reply - reader_session - .liveliness() - .get(KEY_EXPR) - .timeout(TIMEOUT) - .callback(move |reply| match reply.result() { - Ok(_) => { - cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); - } - Err(e) => println!("Error : {:?}", e), - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); - - // test if sub receives token undeclaration - ztimeout!(liveliness.undeclare()).unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); - - ztimeout!(subscriber.undeclare()).unwrap(); - close_sessions(reader_session, writer_session).await; - close_router_session(session).await; - } + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; +} - async fn test_liveliness_deny_allow_sub(port: u16) { - println!("test_liveliness_deny_allow_sub"); +async fn test_liveliness_deny_allow_sub(port: u16) { + println!("test_liveliness_deny_allow_sub"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "deny", "rules": [ @@ -1287,78 +1280,78 @@ mod test { }, ], }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (reader_session, writer_session) = get_client_sessions(port).await; - - let received_token = Arc::new(AtomicBool::new(false)); - let dropped_token = Arc::new(AtomicBool::new(false)); - let received_token_reply = Arc::new(AtomicBool::new(false)); - - let cloned_received_token = received_token.clone(); - let cloned_dropped_token = dropped_token.clone(); - let cloned_received_token_reply = received_token_reply.clone(); - - let subscriber = reader_session - .liveliness() - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - if sample.kind() == SampleKind::Put { - cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); - } else if sample.kind() == SampleKind::Delete { - cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); - } - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - - // test if sub receives token declaration - let liveliness = writer_session - .liveliness() - .declare_token(KEY_EXPR) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(received_token.load(std::sync::atomic::Ordering::Relaxed)); - - // test if query receives token reply - reader_session - .liveliness() - .get(KEY_EXPR) - .timeout(TIMEOUT) - .callback(move |reply| match reply.result() { - Ok(_) => { - cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); - } - Err(e) => println!("Error : {:?}", e), - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); - - // test if sub receives token undeclaration - ztimeout!(liveliness.undeclare()).unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(dropped_token.load(std::sync::atomic::Ordering::Relaxed)); - - ztimeout!(subscriber.undeclare()).unwrap(); - close_sessions(reader_session, writer_session).await; - close_router_session(session).await; - } + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; +} - async fn test_liveliness_allow_deny_query(port: u16) { - println!("test_liveliness_allow_deny_query"); +async fn test_liveliness_allow_deny_query(port: u16) { + println!("test_liveliness_allow_deny_query"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "allow", "rules": [ @@ -1380,78 +1373,78 @@ mod test { }, ], }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (reader_session, writer_session) = get_client_sessions(port).await; - - let received_token = Arc::new(AtomicBool::new(false)); - let dropped_token = Arc::new(AtomicBool::new(false)); - let received_token_reply = Arc::new(AtomicBool::new(false)); - - let cloned_received_token = received_token.clone(); - let cloned_dropped_token = dropped_token.clone(); - let cloned_received_token_reply = received_token_reply.clone(); - - let subscriber = reader_session - .liveliness() - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - if sample.kind() == SampleKind::Put { - cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); - } else if sample.kind() == SampleKind::Delete { - cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); - } - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - - // test if sub receives token declaration - let liveliness = writer_session - .liveliness() - .declare_token(KEY_EXPR) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(received_token.load(std::sync::atomic::Ordering::Relaxed)); - - // test if query receives token reply - reader_session - .liveliness() - .get(KEY_EXPR) - .timeout(TIMEOUT) - .callback(move |reply| match reply.result() { - Ok(_) => { - cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); - } - Err(e) => println!("Error : {:?}", e), - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); - - // test if sub receives token undeclaration - ztimeout!(liveliness.undeclare()).unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(dropped_token.load(std::sync::atomic::Ordering::Relaxed)); - - ztimeout!(subscriber.undeclare()).unwrap(); - close_sessions(reader_session, writer_session).await; - close_router_session(session).await; - } + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; +} - async fn test_liveliness_deny_allow_query(port: u16) { - println!("test_liveliness_deny_allow_query"); +async fn test_liveliness_deny_allow_query(port: u16) { + println!("test_liveliness_deny_allow_query"); - let mut config_router = get_basic_router_config(port).await; - config_router - .insert_json5( - "access_control", - r#"{ + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ "enabled": true, "default_permission": "deny", "rules": [ @@ -1473,67 +1466,66 @@ mod test { }, ], }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (reader_session, writer_session) = get_client_sessions(port).await; - - let received_token = Arc::new(AtomicBool::new(false)); - let dropped_token = Arc::new(AtomicBool::new(false)); - let received_token_reply = Arc::new(AtomicBool::new(false)); - - let cloned_received_token = received_token.clone(); - let cloned_dropped_token = dropped_token.clone(); - let cloned_received_token_reply = received_token_reply.clone(); - - let subscriber = reader_session - .liveliness() - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - if sample.kind() == SampleKind::Put { - cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); - } else if sample.kind() == SampleKind::Delete { - cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); - } - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - - // test if sub receives token declaration - let liveliness = writer_session - .liveliness() - .declare_token(KEY_EXPR) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); - - // test if query receives token reply - reader_session - .liveliness() - .get(KEY_EXPR) - .timeout(TIMEOUT) - .callback(move |reply| match reply.result() { - Ok(_) => { - cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); - } - Err(e) => println!("Error : {:?}", e), - }) - .await - .unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); - - // test if sub receives token undeclaration - ztimeout!(liveliness.undeclare()).unwrap(); - tokio::time::sleep(SLEEP).await; - assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); - - ztimeout!(subscriber.undeclare()).unwrap(); - close_sessions(reader_session, writer_session).await; - close_router_session(session).await; - } + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (reader_session, writer_session) = get_client_sessions(port).await; + + let received_token = Arc::new(AtomicBool::new(false)); + let dropped_token = Arc::new(AtomicBool::new(false)); + let received_token_reply = Arc::new(AtomicBool::new(false)); + + let cloned_received_token = received_token.clone(); + let cloned_dropped_token = dropped_token.clone(); + let cloned_received_token_reply = received_token_reply.clone(); + + let subscriber = reader_session + .liveliness() + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { + cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); + } else if sample.kind() == SampleKind::Delete { + cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); + } + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + + // test if sub receives token declaration + let liveliness = writer_session + .liveliness() + .declare_token(KEY_EXPR) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); + + // test if query receives token reply + reader_session + .liveliness() + .get(KEY_EXPR) + .timeout(TIMEOUT) + .callback(move |reply| match reply.result() { + Ok(_) => { + cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("Error : {:?}", e), + }) + .await + .unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); + + // test if sub receives token undeclaration + ztimeout!(liveliness.undeclare()).unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); + + ztimeout!(subscriber.undeclare()).unwrap(); + close_sessions(reader_session, writer_session).await; + close_router_session(session).await; }