diff --git a/.gitignore b/.gitignore index 105dae1aa7..bf5a1656d3 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,6 @@ cargo-timing*.html +#ignore test data +testfiles ci/valgrind-check/*.log diff --git a/Cargo.lock b/Cargo.lock index aff6c4950a..91ad98ce8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -235,6 +235,45 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "asn1-rs" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ad1373757efa0f70ec53939aabc7152e1591cb485208052993070ac8d2429d" +dependencies = [ + "asn1-rs-derive", + "asn1-rs-impl", + "displaydoc", + "nom", + "num-traits", + "rusticata-macros", + "thiserror", + "time 0.3.28", +] + +[[package]] +name = "asn1-rs-derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7378575ff571966e99a744addeff0bff98b8ada0dedf1956d59e634db95eaac1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", + "synstructure", +] + +[[package]] +name = "asn1-rs-impl" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "async-attributes" version = "1.1.2" @@ -1004,6 +1043,20 @@ dependencies = [ "zeroize", ] +[[package]] +name = "der-parser" +version = "9.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cd0a5c643689626bec213c4d8bd4d96acc8ffdb4ad4bb6bc16abf27d5f4b553" +dependencies = [ + "asn1-rs", + "displaydoc", + "nom", + "num-bigint", + "num-traits", + "rusticata-macros", +] + [[package]] name = "deranged" version = "0.3.8" @@ -1082,6 +1135,17 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "displaydoc" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "dyn-clone" version = "1.0.13" @@ -2305,6 +2369,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "oid-registry" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c958dd45046245b9c3c2547369bb634eb461670b2e7e0de552905801a648d1d" +dependencies = [ + "asn1-rs", +] + [[package]] name = "once_cell" version = "1.19.0" @@ -3149,6 +3222,15 @@ dependencies = [ "semver 1.0.18", ] +[[package]] +name = "rusticata-macros" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" +dependencies = [ + "nom", +] + [[package]] name = "rustix" version = "0.37.25" @@ -3951,6 +4033,17 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "synstructure" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "system-configuration" version = "0.5.1" @@ -4073,6 +4166,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" dependencies = [ "deranged", + "itoa", "serde", "time-core", "time-macros 0.2.14", @@ -5048,6 +5142,23 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "x509-parser" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcbc162f30700d6f3f82a24bf7cc62ffe7caea42c0b2cba8bf7f3ae50cf51f69" +dependencies = [ + "asn1-rs", + "data-encoding", + "der-parser", + "lazy_static", + "nom", + "oid-registry", + "rusticata-macros", + "thiserror", + "time 0.3.28", +] + [[package]] name = "yasna" version = "0.5.2" @@ -5350,6 +5461,7 @@ dependencies = [ "tokio-util", "tracing", "webpki-roots", + "x509-parser", "zenoh-collections", "zenoh-config", "zenoh-core", @@ -5416,6 +5528,7 @@ dependencies = [ "tokio-util", "tracing", "webpki-roots", + "x509-parser", "zenoh-collections", "zenoh-config", "zenoh-core", diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index fc1ee880b4..bd677d517a 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -104,6 +104,8 @@ pub struct DownsamplingItemConf { #[derive(Serialize, Debug, Deserialize, Clone)] pub struct AclConfigRules { pub interfaces: Option>, + pub cert_common_names: Option>, + pub usernames: Option>, pub key_exprs: Vec, pub actions: Vec, pub flows: Option>, @@ -124,6 +126,8 @@ pub struct PolicyRule { #[serde(rename_all = "snake_case")] pub enum Subject { Interface(String), + CertCommonName(String), + Username(String), } #[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)] diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 5a41050e94..6b2ec14c69 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -50,6 +50,7 @@ pub struct Link { pub is_reliable: bool, pub is_streamed: bool, pub interfaces: Vec, + pub auth_identifier: LinkAuthId, } #[async_trait] @@ -78,6 +79,7 @@ impl From<&LinkUnicast> for Link { is_reliable: link.is_reliable(), is_streamed: link.is_streamed(), interfaces: link.get_interface_names(), + auth_identifier: link.get_auth_identifier(), } } } @@ -98,6 +100,7 @@ impl From<&LinkMulticast> for Link { is_reliable: link.is_reliable(), is_streamed: false, interfaces: vec![], + auth_identifier: LinkAuthId::default(), } } } diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index add4c3a27b..cd8c550503 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -20,6 +20,7 @@ use core::{ use std::net::SocketAddr; use async_trait::async_trait; +use serde::Serialize; use zenoh_protocol::{ core::{EndPoint, Locator}, transport::BatchSize, @@ -51,6 +52,7 @@ pub trait LinkUnicastTrait: Send + Sync { fn is_reliable(&self) -> bool; fn is_streamed(&self) -> bool; fn get_interface_names(&self) -> Vec; + fn get_auth_identifier(&self) -> LinkAuthId; async fn write(&self, buffer: &[u8]) -> ZResult; async fn write_all(&self, buffer: &[u8]) -> ZResult<()>; async fn read(&self, buffer: &mut [u8]) -> ZResult; @@ -118,3 +120,69 @@ pub fn get_ip_interface_names(addr: &SocketAddr) -> Vec { } } } +#[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)] + +pub enum LinkAuthType { + Tls, + Quic, + None, +} +#[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)] + +pub struct LinkAuthId { + auth_type: LinkAuthType, + auth_value: Option, +} + +impl LinkAuthId { + pub fn get_type(&self) -> &LinkAuthType { + &self.auth_type + } + pub fn get_value(&self) -> &Option { + &self.auth_value + } +} +impl Default for LinkAuthId { + fn default() -> Self { + LinkAuthId { + auth_type: LinkAuthType::None, + auth_value: None, + } + } +} + +#[derive(Debug)] +pub struct LinkAuthIdBuilder { + pub auth_type: LinkAuthType, //HAS to be provided when building + pub auth_value: Option, //actual value added to the above type; is None for None type +} +impl Default for LinkAuthIdBuilder { + fn default() -> Self { + Self::new() + } +} + +impl LinkAuthIdBuilder { + pub fn new() -> LinkAuthIdBuilder { + LinkAuthIdBuilder { + auth_type: LinkAuthType::None, + auth_value: None, + } + } + + pub fn auth_type(&mut self, auth_type: LinkAuthType) -> &mut Self { + self.auth_type = auth_type; + self + } + pub fn auth_value(&mut self, auth_value: Option) -> &mut Self { + self.auth_value = auth_value; + self + } + + pub fn build(&self) -> LinkAuthId { + LinkAuthId { + auth_type: self.auth_type.clone(), + auth_value: self.auth_value.clone(), + } + } +} diff --git a/io/zenoh-links/zenoh-link-quic/Cargo.toml b/io/zenoh-links/zenoh-link-quic/Cargo.toml index 63bfc1f839..e10eed71a1 100644 --- a/io/zenoh-links/zenoh-link-quic/Cargo.toml +++ b/io/zenoh-links/zenoh-link-quic/Cargo.toml @@ -30,13 +30,14 @@ base64 = { workspace = true } futures = { workspace = true } quinn = { workspace = true } rustls-native-certs = { workspace = true } -rustls-pki-types = { workspace = true } +rustls-pki-types = { workspace = true } rustls-webpki = { workspace = true } + secrecy = { workspace = true } tokio = { workspace = true, features = [ - "fs", "io-util", "net", + "fs", "sync", "time", ] } @@ -56,3 +57,5 @@ zenoh-util = { workspace = true } rustls = { version = "0.21", features = ["dangerous_configuration", "quic"] } tokio-rustls = "0.24.1" rustls-pemfile = { version = "1" } + +x509-parser = "0.16.0" diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index a3b2687b6f..cd9cad071f 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -22,10 +22,11 @@ use std::{ use async_trait::async_trait; use tokio::sync::Mutex as AsyncMutex; use tokio_util::sync::CancellationToken; +use x509_parser::prelude::*; use zenoh_core::zasynclock; use zenoh_link_commons::{ - get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - ListenersUnicastIP, NewLinkChannelSender, + get_ip_interface_names, LinkAuthId, LinkAuthIdBuilder, LinkAuthType, LinkManagerUnicastTrait, + LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -46,6 +47,7 @@ pub struct LinkUnicastQuic { dst_locator: Locator, send: AsyncMutex, recv: AsyncMutex, + auth_identifier: LinkAuthId, } impl LinkUnicastQuic { @@ -55,6 +57,7 @@ impl LinkUnicastQuic { dst_locator: Locator, send: quinn::SendStream, recv: quinn::RecvStream, + auth_identifier: LinkAuthId, ) -> LinkUnicastQuic { // Build the Quic object LinkUnicastQuic { @@ -64,6 +67,7 @@ impl LinkUnicastQuic { dst_locator, send: AsyncMutex::new(send), recv: AsyncMutex::new(recv), + auth_identifier, } } } @@ -156,6 +160,10 @@ impl LinkUnicastTrait for LinkUnicastQuic { fn is_streamed(&self) -> bool { true } + #[inline(always)] + fn get_auth_identifier(&self) -> LinkAuthId { + self.auth_identifier.clone() + } } impl Drop for LinkUnicastQuic { @@ -254,6 +262,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { .open_bi() .await .map_err(|e| zerror!("Can not create a new QUIC link bound to {}: {}", host, e))?; + let auth_id = get_cert_common_name(quic_conn.clone())?; let link = Arc::new(LinkUnicastQuic::new( quic_conn, @@ -261,6 +270,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { endpoint.into(), send, recv, + auth_id.into(), )); Ok(LinkUnicast(link)) @@ -388,12 +398,15 @@ async fn accept_task( let dst_addr = quic_conn.remote_address(); tracing::debug!("Accepted QUIC connection on {:?}: {:?}", src_addr, dst_addr); // Create the new link object + let auth_id = get_cert_common_name(quic_conn.clone())?; + let link = Arc::new(LinkUnicastQuic::new( quic_conn, src_addr, Locator::new(QUIC_LOCATOR_PREFIX, dst_addr.to_string(), "")?, send, recv, + auth_id.into() )); // Communicate the new link to the initial transport manager @@ -418,3 +431,36 @@ async fn accept_task( } Ok(()) } + +fn get_cert_common_name(conn: quinn::Connection) -> ZResult { + let mut auth_id = QuicAuthId { auth_value: None }; + if let Some(pi) = conn.peer_identity() { + let serv_certs = pi.downcast::>().unwrap(); + if let Some(item) = serv_certs.iter().next() { + let (_, cert) = X509Certificate::from_der(item.as_ref()).unwrap(); + let subject_name = cert + .subject + .iter_common_name() + .next() + .and_then(|cn| cn.as_str().ok()) + .unwrap(); + auth_id = QuicAuthId { + auth_value: Some(subject_name.to_string()), + }; + } + } + Ok(auth_id) +} + +#[derive(Debug, Clone)] +struct QuicAuthId { + auth_value: Option, +} +impl From for LinkAuthId { + fn from(value: QuicAuthId) -> Self { + LinkAuthIdBuilder::new() + .auth_type(LinkAuthType::Quic) + .auth_value(value.auth_value.clone()) + .build() + } +} diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index ca4efacdc6..31213f5c43 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -32,8 +32,8 @@ use tokio_util::sync::CancellationToken; use z_serial::ZSerial; use zenoh_core::{zasynclock, zasyncread, zasyncwrite}; use zenoh_link_commons::{ - ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - NewLinkChannelSender, + ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, + LinkUnicastTrait, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -212,6 +212,10 @@ impl LinkUnicastTrait for LinkUnicastSerial { fn is_streamed(&self) -> bool { false } + #[inline(always)] + fn get_auth_identifier(&self) -> LinkAuthId { + LinkAuthId::default() + } } impl fmt::Display for LinkUnicastSerial { diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 79812c526e..bf2e66c863 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -20,7 +20,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use zenoh_link_commons::{ - get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + get_ip_interface_names, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::{ @@ -164,6 +164,9 @@ impl LinkUnicastTrait for LinkUnicastTcp { fn is_streamed(&self) -> bool { true } + fn get_auth_identifier(&self) -> LinkAuthId { + LinkAuthId::default() + } } // // WARN: This sometimes causes timeout in routing test diff --git a/io/zenoh-links/zenoh-link-tls/Cargo.toml b/io/zenoh-links/zenoh-link-tls/Cargo.toml index 3025e3d7d7..00f7207bb0 100644 --- a/io/zenoh-links/zenoh-link-tls/Cargo.toml +++ b/io/zenoh-links/zenoh-link-tls/Cargo.toml @@ -47,3 +47,5 @@ zenoh-result = { workspace = true } zenoh-runtime = { workspace = true } zenoh-sync = { workspace = true } zenoh-util = { workspace = true } + +x509-parser = "0.16.0" diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 1ced1a26b1..2e40f23dae 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -21,10 +21,12 @@ use tokio::{ }; use tokio_rustls::{TlsAcceptor, TlsConnector, TlsStream}; use tokio_util::sync::CancellationToken; +//use webpki::anchor_from_trusted_cert; +use x509_parser::prelude::*; use zenoh_core::zasynclock; use zenoh_link_commons::{ - get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - ListenersUnicastIP, NewLinkChannelSender, + get_ip_interface_names, LinkAuthId, LinkAuthIdBuilder, LinkAuthType, LinkManagerUnicastTrait, + LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -37,6 +39,10 @@ use crate::{ TLS_ACCEPT_THROTTLE_TIME, TLS_DEFAULT_MTU, TLS_LINGER_TIMEOUT, TLS_LOCATOR_PREFIX, }; +#[derive(Default, Debug, PartialEq, Eq, Hash)] +pub struct TlsCommonName(String); + +//impl pub struct LinkUnicastTls { // The underlying socket as returned from the async-rustls library // NOTE: TlsStream requires &mut for read and write operations. This means @@ -56,6 +62,7 @@ pub struct LinkUnicastTls { // Make sure there are no concurrent read or writes write_mtx: AsyncMutex<()>, read_mtx: AsyncMutex<()>, + auth_identifier: LinkAuthId, } unsafe impl Send for LinkUnicastTls {} @@ -66,6 +73,7 @@ impl LinkUnicastTls { socket: TlsStream, src_addr: SocketAddr, dst_addr: SocketAddr, + auth_identifier: LinkAuthId, ) -> LinkUnicastTls { let (tcp_stream, _) = socket.get_ref(); // Set the TLS nodelay option @@ -99,6 +107,7 @@ impl LinkUnicastTls { dst_locator: Locator::new(TLS_LOCATOR_PREFIX, dst_addr.to_string(), "").unwrap(), write_mtx: AsyncMutex::new(()), read_mtx: AsyncMutex::new(()), + auth_identifier, } } @@ -189,6 +198,10 @@ impl LinkUnicastTrait for LinkUnicastTls { fn is_streamed(&self) -> bool { true } + #[inline(always)] + fn get_auth_identifier(&self) -> LinkAuthId { + self.auth_identifier.clone() + } } impl Drop for LinkUnicastTls { @@ -282,9 +295,19 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { e ) })?; + + let (_, tls_conn) = tls_stream.get_ref(); + + let auth_identifier = get_server_cert_common_name(tls_conn)?; + let tls_stream = TlsStream::Client(tls_stream); - let link = Arc::new(LinkUnicastTls::new(tls_stream, src_addr, dst_addr)); + let link = Arc::new(LinkUnicastTls::new( + tls_stream, + src_addr, + dst_addr, + auth_identifier.into(), + )); Ok(LinkUnicast(link)) } @@ -384,8 +407,16 @@ async fn accept_task( }; tracing::debug!("Accepted TLS connection on {:?}: {:?}", src_addr, dst_addr); - // Create the new link object - let link = Arc::new(LinkUnicastTls::new(tls_stream, src_addr, dst_addr)); + let (_, tls_conn) = tls_stream.get_ref(); + let auth_identifier = get_client_cert_common_name(tls_conn)?; + tracing::debug!("Accepted TLS connection on {:?}: {:?}", src_addr, dst_addr); + // Create the new link object + let link = Arc::new(LinkUnicastTls::new( + tls_stream, + src_addr, + dst_addr, + auth_identifier.into(), + )); // Communicate the new link to the initial transport manager if let Err(e) = manager.send_async(LinkUnicast(link)).await { @@ -409,3 +440,55 @@ async fn accept_task( Ok(()) } + +fn get_client_cert_common_name(tls_conn: &rustls::CommonState) -> ZResult { + if let Some(serv_certs) = tls_conn.peer_certificates() { + let (_, cert) = X509Certificate::from_der(serv_certs[0].as_ref())?; + let subject_name = &cert + .subject + .iter_common_name() + .next() + .and_then(|cn| cn.as_str().ok()) + .unwrap(); + + Ok(TlsAuthId { + auth_value: Some(subject_name.to_string()), + }) + } else { + Ok(TlsAuthId { auth_value: None }) + } +} + +fn get_server_cert_common_name(tls_conn: &rustls::ClientConnection) -> ZResult { + let serv_certs = tls_conn.peer_certificates().unwrap(); + let mut auth_id = TlsAuthId { auth_value: None }; + + //need the first certificate in the chain os no need for looping + if let Some(item) = serv_certs.iter().next() { + let (_, cert) = X509Certificate::from_der(item.as_ref())?; + let subject_name = &cert + .subject + .iter_common_name() + .next() + .and_then(|cn| cn.as_str().ok()) + .unwrap(); + + auth_id = TlsAuthId { + auth_value: Some(subject_name.to_string()), + }; + return Ok(auth_id); + } + Ok(auth_id) +} + +struct TlsAuthId { + auth_value: Option, +} +impl From for LinkAuthId { + fn from(value: TlsAuthId) -> Self { + LinkAuthIdBuilder::new() + .auth_type(LinkAuthType::Tls) + .auth_value(value.auth_value.clone()) + .build() + } +} diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 79f980ca96..760ed2209c 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -24,8 +24,8 @@ use tokio::{net::UdpSocket, sync::Mutex as AsyncMutex}; use tokio_util::sync::CancellationToken; use zenoh_core::{zasynclock, zlock}; use zenoh_link_commons::{ - get_ip_interface_names, ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, + get_ip_interface_names, ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, + LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -224,6 +224,10 @@ impl LinkUnicastTrait for LinkUnicastUdp { fn is_streamed(&self) -> bool { false } + #[inline(always)] + fn get_auth_identifier(&self) -> LinkAuthId { + LinkAuthId::default() + } } impl fmt::Display for LinkUnicastUdp { diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index 1b30ceb553..7dea524ca1 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -36,8 +36,8 @@ use tokio_util::sync::CancellationToken; use unix_named_pipe::{create, open_write}; use zenoh_core::{zasyncread, zasyncwrite, ResolveFuture, Wait}; use zenoh_link_commons::{ - ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - NewLinkChannelSender, + ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, + LinkUnicastTrait, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -525,6 +525,10 @@ impl LinkUnicastTrait for UnicastPipe { fn is_streamed(&self) -> bool { true } + #[inline(always)] + fn get_auth_identifier(&self) -> LinkAuthId { + LinkAuthId::default() + } } impl fmt::Display for UnicastPipe { diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index cc7147c9e0..7adbb3ab30 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -27,7 +27,7 @@ use tokio_util::sync::CancellationToken; use uuid::Uuid; use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -143,6 +143,10 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream { fn is_streamed(&self) -> bool { true } + #[inline(always)] + fn get_auth_identifier(&self) -> LinkAuthId { + LinkAuthId::default() + } } impl Drop for LinkUnicastUnixSocketStream { diff --git a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs index 605f114173..32b292ca7e 100644 --- a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs @@ -28,7 +28,7 @@ use tokio_vsock::{ }; use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, }; use zenoh_protocol::{ core::{endpoint::Address, EndPoint, Locator}, @@ -189,6 +189,10 @@ impl LinkUnicastTrait for LinkUnicastVsock { fn is_streamed(&self) -> bool { true } + #[inline(always)] + fn get_auth_identifier(&self) -> LinkAuthId { + LinkAuthId::default() + } } impl fmt::Display for LinkUnicastVsock { diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index b671bf67f2..336e8af975 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -34,7 +34,7 @@ use tokio_tungstenite::{accept_async, tungstenite::Message, MaybeTlsStream, WebS use tokio_util::sync::CancellationToken; use zenoh_core::{zasynclock, zasyncread, zasyncwrite}; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -226,6 +226,10 @@ impl LinkUnicastTrait for LinkUnicastWs { fn is_streamed(&self) -> bool { false } + #[inline(always)] + fn get_auth_identifier(&self) -> LinkAuthId { + LinkAuthId::default() + } } impl Drop for LinkUnicastWs { diff --git a/io/zenoh-transport/src/unicast/authentication.rs b/io/zenoh-transport/src/unicast/authentication.rs new file mode 100644 index 0000000000..b66289983e --- /dev/null +++ b/io/zenoh-transport/src/unicast/authentication.rs @@ -0,0 +1,43 @@ +use zenoh_link::{LinkAuthId, LinkAuthType}; + +#[cfg(feature = "auth_usrpwd")] +use super::establishment::ext::auth::UsrPwdId; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum AuthId { + CertCommonName(String), + Username(String), + None, +} + +impl From for AuthId { + fn from(lid: LinkAuthId) -> Self { + match (lid.get_type(), lid.get_value()) { + (LinkAuthType::Tls | LinkAuthType::Quic, Some(auth_value)) => { + AuthId::CertCommonName(auth_value.clone()) + } + _ => AuthId::None, + } + } +} + +#[cfg(feature = "auth_usrpwd")] +impl From for AuthId { + fn from(user_password_id: UsrPwdId) -> Self { + // pub(crate) struct UsrPwdId(pub Option>); + match user_password_id.0 { + Some(username) => { + //do something + //convert username from vecu8 to string + match std::str::from_utf8(&username) { + Ok(name) => AuthId::Username(name.to_owned()), + Err(e) => { + tracing::error!("Error in extracting username {}", e); + AuthId::None + } + } + } + None => AuthId::None, + } + } +} diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index d074ea9642..9a7151252d 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -31,6 +31,8 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; +#[cfg(feature = "auth_usrpwd")] +use super::ext::auth::UsrPwdId; #[cfg(feature = "shared-memory")] use super::ext::shm::AuthSegment; #[cfg(feature = "shared-memory")] @@ -111,6 +113,8 @@ struct RecvOpenSynOut { other_whatami: WhatAmI, other_lease: Duration, other_initial_sn: TransportSn, + #[cfg(feature = "auth_usrpwd")] + other_auth_id: UsrPwdId, } // OpenAck @@ -486,11 +490,18 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { } // Extension Auth - #[cfg(feature = "transport_auth")] - self.ext_auth - .recv_open_syn((&mut state.link.ext_auth, open_syn.ext_auth)) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))?; + #[allow(unused_mut, unused_assignments)] + #[cfg(feature = "auth_usrpwd")] + let mut user_password_id = UsrPwdId(None); + + #[cfg(feature = "auth_usrpwd")] + { + user_password_id = self + .ext_auth + .recv_open_syn((&mut state.link.ext_auth, open_syn.ext_auth)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + } // Extension MultiLink #[cfg(feature = "transport_multilink")] @@ -517,6 +528,8 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { other_whatami: cookie.whatami, other_lease: open_syn.lease, other_initial_sn: open_syn.initial_sn, + #[cfg(feature = "auth_usrpwd")] + other_auth_id: user_password_id, }; Ok((state, output)) } @@ -711,7 +724,6 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - cookie_nonce: iack_out.cookie_nonce, }; let (mut state, osyn_out) = step!(fsm.recv_open_syn(osyn_in).await); - // Create the OpenAck but not send it yet let oack_in = SendOpenAckIn { mine_zid: manager.config.zid, @@ -735,6 +747,8 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - false => None, }, is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(), + #[cfg(feature = "auth_usrpwd")] + auth_id: osyn_out.other_auth_id, }; let a_config = TransportLinkUnicastConfig { diff --git a/io/zenoh-transport/src/unicast/establishment/ext/auth/mod.rs b/io/zenoh-transport/src/unicast/establishment/ext/auth/mod.rs index 8d57434bc3..0bc46c6edc 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/auth/mod.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/auth/mod.rs @@ -571,7 +571,12 @@ impl<'a> AcceptFsm for &'a AuthFsm<'a> { } type RecvOpenSynIn = (&'a mut StateAccept, Option); + + #[cfg(not(feature = "auth_usrpwd"))] type RecvOpenSynOut = (); + #[cfg(feature = "auth_usrpwd")] + type RecvOpenSynOut = UsrPwdId; + async fn recv_open_syn( self, input: Self::RecvOpenSynIn, @@ -604,13 +609,17 @@ impl<'a> AcceptFsm for &'a AuthFsm<'a> { match (self.usrpwd.as_ref(), state.usrpwd.as_mut()) { (Some(e), Some(s)) => { let x = ztake!(exts, id::USRPWD); - e.recv_open_syn((s, ztryinto!(x, S))).await?; + let username = e.recv_open_syn((s, ztryinto!(x, S))).await?; + let user_passwd_id = UsrPwdId(Some(username)); + return Ok(user_passwd_id); + } + (None, None) => { + return Ok(UsrPwdId(None)); } - (None, None) => {} _ => bail!("{S} Invalid UsrPwd configuration."), } } - + #[cfg(not(feature = "auth_usrpwd"))] Ok(()) } diff --git a/io/zenoh-transport/src/unicast/establishment/ext/auth/usrpwd.rs b/io/zenoh-transport/src/unicast/establishment/ext/auth/usrpwd.rs index be24337fad..22d7a86817 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/auth/usrpwd.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/auth/usrpwd.rs @@ -162,6 +162,8 @@ impl StateOpen { pub(crate) struct StateAccept { nonce: u64, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct UsrPwdId(pub Option>); impl StateAccept { pub(crate) fn new(prng: &mut R) -> Self @@ -406,7 +408,7 @@ impl<'a> AcceptFsm for &'a AuthUsrPwdFsm<'a> { } type RecvOpenSynIn = (&'a mut StateAccept, Option); - type RecvOpenSynOut = (); + type RecvOpenSynOut = Vec; //value of userid is returned if recvopensynout is processed as valid async fn recv_open_syn( self, input: Self::RecvOpenSynIn, @@ -436,8 +438,8 @@ impl<'a> AcceptFsm for &'a AuthUsrPwdFsm<'a> { if hmac != open_syn.hmac { bail!("{S} Invalid password."); } - - Ok(()) + let username = open_syn.user.to_owned(); + Ok(username) } type SendOpenAckIn = &'a StateAccept; diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index 49c57d9e9a..2d50d465bf 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -32,6 +32,8 @@ use zenoh_result::ZResult; use super::ext::shm::AuthSegment; #[cfg(feature = "shared-memory")] use crate::shm::TransportShmConfig; +#[cfg(feature = "auth_usrpwd")] +use crate::unicast::establishment::ext::auth::UsrPwdId; use crate::{ common::batch::BatchConfig, unicast::{ @@ -644,6 +646,8 @@ pub(crate) async fn open_link( false => None, }, is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(), + #[cfg(feature = "auth_usrpwd")] + auth_id: UsrPwdId(None), }; let o_config = TransportLinkUnicastConfig { diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index 9c46b55174..abffb665b7 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -32,6 +32,7 @@ use zenoh_result::{zerror, ZResult}; use crate::stats::TransportStats; use crate::{ unicast::{ + authentication::AuthId, link::{LinkUnicastWithOpenAck, TransportLinkUnicast}, transport_unicast_inner::{AddLinkResult, TransportUnicastTrait}, TransportConfigUnicast, @@ -187,6 +188,10 @@ impl TransportUnicastTrait for TransportUnicastLowlatency { self.config.zid } + fn get_auth_ids(&self) -> Vec { + vec![] + } + fn get_whatami(&self) -> WhatAmI { self.config.whatami } diff --git a/io/zenoh-transport/src/unicast/mod.rs b/io/zenoh-transport/src/unicast/mod.rs index 1726ba2559..973d0bf09a 100644 --- a/io/zenoh-transport/src/unicast/mod.rs +++ b/io/zenoh-transport/src/unicast/mod.rs @@ -11,15 +11,15 @@ // Contributors: // ZettaScale Zenoh Team, // +pub mod authentication; pub mod establishment; pub(crate) mod link; pub(crate) mod lowlatency; pub(crate) mod manager; -pub(crate) mod transport_unicast_inner; -pub(crate) mod universal; - #[cfg(feature = "test")] pub mod test_helpers; +pub(crate) mod transport_unicast_inner; +pub(crate) mod universal; use std::{ fmt, @@ -42,6 +42,9 @@ use self::transport_unicast_inner::TransportUnicastTrait; use super::{TransportPeer, TransportPeerEventHandler}; #[cfg(feature = "shared-memory")] use crate::shm::TransportShmConfig; +use crate::unicast::authentication::AuthId; +#[cfg(feature = "auth_usrpwd")] +use crate::unicast::establishment::ext::auth::UsrPwdId; /*************************************/ /* TRANSPORT UNICAST */ @@ -58,6 +61,8 @@ pub(crate) struct TransportConfigUnicast { #[cfg(feature = "shared-memory")] pub(crate) shm: Option, pub(crate) is_lowlatency: bool, + #[cfg(feature = "auth_usrpwd")] + pub(crate) auth_id: UsrPwdId, } /// [`TransportUnicast`] is the transport handler returned @@ -117,6 +122,11 @@ impl TransportUnicast { Ok(transport.get_links()) } + pub fn get_auth_ids(&self) -> ZResult> { + let transport = self.get_inner()?; + Ok(transport.get_auth_ids()) + } + #[inline(always)] pub fn schedule(&self, message: NetworkMessage) -> ZResult<()> { let transport = self.get_inner()?; diff --git a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs index c687a6aa16..bc0c34b7e8 100644 --- a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs +++ b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs @@ -56,6 +56,7 @@ pub(crate) trait TransportUnicastTrait: Send + Sync { fn get_whatami(&self) -> WhatAmI; fn get_callback(&self) -> Option>; fn get_links(&self) -> Vec; + fn get_auth_ids(&self) -> Vec; #[cfg(feature = "shared-memory")] fn is_shm(&self) -> bool; fn is_qos(&self) -> bool; diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index 538756f6ee..e7b0d52458 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -28,6 +28,7 @@ use zenoh_protocol::{ }; use zenoh_result::{bail, zerror, ZResult}; +use super::super::authentication::AuthId; #[cfg(feature = "stats")] use crate::stats::TransportStats; use crate::{ @@ -381,6 +382,18 @@ impl TransportUnicastTrait for TransportUnicastUniversal { zread!(self.links).iter().map(|l| l.link.link()).collect() } + fn get_auth_ids(&self) -> Vec { + //convert link level auth ids to AuthId + #[allow(unused_mut)] + let mut auth_ids: Vec = zread!(self.links) + .iter() + .map(|l| l.link.link().auth_identifier.into()) + .collect(); + // convert usrpwd auth id to AuthId + #[cfg(feature = "auth_usrpwd")] + auth_ids.push(self.config.auth_id.clone().into()); + auth_ids + } /*************************************/ /* TX */ /*************************************/ diff --git a/zenoh/src/net/routing/interceptor/access_control.rs b/zenoh/src/net/routing/interceptor/access_control.rs index fe78ce8aed..885752e2c6 100644 --- a/zenoh/src/net/routing/interceptor/access_control.rs +++ b/zenoh/src/net/routing/interceptor/access_control.rs @@ -26,7 +26,10 @@ use zenoh_protocol::{ zenoh::{PushBody, RequestBody}, }; use zenoh_result::ZResult; -use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; +use zenoh_transport::{ + multicast::TransportMulticast, + unicast::{authentication::AuthId, TransportUnicast}, +}; use super::{ authorization::PolicyEnforcer, EgressInterceptor, IngressInterceptor, InterceptorFactory, @@ -37,18 +40,19 @@ pub struct AclEnforcer { enforcer: Arc, } #[derive(Clone, Debug)] -pub struct Interface { +pub struct AuthSubject { id: usize, - name: String, + name: String, //make Subject } + struct EgressAclEnforcer { policy_enforcer: Arc, - interface_list: Vec, + subject: Vec, zid: ZenohId, } struct IngressAclEnforcer { policy_enforcer: Arc, - interface_list: Vec, + subject: Vec, zid: ZenohId, } @@ -80,9 +84,29 @@ impl InterceptorFactoryTrait for AclEnforcer { &self, transport: &TransportUnicast, ) -> (Option, Option) { + let mut authn_ids = vec![]; + if let Ok(ids) = transport.get_auth_ids() { + let enforcer = self.enforcer.clone(); + for auth_id in ids { + match auth_id { + AuthId::CertCommonName(name) => { + let subject = &Subject::CertCommonName(name.clone()); + if let Some(val) = enforcer.subject_map.get(subject) { + authn_ids.push(AuthSubject { id: *val, name }); + } + } + AuthId::Username(name) => { + let subject = &Subject::Username(name.clone()); + if let Some(val) = enforcer.subject_map.get(subject) { + authn_ids.push(AuthSubject { id: *val, name }); + } + } + AuthId::None => {} + } + } + } match transport.get_zid() { Ok(zid) => { - let mut interface_list: Vec = Vec::new(); match transport.get_links() { Ok(links) => { for link in links { @@ -90,7 +114,7 @@ impl InterceptorFactoryTrait for AclEnforcer { for face in link.interfaces { let subject = &Subject::Interface(face.clone()); if let Some(val) = enforcer.subject_map.get(subject) { - interface_list.push(Interface { + authn_ids.push(AuthSubject { id: *val, name: face, }); @@ -105,13 +129,13 @@ impl InterceptorFactoryTrait for AclEnforcer { } let ingress_interceptor = Box::new(IngressAclEnforcer { policy_enforcer: self.enforcer.clone(), - interface_list: interface_list.clone(), zid, + subject: authn_ids.clone(), }); let egress_interceptor = Box::new(EgressAclEnforcer { policy_enforcer: self.enforcer.clone(), - interface_list: interface_list.clone(), zid, + subject: authn_ids, }); match ( self.enforcer.interface_enabled.ingress, @@ -282,15 +306,15 @@ impl InterceptorTrait for EgressAclEnforcer { } pub trait AclActionMethods { fn policy_enforcer(&self) -> Arc; - fn interface_list(&self) -> Vec; fn zid(&self) -> ZenohId; fn flow(&self) -> InterceptorFlow; + fn authn_ids(&self) -> Vec; fn action(&self, action: Action, log_msg: &str, key_expr: &str) -> Permission { let policy_enforcer = self.policy_enforcer(); - let interface_list = self.interface_list(); + let authn_ids: Vec = self.authn_ids(); let zid = self.zid(); let mut decision = policy_enforcer.default_permission; - for subject in &interface_list { + for subject in &authn_ids { match policy_enforcer.policy_decision_point(subject.id, self.flow(), action, key_expr) { Ok(Permission::Allow) => { tracing::trace!( @@ -336,32 +360,28 @@ impl AclActionMethods for EgressAclEnforcer { fn policy_enforcer(&self) -> Arc { self.policy_enforcer.clone() } - - fn interface_list(&self) -> Vec { - self.interface_list.clone() - } - fn zid(&self) -> ZenohId { self.zid } fn flow(&self) -> InterceptorFlow { InterceptorFlow::Egress } + fn authn_ids(&self) -> Vec { + self.subject.clone() + } } impl AclActionMethods for IngressAclEnforcer { fn policy_enforcer(&self) -> Arc { self.policy_enforcer.clone() } - - fn interface_list(&self) -> Vec { - self.interface_list.clone() - } - fn zid(&self) -> ZenohId { self.zid } fn flow(&self) -> InterceptorFlow { InterceptorFlow::Ingress } + fn authn_ids(&self) -> Vec { + self.subject.clone() + } } diff --git a/zenoh/src/net/routing/interceptor/authorization.rs b/zenoh/src/net/routing/interceptor/authorization.rs index 4ff36b1ce3..78185c9405 100644 --- a/zenoh/src/net/routing/interceptor/authorization.rs +++ b/zenoh/src/net/routing/interceptor/authorization.rs @@ -177,6 +177,20 @@ impl PolicyEnforcer { ); } } + match rule.usernames { + Some(_) => (), + None => { + tracing::warn!("ACL config usernames list is empty. Applying rule #{} to all usernames", rule_offset); + rule.usernames = Some(Vec::new()); + } + } + match rule.cert_common_names { + Some(_) => (), + None => { + tracing::warn!("ACL config cert_common_names list is empty. Applying rule #{} to all certificate common names", rule_offset); + rule.cert_common_names = Some(Vec::new()); + } + } } let policy_information = self.policy_information_point(&rules)?; let subject_map = policy_information.subject_map; @@ -229,9 +243,7 @@ impl PolicyEnforcer { for config_rule in config_rule_set { // config validation let mut validation_err = String::new(); - if config_rule.interfaces.as_ref().unwrap().is_empty() { - validation_err.push_str("ACL config interfaces list is empty. "); - } + if config_rule.actions.is_empty() { validation_err.push_str("ACL config actions list is empty. "); } @@ -244,6 +256,28 @@ impl PolicyEnforcer { if !validation_err.is_empty() { bail!("{}", validation_err); } + + //for when at least one is not empty + let mut subject_validation_err: usize = 0; + validation_err = String::new(); + + if config_rule.interfaces.as_ref().unwrap().is_empty() { + subject_validation_err += 1; + validation_err.push_str("ACL config interfaces list is empty. "); + } + if config_rule.cert_common_names.as_ref().unwrap().is_empty() { + subject_validation_err += 1; + validation_err.push_str("ACL config certificate common names list is empty. "); + } + if config_rule.usernames.as_ref().unwrap().is_empty() { + subject_validation_err += 1; + validation_err.push_str("ACL config usernames list is empty. "); + } + + if subject_validation_err == 3 { + bail!("{}", validation_err); + } + for subject in config_rule.interfaces.as_ref().unwrap() { if subject.trim().is_empty() { bail!("found an empty interface value in interfaces list"); @@ -265,6 +299,48 @@ impl PolicyEnforcer { } } } + for subject in config_rule.cert_common_names.as_ref().unwrap() { + if subject.trim().is_empty() { + bail!("found an empty value in certificate common names list"); + } + for flow in config_rule.flows.as_ref().unwrap() { + for action in &config_rule.actions { + for key_expr in &config_rule.key_exprs { + if key_expr.trim().is_empty() { + bail!("found an empty key-expression value in key_exprs list"); + } + policy_rules.push(PolicyRule { + subject: Subject::CertCommonName(subject.clone()), + key_expr: key_expr.clone(), + action: *action, + permission: config_rule.permission, + flow: *flow, + }) + } + } + } + } + for subject in config_rule.usernames.as_ref().unwrap() { + if subject.trim().is_empty() { + bail!("found an empty value in usernames list"); + } + for flow in config_rule.flows.as_ref().unwrap() { + for action in &config_rule.actions { + for key_expr in &config_rule.key_exprs { + if key_expr.trim().is_empty() { + bail!("found an empty key-expression value in key_exprs list"); + } + policy_rules.push(PolicyRule { + subject: Subject::Username(subject.clone()), + key_expr: key_expr.clone(), + action: *action, + permission: config_rule.permission, + flow: *flow, + }) + } + } + } + } } let mut subject_map = SubjectMap::default(); let mut counter = 1; @@ -293,6 +369,9 @@ impl PolicyEnforcer { key_expr: &str, ) -> ZResult { let policy_map = &self.policy_map; + if policy_map.is_empty() { + return Ok(self.default_permission); + } match policy_map.get(&subject) { Some(single_policy) => { let deny_result = single_policy diff --git a/zenoh/tests/acl.rs b/zenoh/tests/acl.rs index b78a9ac888..3aed0e6541 100644 --- a/zenoh/tests/acl.rs +++ b/zenoh/tests/acl.rs @@ -47,7 +47,7 @@ mod test { async fn get_basic_router_config() -> Config { let mut config = config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); - config.listen.endpoints = vec!["tcp/127.0.0.1:7447".parse().unwrap()]; + config.listen.endpoints = vec!["tcp/127.0.0.1:27447".parse().unwrap()]; config.scouting.multicast.set_enabled(Some(false)).unwrap(); config } @@ -59,9 +59,9 @@ mod test { async fn get_client_sessions() -> (Session, Session) { println!("Opening client sessions"); - let config = config::client(["tcp/127.0.0.1:7447".parse::().unwrap()]); + let config = config::client(["tcp/127.0.0.1:27447".parse::().unwrap()]); let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let config = config::client(["tcp/127.0.0.1:7447".parse::().unwrap()]); + let config = config::client(["tcp/127.0.0.1:27447".parse::().unwrap()]); let s02 = ztimeout!(zenoh::open(config)).unwrap(); (s01, s02) } diff --git a/zenoh/tests/authentication.rs b/zenoh/tests/authentication.rs new file mode 100644 index 0000000000..e4b15d5771 --- /dev/null +++ b/zenoh/tests/authentication.rs @@ -0,0 +1,1245 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +mod test { + use std::{ + fs, + path::Path, + sync::{Arc, Mutex}, + time::Duration, + }; + + use tokio::runtime::Handle; + use zenoh::{ + config, + config::{EndPoint, WhatAmI}, + prelude::*, + Config, Session, + }; + use zenoh_core::{zlock, ztimeout}; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const KEY_EXPR: &str = "test/demo"; + const VALUE: &str = "zenoh"; + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_authentication() { + zenoh_util::try_init_log_from_env(); + let path = "./tests/testfiles"; + create_new_files(path).await.unwrap(); + println!("testfiles created successfully."); + + test_pub_sub_deny_then_allow_usrpswd().await; + test_pub_sub_allow_then_deny_usrpswd().await; + test_get_qbl_allow_then_deny_usrpswd().await; + test_get_qbl_deny_then_allow_usrpswd().await; + + test_pub_sub_deny_then_allow_tls(3774).await; + test_pub_sub_allow_then_deny_tls(3775).await; + test_get_qbl_allow_then_deny_tls(3776).await; + test_get_qbl_deny_then_allow_tls(3777).await; + + test_pub_sub_deny_then_allow_quic(3774).await; + test_pub_sub_allow_then_deny_quic(3775).await; + test_get_qbl_deny_then_allow_quic(3776).await; + test_get_qbl_allow_then_deny_quic(3777).await; + + std::fs::remove_dir_all(path).unwrap(); + println!("testfiles removed successfully."); + } + + #[allow(clippy::all)] + async fn create_new_files(file_path: &str) -> std::io::Result<()> { + use std::io::prelude::*; + let ca_pem = b"-----BEGIN CERTIFICATE----- +MIIDiTCCAnGgAwIBAgIUO1x6LAlICgKs5+pYUTo4CughfKEwDQYJKoZIhvcNAQEL +BQAwVDELMAkGA1UEBhMCRlIxCzAJBgNVBAgMAklGMQswCQYDVQQHDAJQUjERMA8G +A1UECgwIenMsIEluYy4xGDAWBgNVBAMMD3pzX3Rlc3Rfcm9vdF9jYTAeFw0yNDAz +MTExNDM0MjNaFw0yNTAzMTExNDM0MjNaMFQxCzAJBgNVBAYTAkZSMQswCQYDVQQI +DAJJRjELMAkGA1UEBwwCUFIxETAPBgNVBAoMCHpzLCBJbmMuMRgwFgYDVQQDDA96 +c190ZXN0X3Jvb3RfY2EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC3 +pFWM+IJNsRCYHt1v/TliecppwVZV+ZHfFw9JKN9ev4K/fWHUiAOwp91MOLxbaYKd +C6dxW28YVGltoGz3kUZJZcJRQVso1jXv24Op4muOsiYXukLc4TU2F6dG1XqkLt5t +svsYAQFf1uK3//QZFVRBosJEn+jjiJ4XCvt49mnPRolp1pNKX0z31mZO6bSly6c9 +OVlJMjWpDCYSOuf6qZZ36fa9eSut2bRJIPY0QCsgnqYBTnIEhksS+3jy6Qt+QpLz +95pFdLbW/MW4XKpaDltyYkO6QrBekF6uWRlvyAHU+NqvXZ4F/3Z5l26qLuBcsLPJ +kyawkO+yNIDxORmQgMczAgMBAAGjUzBRMB0GA1UdDgQWBBThgotd9ws2ryEEaKp2 ++RMOWV8D7jAfBgNVHSMEGDAWgBThgotd9ws2ryEEaKp2+RMOWV8D7jAPBgNVHRMB +Af8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQA9QoPv78hGmvmqF4GZeqrOBKQB +N/H5wL7f8H6BXU/wpNo2nnWOJn3u37lT+zivAdGEv+x+GeKekcugKBCSluhBLpVb +VNXe4WwMm5FBuO2NRBN2nblTMm1kEO00nVk1/yNo4hI8mj7d4YLU62d7324osNpF +wHqu6B0/c99JeKRvODGswyff1i8rJ1jpcgk/JmHg7UQBHEIkn0cRR0f9W3Mxv6b5 +ZeowRe81neWNkC6IMiMmzA0iHGkhoUMA15qG1ZKOr1XR364LH5BfNNpzAWYwkvJs +0JFrrdw+rm+cRJWs55yiyCCs7pyg1IJkY/o8bifdCOUgIyonzffwREk3+kZR +-----END CERTIFICATE-----"; + + let client_side_pem = b"-----BEGIN CERTIFICATE----- +MIIDjDCCAnSgAwIBAgIUOi9jKILrOzfRNGIkQ48S90NehpkwDQYJKoZIhvcNAQEL +BQAwVDELMAkGA1UEBhMCRlIxCzAJBgNVBAgMAklGMQswCQYDVQQHDAJQUjERMA8G +A1UECgwIenMsIEluYy4xGDAWBgNVBAMMD3pzX3Rlc3Rfcm9vdF9jYTAeFw0yNDAz +MTkxMTMxNDhaFw0yNTAzMTkxMTMxNDhaMFAxCzAJBgNVBAYTAkZSMQswCQYDVQQI +DAJJRjELMAkGA1UEBwwCUFIxETAPBgNVBAoMCHpzLCBJbmMuMRQwEgYDVQQDDAtj +bGllbnRfc2lkZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMzU2p1a +ly/1bi2TDZ8+Qlvk9/3KyHqrg2BGZUxB3Pj/lufDuYNwOHkss99wp8gzMsT28mD4 +y6X7nCgEN8WeHl+/xfLuGsWIBa1OOr6dz0qewoWFsor01cQ8+nwAKlgnz6IvHfkQ +OJZD/QYSdyn6c1AcIyS60vo4qMjyI4OVb1Dl4WpC4vCmWvDT0WjBZ5GckCnuQ8wS +wZ5MtPuMQf8kYX95ll7eBtDfEXF9Oja0l1/5SmlHuKyqDy4sIKovxtFHTqgb8PUc +yT33pUHOsBXruNBxl1MKq1outdMqcQknT6FAC+aVZ7bTlwhnH8p5Apn57g+dJYTI +9dCr1e2oK5NohhkCAwEAAaNaMFgwFgYDVR0RBA8wDYILY2xpZW50X3NpZGUwHQYD +VR0OBBYEFHDUYYfQacLj1tp49OG9NbPuL0N/MB8GA1UdIwQYMBaAFOGCi133Czav +IQRoqnb5Ew5ZXwPuMA0GCSqGSIb3DQEBCwUAA4IBAQB+nFAe6QyD2AaFdgrFOyEE +MeYb97sy9p5ylhMYyU62AYsIzzpTY74wBG78qYPIw3lAYzNcN0L6T6kBQ4lu6gFm +XB0SqCZ2AkwvV8tTlbLkZeoO6rONeke6c8cJsxYN7NiknDvTMrkTTgiyvbCWfEVX +Htnc4j/KzSBX3UjVcbPM3L/6KwMRw050/6RCiOIPFjTOCfTGoDx5fIyBk3ch/Plw +TkH2juHxX0/aCxr8hRE1v9+pXXlGnGoKbsDMLN9Aziu6xzdT/kD7BvyoM8rh7CE5 +ae7/R4sd13cZ2WGDPimqO0z1kItMOIdiYvk4DgOg+J8hZSkKT56erafdDa2LPBE6 +-----END CERTIFICATE-----"; + + let client_side_key = b"-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDM1NqdWpcv9W4t +kw2fPkJb5Pf9ysh6q4NgRmVMQdz4/5bnw7mDcDh5LLPfcKfIMzLE9vJg+Mul+5wo +BDfFnh5fv8Xy7hrFiAWtTjq+nc9KnsKFhbKK9NXEPPp8ACpYJ8+iLx35EDiWQ/0G +Encp+nNQHCMkutL6OKjI8iODlW9Q5eFqQuLwplrw09FowWeRnJAp7kPMEsGeTLT7 +jEH/JGF/eZZe3gbQ3xFxfTo2tJdf+UppR7isqg8uLCCqL8bRR06oG/D1HMk996VB +zrAV67jQcZdTCqtaLrXTKnEJJ0+hQAvmlWe205cIZx/KeQKZ+e4PnSWEyPXQq9Xt +qCuTaIYZAgMBAAECggEAAlqVVw7UEzLjtN4eX1S6tD3jvCzFBETdjgENF7TfjlR4 +lln9UyV6Xqkc+Y28vdwZwqHwW90sEPCc5ShUQD7+jBzi8FVcZSX4o7rVCbz8RXgg +1eI5EKf632YQflWNpwTxGcTnGCY/sjleil/yst6sDdD+9eR4OXQme2Wt8wyH8pLm +bf1OensGrFu3kJaPMOfP6jXnqEqkUPqmaCNW7+Ans8E+4J9oksRVPQJEuxwSjdJu +BlG50KKpl0XwZ/u/hkkj8/BlRDa62YMGJkFOwaaGUu2/0UU139XaJiMSPoL6t/BU +1H15dtW9liEtnHIssXMRzc9cg+xPgCs79ABXSZaFUQKBgQD4mH/DcEFwkZQcr08i +GUk0RE5arAqHui4eiujcPZVV6j/L7PHHmabKRPBlsndFP7KUCtvzNRmHq7JWDkpF +S36OE4e94CBYb0CIrO8OO5zl1vGAn5qa9ckefSFz9AMWW+hSuo185hFjt67BMaI0 +8CxfYDH+QY5D4JE5RhSwsOmiUQKBgQDS7qjq+MQKPHHTztyHK8IbAfEGlrBdCAjf +K1bDX2BdfbRJMZ+y8LgK5HxDPlNx2/VauBLsIyU1Zirepd8hOsbCVoK1fOq+T7bY +KdB1oqLK1Rq1sMBc26F24LBaZ3Pw5XgYEcvaOW0JFQ9Oc4VjcIXKjTNhobNOegfK +QDnw8fEtSQKBgQDrCuTh2GVHFZ3AcVCUoOvB60NaH4flRHcOkbARbHihvtWK7gC8 +A97bJ8tTnCWA5/TkXFAR54a36/K1wtUeJ38Evhp9wEdU1ftiPn/YKSzzcwLr5fu7 +v9/kX9MdWv0ASu2iKphUGwMeETG9oDwJaXvKwZ0DFOB59P3Z9RTi6qI7wQKBgQCp +uBZ6WgeDJPeBsaSHrpHUIU/KOV1WvaxFxR1evlNPZmG1sxQIat/rA8VoZbHGn3Ff +uVSgY/cAbGB6HYTXu+9JV0p8tTI8Ru+cJqjwvhe2lJmVL87X6HCWsluzoiIL5tcm +pssbn7E36ZYTTag6RsOgItUA7ZbUwiOafOsiD8o64QKBgE6nOkAfy5mbp7X+q9uD +J5y6IXpY/Oia/RwveLWFbI/aum4Nnhb6L9Y0XlrYjm4cJOchQyDR7FF6f4EuAiYb +wdxBbkxXpwXnfKCtNvMF/wZMvPVaS5HTQga8hXMrtlW6jtTJ4HmkTTB/MILAXVkJ +EHi+N70PcrYg6li415TGfgDz +-----END PRIVATE KEY-----"; + + let server_side_pem = b"-----BEGIN CERTIFICATE----- +MIIDjDCCAnSgAwIBAgIUOi9jKILrOzfRNGIkQ48S90NehpgwDQYJKoZIhvcNAQEL +BQAwVDELMAkGA1UEBhMCRlIxCzAJBgNVBAgMAklGMQswCQYDVQQHDAJQUjERMA8G +A1UECgwIenMsIEluYy4xGDAWBgNVBAMMD3pzX3Rlc3Rfcm9vdF9jYTAeFw0yNDAz +MTkxMTMxMDRaFw0yNTAzMTkxMTMxMDRaMFAxCzAJBgNVBAYTAkZSMQswCQYDVQQI +DAJJRjELMAkGA1UEBwwCUFIxETAPBgNVBAoMCHpzLCBJbmMuMRQwEgYDVQQDDAtz +ZXJ2ZXJfc2lkZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKw4eKzt +T1inzuEIPBaPksWyjoD9n6uJx9jAQ2wRB6rXiAsXVLRSuczdGDpb1MwAqoIi6ozw +tzDRwkr58vUNaTCswxadlAmB44JEVYKZoublHjlVj5ygr0R4R5F2T9tIV+jpqZuK +HR4dHe8PiDCiWVzWvYwOLVKXQKSeaE2Z143ukVIJ85qmNykJ066AVhgWnIYSCR9c +s7WPBdTWAW3L4yNlast9hfvxdQNDs5AtUnJKfAX+7DylPAm8V7YjU1k9AtTNPbpy +kb9X97ErsB8891MmZaGZp0J6tnuucDkk0dlowMVvi2aUCsYoKF5DgGxtyVAeLhTP +70GenaLe2uwG8fMCAwEAAaNaMFgwFgYDVR0RBA8wDYILc2VydmVyX3NpZGUwHQYD +VR0OBBYEFBKms1sOw8nM/O5SN1EZIH+LsWaPMB8GA1UdIwQYMBaAFOGCi133Czav +IQRoqnb5Ew5ZXwPuMA0GCSqGSIb3DQEBCwUAA4IBAQA6H/sfm8YUn86+GwxNR9i9 +MCL7WHVRx3gS9ENK87+HtZNL2TVvhPJtupG3Pjgqi33FOHrM4rMUcWSZeCEycVgy +5cjimQLwfDljIBRQE6sem3gKf0obdWl5AlPDLTL/iKj5Su7NycrjZFYqkjZjn+58 +fe8lzHNeP/3RQTgjJ98lQI0bdzGDG1+QoxTgPEc77vgN0P4MHJYx2auz/7jYBqNJ +ko8nugIQsd4kOhmOIBUQ8aXkXFktSQIerEGB8uw5iF2cCdH/sTCvhzhxLb4IWo/O +0cAZ+Vs4FW3KUn/Y44yrVAWl1H6xdFsNXBqbzVEMzlt/RV3rH70RDCc20XhP+w+g +-----END CERTIFICATE-----"; + + let server_side_key = b"-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCsOHis7U9Yp87h +CDwWj5LFso6A/Z+ricfYwENsEQeq14gLF1S0UrnM3Rg6W9TMAKqCIuqM8Lcw0cJK ++fL1DWkwrMMWnZQJgeOCRFWCmaLm5R45VY+coK9EeEeRdk/bSFfo6ambih0eHR3v +D4gwollc1r2MDi1Sl0CknmhNmdeN7pFSCfOapjcpCdOugFYYFpyGEgkfXLO1jwXU +1gFty+MjZWrLfYX78XUDQ7OQLVJySnwF/uw8pTwJvFe2I1NZPQLUzT26cpG/V/ex +K7AfPPdTJmWhmadCerZ7rnA5JNHZaMDFb4tmlArGKCheQ4BsbclQHi4Uz+9Bnp2i +3trsBvHzAgMBAAECggEAUjpIS/CmkOLWYRVoczEr197QMYBnCyUm2TO7PU7IRWbR +GtKR6+MPuWPbHIoaCSlMQARhztdj8BhG1zuOKDi1/7qNDzA/rWZp9RmhZlDquamt +i5xxjEwgQuXW7fn6WO2qo5dlFtGT43vtfeYBlY7+cdhJ+iQOub9j6vWDQYHxrF7x +yM8xvNzomHThvLFzWXJV/nGjX5pqPraMmwJUW+MGX0YaEr6tClqsc1Kmxhs3iIUo +1JCqh3FpVu2i/mR9fdcQ0ONT/s1UHzy+1Bhmh3j2Fuk4+ZeLMfxTfFxk5U0BeMQY +sES3qmd+pG5iqPW+AmXy299G89jf5+1Q4J2Km5KOUQKBgQDidifoeknpi9hRHLLD +w/7KMMe8yYg3c3dv5p0iUQQ2pXd1lJIFQ+B2/D+hfOXhnN/iCDap89ll2LoQ2Q9L +38kQXH06HCM2q11RP0BEsZCG0CnluS+JVNnjs/ALi+yc4HSpzKPs3zXIC3dLOUbq +ov5Esa5h/RU6+NO+DH72TWTv6wKBgQDCryPKtOcLp1eqdwIBRoXdPZeUdZdnwT8+ +70DnC+YdOjFkqTbaoYE5ePa3ziGOZyTFhJbPgiwEdj9Ez1JSgqLLv5hBc4s6FigK +D7fOnn7Q7+al/kEW7+X5yoSl1bFuPCqGL1xxzxmpDY8Gf3nyZ+QGfWIenbk3nq12 +nTgINyWMGQKBgQDSrxBDxXl8EMGH/MYHQRGKs8UvSuMyi3bjoU4w/eSInno75qPO +yC5NJDJin9sSgar8E54fkSCBExdP01DayvC5CwLqDAFqvBTOIKU/A18tPP6tnRKv +lkQ8Bkxdwai47k07J4qeNa9IU/qA/mGOq2MZL6DHwvd8bMA5gFCh/rDYTwKBgAPm +gGASScK5Ao+evMKLyCjLkBrgVD026O542qMGYQDa5pxuq3Or4qvlGYRLM+7ncBwo +8OCNahZYzCGzyaFvjpVobEN7biGmyfyRngwcrsu+0q8mreUov0HG5etwoZJk0DFK +B58cGBaD+AaYTTgnDrF2l52naUuM+Uq0EahQeocZAoGBAMJEGUFyEdm1JATkNhBv +ruDzj07PCjdvq3lUJix2ZlKlabsi5V+oYxMmrUSU8Nkaxy6O+qETNRNWQeWbPQHL +IZx/qrP32PmWX0IVj3pbfKHQSpOKNGzL9xUJ/FIycZWyT3yGf24KBuJwIx7xSrRx +qNsoty1gY/y3n7SN/iMZo8lO +-----END PRIVATE KEY-----"; + + let credentials_txt = b"client1name:client1passwd +client2name:client2passwd"; + + let certs_dir = Path::new(file_path); + if !certs_dir.exists() { + fs::create_dir(certs_dir)?; + } + struct Testfile<'a> { + name: &'a str, + value: &'a [u8], + } + + let test_files = vec![ + Testfile { + name: "ca.pem", + value: ca_pem, + }, + Testfile { + name: "clientsidekey.pem", + value: client_side_key, + }, + Testfile { + name: "clientside.pem", + value: client_side_pem, + }, + Testfile { + name: "serversidekey.pem", + value: server_side_key, + }, + Testfile { + name: "serverside.pem", + value: server_side_pem, + }, + Testfile { + name: "credentials.txt", + value: credentials_txt, + }, + ]; + for test_file in test_files.iter() { + let file_path = certs_dir.join(test_file.name); + let mut file = fs::File::create(&file_path)?; + file.write_all(test_file.value)?; + } + + Ok(()) + } + + async fn get_basic_router_config_tls(port: u16) -> Config { + let mut config = config::default(); + config.set_mode(Some(WhatAmI::Router)).unwrap(); + config.listen.endpoints = vec![format!("tls/127.0.0.1:{}", port).parse().unwrap()]; + config.scouting.multicast.set_enabled(Some(false)).unwrap(); + config + .insert_json5( + "transport", + r#"{ + "link": { + "protocols": [ + "tls" + ], + "tls": { + "server_private_key": "tests/testfiles/serversidekey.pem", + "server_certificate": "tests/testfiles/serverside.pem", + "root_ca_certificate": "tests/testfiles/ca.pem", + "client_auth": true, + "server_name_verification": false + }, + }, + }"#, + ) + .unwrap(); + config + } + async fn get_basic_router_config_quic(port: u16) -> Config { + let mut config = config::default(); + config.set_mode(Some(WhatAmI::Router)).unwrap(); + config.listen.endpoints = vec![format!("quic/127.0.0.1:{}", port).parse().unwrap()]; + config.scouting.multicast.set_enabled(Some(false)).unwrap(); + config + .insert_json5( + "transport", + r#"{ + "link": { + "protocols": [ + "quic" + ], + "tls": { + "server_private_key": "tests/testfiles/serversidekey.pem", + "server_certificate": "tests/testfiles/serverside.pem", + "root_ca_certificate": "tests/testfiles/ca.pem", + "client_auth": true, + "server_name_verification": false + }, + }, + }"#, + ) + .unwrap(); + config + } + + async fn get_basic_router_config_usrpswd() -> Config { + let mut config = config::default(); + config.set_mode(Some(WhatAmI::Router)).unwrap(); + config.listen.endpoints = vec!["tcp/127.0.0.1:37447".parse().unwrap()]; + config.scouting.multicast.set_enabled(Some(false)).unwrap(); + config + .insert_json5( + "transport", + r#"{ + "auth": { + usrpwd: { + user: "routername", + password: "routerpasswd", + dictionary_file: "tests/testfiles/credentials.txt", + }, + }, + }"#, + ) + .unwrap(); + config + } + async fn close_router_session(s: Session) { + println!("Closing router session"); + ztimeout!(s.close()).unwrap(); + } + + async fn get_client_sessions_tls(port: u16) -> (Session, Session) { + println!("Opening client sessions"); + let mut config = config::client([format!("tls/127.0.0.1:{}", port) + .parse::() + .unwrap()]); + config + .insert_json5( + "transport", + r#"{ + "link": { + "protocols": [ + "tls" + ], + "tls": { + "root_ca_certificate": "tests/testfiles/ca.pem", + "client_private_key": "tests/testfiles/clientsidekey.pem", + "client_certificate": "tests/testfiles/clientside.pem", + "client_auth": true, + "server_name_verification": false + } + } + }"#, + ) + .unwrap(); + let s01 = ztimeout!(zenoh::open(config)).unwrap(); + let mut config = config::client([format!("tls/127.0.0.1:{}", port) + .parse::() + .unwrap()]); + config + .insert_json5( + "transport", + r#"{ + "link": { + "protocols": [ + "tls" + ], + "tls": { + "root_ca_certificate": "tests/testfiles/ca.pem", + "client_private_key": "tests/testfiles/clientsidekey.pem", + "client_certificate": "tests/testfiles/clientside.pem", + "client_auth": true, + "server_name_verification": false + } + } + }"#, + ) + .unwrap(); + let s02 = ztimeout!(zenoh::open(config)).unwrap(); + (s01, s02) + } + + async fn get_client_sessions_quic(port: u16) -> (Session, Session) { + println!("Opening client sessions"); + let mut config = config::client([format!("quic/127.0.0.1:{}", port) + .parse::() + .unwrap()]); + config + .insert_json5( + "transport", + r#"{ + "link": { + "protocols": [ + "quic" + ], + "tls": { + "root_ca_certificate": "tests/testfiles/ca.pem", + "client_private_key": "tests/testfiles/clientsidekey.pem", + "client_certificate": "tests/testfiles/clientside.pem", + "client_auth": true, + "server_name_verification": false + } + } + }"#, + ) + .unwrap(); + let s01 = ztimeout!(zenoh::open(config)).unwrap(); + let mut config = config::client([format!("quic/127.0.0.1:{}", port) + .parse::() + .unwrap()]); + config + .insert_json5( + "transport", + r#"{ + "link": { + "protocols": [ + "quic" + ], + "tls": { + "root_ca_certificate": "tests/testfiles/ca.pem", + "client_private_key": "tests/testfiles/clientsidekey.pem", + "client_certificate": "tests/testfiles/clientside.pem", + "client_auth": true, + "server_name_verification": false + } + } + }"#, + ) + .unwrap(); + let s02 = ztimeout!(zenoh::open(config)).unwrap(); + (s01, s02) + } + + async fn get_client_sessions_usrpswd() -> (Session, Session) { + println!("Opening client sessions"); + let mut config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); + config + .insert_json5( + "transport", + r#"{ + "auth": { + usrpwd: { + user: "client1name", + password: "client1passwd", + }, + } + }"#, + ) + .unwrap(); + let s01 = ztimeout!(zenoh::open(config)).unwrap(); + let mut config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); + config + .insert_json5( + "transport", + r#"{ + "auth": { + usrpwd: { + user: "client2name", + password: "client2passwd", + }, + } + }"#, + ) + .unwrap(); + let s02 = ztimeout!(zenoh::open(config)).unwrap(); + (s01, s02) + } + + async fn close_sessions(s01: Session, s02: Session) { + println!("Closing client sessions"); + ztimeout!(s01.close()).unwrap(); + ztimeout!(s02.close()).unwrap(); + } + + async fn test_pub_sub_deny_then_allow_tls(port: u16) { + println!("test_pub_sub_deny_then_allow_tls"); + + let mut config_router = get_basic_router_config_tls(port).await; + + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": false, + "default_permission": "deny", + "rules": [ + { + "permission": "allow", + "flows": ["ingress","egress"], + "actions": [ + "put", + "declare_subscriber" + ], + "key_exprs": [ + "test/demo" + ], + "cert_common_names": [ + "client_side" + ] + }, + ] + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (sub_session, pub_session) = get_client_sessions_tls(port).await; + { + let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let temp_recv_value = received_value.clone(); + let subscriber = sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().deserialize::().unwrap(); + }) + .await + .unwrap(); + + tokio::time::sleep(SLEEP).await; + publisher.put(VALUE).await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert_eq!(*zlock!(received_value), VALUE); + ztimeout!(subscriber.undeclare()).unwrap(); + } + close_sessions(sub_session, pub_session).await; + close_router_session(session).await; + } + + async fn test_pub_sub_allow_then_deny_tls(port: u16) { + println!("test_pub_sub_allow_then_deny_tls"); + let mut config_router = get_basic_router_config_tls(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": [ + { + "permission": "deny", + "flows": ["egress"], + "actions": [ + "put", + "declare_subscriber" + ], + "key_exprs": [ + "test/demo" + ], + "cert_common_names": [ + "client_side" + ] + }, + ] + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (sub_session, pub_session) = get_client_sessions_tls(port).await; + { + let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let temp_recv_value = received_value.clone(); + let subscriber = + ztimeout!(sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().deserialize::().unwrap(); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + + ztimeout!(publisher.put(VALUE)).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert_ne!(*zlock!(received_value), VALUE); + ztimeout!(subscriber.undeclare()).unwrap(); + } + close_sessions(sub_session, pub_session).await; + close_router_session(session).await; + } + + async fn test_get_qbl_deny_then_allow_tls(port: u16) { + println!("test_get_qbl_deny_then_allow_tls"); + + let mut config_router = get_basic_router_config_tls(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "deny", + "rules": [ + { + "permission": "allow", + "flows": ["egress","ingress"], + "actions": [ + "get", + "declare_queryable" + ], + "key_exprs": [ + "test/demo" + ], + "cert_common_names": [ + "client_side" + ] + }, + ] + }"#, + ) + .unwrap(); + + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (get_session, qbl_session) = get_client_sessions_tls(port).await; + { + let mut received_value = String::new(); + + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current().block_on(async move { + ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() + }); + }); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().deserialize::().unwrap(); + break; + } + Err(e) => println!( + "Error : {}", + e.payload() + .deserialize::() + .unwrap_or_else(|e| format!("{}", e)) + ), + } + } + tokio::time::sleep(SLEEP).await; + assert_eq!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); + } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; + } + + async fn test_get_qbl_allow_then_deny_tls(port: u16) { + println!("test_get_qbl_allow_then_deny_tls"); + + let mut config_router = get_basic_router_config_tls(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": [ + { + "permission": "deny", + "flows": ["egress"], + "actions": [ + "get", + "declare_queryable" + ], + "key_exprs": [ + "test/demo" + ], + "cert_common_names": [ + "client_side" + ] + }, + ] + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (get_session, qbl_session) = get_client_sessions_tls(port).await; + { + let mut received_value = String::new(); + + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current().block_on(async move { + ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() + }); + }); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().deserialize::().unwrap(); + break; + } + Err(e) => println!( + "Error : {}", + e.payload() + .deserialize::() + .unwrap_or_else(|e| format!("{}", e)) + ), + } + } + tokio::time::sleep(SLEEP).await; + assert_ne!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); + } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; + } + + async fn test_pub_sub_deny_then_allow_quic(port: u16) { + println!("test_pub_sub_deny_then_allow_quic"); + + let mut config_router = get_basic_router_config_quic(port).await; + + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": false, + "default_permission": "deny", + "rules": [ + { + "permission": "allow", + "flows": ["ingress","egress"], + "actions": [ + "put", + "declare_subscriber" + ], + "key_exprs": [ + "test/demo" + ], + "cert_common_names": [ + "client_side" + ] + }, + ] + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (sub_session, pub_session) = get_client_sessions_quic(port).await; + { + let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let temp_recv_value = received_value.clone(); + let subscriber = sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().deserialize::().unwrap(); + }) + .await + .unwrap(); + + tokio::time::sleep(SLEEP).await; + publisher.put(VALUE).await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert_eq!(*zlock!(received_value), VALUE); + ztimeout!(subscriber.undeclare()).unwrap(); + } + close_sessions(sub_session, pub_session).await; + close_router_session(session).await; + } + + #[allow(unused)] + async fn test_pub_sub_allow_then_deny_quic(port: u16) { + println!("test_pub_sub_allow_then_deny_quic"); + + let mut config_router = get_basic_router_config_quic(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": [ + { + "permission": "deny", + "flows": ["egress"], + "actions": [ + "put", + "declare_subscriber" + ], + "key_exprs": [ + "test/demo" + ], + "cert_common_names": [ + "client_side" + ] + }, + ] + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (sub_session, pub_session) = get_client_sessions_quic(port).await; + { + let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let temp_recv_value = received_value.clone(); + let subscriber = + ztimeout!(sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().deserialize::().unwrap(); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + + ztimeout!(publisher.put(VALUE)).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert_ne!(*zlock!(received_value), VALUE); + ztimeout!(subscriber.undeclare()).unwrap(); + } + close_sessions(sub_session, pub_session).await; + close_router_session(session).await; + } + + #[allow(unused)] + async fn test_get_qbl_deny_then_allow_quic(port: u16) { + println!("test_get_qbl_deny_then_allow_quic"); + + let mut config_router = get_basic_router_config_quic(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "deny", + "rules": [ + { + "permission": "allow", + "flows": ["egress","ingress"], + "actions": [ + "get", + "declare_queryable"], + "key_exprs": [ + "test/demo" + ], + "cert_common_names": [ + "client_side" + ] + }, + ] + }"#, + ) + .unwrap(); + + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (get_session, qbl_session) = get_client_sessions_quic(port).await; + { + let mut received_value = String::new(); + + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current().block_on(async move { + ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() + }); + }); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().deserialize::().unwrap(); + break; + } + Err(e) => println!( + "Error : {}", + e.payload() + .deserialize::() + .unwrap_or_else(|e| format!("{}", e)) + ), + } + } + tokio::time::sleep(SLEEP).await; + assert_eq!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); + } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; + } + + #[allow(unused)] + async fn test_get_qbl_allow_then_deny_quic(port: u16) { + println!("test_get_qbl_allow_then_deny_quic"); + + let mut config_router = get_basic_router_config_quic(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": + [ + { + "permission": "deny", + "flows": ["egress"], + "actions": [ + "get", + "declare_queryable" + ], + "key_exprs": [ + "test/demo" + ], + "cert_common_names": [ + "client_side" + ] + }, + ] + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (get_session, qbl_session) = get_client_sessions_quic(port).await; + { + let mut received_value = String::new(); + + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current().block_on(async move { + ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() + }); + }); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().deserialize::().unwrap(); + break; + } + Err(e) => println!( + "Error : {}", + e.payload() + .deserialize::() + .unwrap_or_else(|e| format!("{}", e)) + ), + } + } + tokio::time::sleep(SLEEP).await; + assert_ne!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); + } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; + } + + async fn test_pub_sub_deny_then_allow_usrpswd() { + println!("test_pub_sub_deny_then_allow_usrpswd"); + + let mut config_router = get_basic_router_config_usrpswd().await; + + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": false, + "default_permission": "deny", + "rules": [ + { + "permission": "allow", + "flows": ["ingress","egress"], + "actions": [ + "put", + "declare_subscriber" + ], + "key_exprs": [ + "test/demo" + ], + "usernames": [ + "client1name", + "client2name" + ] + }, + ] + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (sub_session, pub_session) = get_client_sessions_usrpswd().await; + { + let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let temp_recv_value = received_value.clone(); + let subscriber = sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().deserialize::().unwrap(); + }) + .await + .unwrap(); + + tokio::time::sleep(SLEEP).await; + publisher.put(VALUE).await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert_eq!(*zlock!(received_value), VALUE); + ztimeout!(subscriber.undeclare()).unwrap(); + } + close_sessions(sub_session, pub_session).await; + close_router_session(session).await; + } + + async fn test_pub_sub_allow_then_deny_usrpswd() { + println!("test_pub_sub_allow_then_deny_usrpswd"); + + let mut config_router = get_basic_router_config_usrpswd().await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": [ + { + "permission": "deny", + "flows": ["egress"], + "actions": [ + "put", + "declare_subscriber" + ], + "key_exprs": [ + "test/demo" + ], + "usernames": [ + "client1name", + "client2name" + ] + }, + ] + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (sub_session, pub_session) = get_client_sessions_usrpswd().await; + { + let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let temp_recv_value = received_value.clone(); + let subscriber = + ztimeout!(sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().deserialize::().unwrap(); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + + ztimeout!(publisher.put(VALUE)).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert_ne!(*zlock!(received_value), VALUE); + ztimeout!(subscriber.undeclare()).unwrap(); + } + close_sessions(sub_session, pub_session).await; + close_router_session(session).await; + } + + async fn test_get_qbl_deny_then_allow_usrpswd() { + println!("test_get_qbl_deny_then_allow_usrpswd"); + + let mut config_router = get_basic_router_config_usrpswd().await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "deny", + "rules": [ + { + "permission": "allow", + "flows": ["egress","ingress"], + "actions": [ + "get", + "declare_queryable" + ], + "key_exprs": [ + "test/demo" + ], + "usernames": [ + "client1name", + "client2name" + ] + }, + ] + }"#, + ) + .unwrap(); + + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (get_session, qbl_session) = get_client_sessions_usrpswd().await; + { + let mut received_value = String::new(); + + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current().block_on(async move { + ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() + }); + }); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().deserialize::().unwrap(); + break; + } + Err(e) => println!( + "Error : {}", + e.payload() + .deserialize::() + .unwrap_or_else(|e| format!("{}", e)) + ), + } + } + tokio::time::sleep(SLEEP).await; + assert_eq!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); + } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; + } + + async fn test_get_qbl_allow_then_deny_usrpswd() { + println!("test_get_qbl_allow_then_deny_usrpswd"); + + let mut config_router = get_basic_router_config_usrpswd().await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": [ + { + "permission": "deny", + "flows": ["egress"], + "actions": [ + "get", + "declare_queryable" + ], + "key_exprs": [ + "test/demo" + ], + "usernames": [ + "client1name", + "client2name" + ] + }, + ] + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (get_session, qbl_session) = get_client_sessions_usrpswd().await; + { + let mut received_value = String::new(); + + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current().block_on(async move { + ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() + }); + }); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().deserialize::().unwrap(); + break; + } + Err(e) => println!( + "Error : {}", + e.payload() + .deserialize::() + .unwrap_or_else(|e| format!("{}", e)) + ), + } + } + tokio::time::sleep(SLEEP).await; + assert_ne!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); + } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; + } +}