diff --git a/zenoh/tests/acl.rs b/zenoh/tests/acl.rs index 495c3f67a7..5cbd794d4f 100644 --- a/zenoh/tests/acl.rs +++ b/zenoh/tests/acl.rs @@ -23,6 +23,7 @@ mod test { config, config::{EndPoint, WhatAmI}, prelude::*, + sample::SampleKind, Config, Session, }; use zenoh_core::{zlock, ztimeout}; @@ -114,12 +115,20 @@ mod test { { let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); let received_value = Arc::new(Mutex::new(String::new())); + let deleted = Arc::new(Mutex::new(false)); + let temp_recv_value = received_value.clone(); + let deleted_clone = deleted.clone(); let subscriber = sub_session .declare_subscriber(KEY_EXPR) .callback(move |sample| { - let mut temp_value = zlock!(temp_recv_value); - *temp_value = sample.payload().deserialize::().unwrap(); + if sample.kind() == SampleKind::Put { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().deserialize::().unwrap(); + } else if sample.kind() == SampleKind::Delete { + let mut deleted = zlock!(deleted_clone); + *deleted = true; + } }) .await .unwrap(); @@ -128,6 +137,10 @@ mod test { publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; assert_ne!(*zlock!(received_value), VALUE); + + publisher.delete().await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert_ne!(*zlock!(deleted), true); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; @@ -156,22 +169,32 @@ mod test { { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); + let deleted = Arc::new(Mutex::new(false)); + let temp_recv_value = received_value.clone(); - let subscriber = - ztimeout!(sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { + let deleted_clone = deleted.clone(); + let subscriber = sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().deserialize::().unwrap(); - })) + } else if sample.kind() == SampleKind::Delete { + let mut deleted = zlock!(deleted_clone); + *deleted = true; + } + }) + .await .unwrap(); tokio::time::sleep(SLEEP).await; - - ztimeout!(publisher.put(VALUE)).unwrap(); + publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; - assert_eq!(*zlock!(received_value), VALUE); + + publisher.delete().await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert_eq!(*zlock!(deleted), true); ztimeout!(subscriber.undeclare()).unwrap(); } @@ -196,6 +219,7 @@ mod test { "flows": ["egress", "ingress"], "messages": [ "put", + "delete", "declare_subscriber" ], "key_exprs": [ @@ -227,22 +251,32 @@ mod test { { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); + let deleted = Arc::new(Mutex::new(false)); + let temp_recv_value = received_value.clone(); - let subscriber = - ztimeout!(sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { + let deleted_clone = deleted.clone(); + let subscriber = sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().deserialize::().unwrap(); - })) + } else if sample.kind() == SampleKind::Delete { + let mut deleted = zlock!(deleted_clone); + *deleted = true; + } + }) + .await .unwrap(); tokio::time::sleep(SLEEP).await; - - ztimeout!(publisher.put(VALUE)).unwrap(); + publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; - assert_ne!(*zlock!(received_value), VALUE); + + publisher.delete().await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert_ne!(*zlock!(deleted), true); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; @@ -266,6 +300,7 @@ mod test { "flows": ["egress", "ingress"], "messages": [ "put", + "delete", "declare_subscriber" ], "key_exprs": [ @@ -297,22 +332,32 @@ mod test { { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); + let deleted = Arc::new(Mutex::new(false)); + let temp_recv_value = received_value.clone(); - let subscriber = - ztimeout!(sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { + let deleted_clone = deleted.clone(); + let subscriber = sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().deserialize::().unwrap(); - })) + } else if sample.kind() == SampleKind::Delete { + let mut deleted = zlock!(deleted_clone); + *deleted = true; + } + }) + .await .unwrap(); tokio::time::sleep(SLEEP).await; - - ztimeout!(publisher.put(VALUE)).unwrap(); + publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; - assert_eq!(*zlock!(received_value), VALUE); + + publisher.delete().await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert_eq!(*zlock!(deleted), true); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await;