diff --git a/Cargo.lock b/Cargo.lock
index be46441b2e..7ff6cbd6ec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -251,7 +251,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
 dependencies = [
  "concurrent-queue",
- "event-listener",
+ "event-listener 2.5.3",
  "futures-core",
 ]
 
@@ -337,7 +337,7 @@ version = "2.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b"
 dependencies = [
- "event-listener",
+ "event-listener 2.5.3",
 ]
 
 [[package]]
@@ -351,7 +351,7 @@ dependencies = [
  "autocfg",
  "blocking",
  "cfg-if 1.0.0",
- "event-listener",
+ "event-listener 2.5.3",
  "futures-lite",
  "rustix 0.37.25",
  "signal-hook",
@@ -1018,13 +1018,13 @@ checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946"
 
 [[package]]
 name = "derive-new"
-version = "0.5.9"
+version = "0.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3418329ca0ad70234b9735dc4ceed10af4df60eff9c8e7b06cb5e520d92c3535"
+checksum = "d150dea618e920167e5973d70ae6ece4385b7164e0d799fe7c122dd0a5d912ad"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 1.0.109",
+ "syn 2.0.33",
 ]
 
 [[package]]
@@ -1175,6 +1175,17 @@ version = "2.5.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
 
+[[package]]
+name = "event-listener"
+version = "4.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "770d968249b5d99410d61f5bf89057f3199a077a04d087092f58e7d10692baae"
+dependencies = [
+ "concurrent-queue",
+ "parking",
+ "pin-project-lite 0.2.13",
+]
+
 [[package]]
 name = "fancy-regex"
 version = "0.11.0"
@@ -2268,9 +2279,9 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
 
 [[package]]
 name = "ordered-float"
-version = "3.9.1"
+version = "4.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2a54938017eacd63036332b4ae5c8a49fc8c0c1d6d629893057e4f13609edd06"
+checksum = "536900a8093134cf9ccf00a27deb3532421099e958d9dd431135d0c7543ca1e8"
 dependencies = [
  "num-traits",
 ]
@@ -2647,10 +2658,10 @@ checksum = "e13f81c9a9d574310b8351f8666f5a93ac3b0069c45c28ad52c10291389a7cf9"
 dependencies = [
  "bytes",
  "rand 0.8.5",
- "ring",
+ "ring 0.16.20",
  "rustc-hash",
  "rustls",
- "rustls-native-certs",
+ "rustls-native-certs 0.6.3",
  "slab",
  "thiserror",
  "tinyvec",
@@ -2779,7 +2790,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4954fbc00dcd4d8282c987710e50ba513d351400dbdd00e803a05172a90d8976"
 dependencies = [
  "pem",
- "ring",
+ "ring 0.16.20",
  "time 0.3.28",
  "yasna",
 ]
@@ -2886,11 +2897,25 @@ dependencies = [
  "libc",
  "once_cell",
  "spin 0.5.2",
- "untrusted",
+ "untrusted 0.7.1",
  "web-sys",
  "winapi",
 ]
 
+[[package]]
+name = "ring"
+version = "0.17.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "684d5e6e18f669ccebf64a92236bb7db9a34f07be010e3627368182027180866"
+dependencies = [
+ "cc",
+ "getrandom 0.2.10",
+ "libc",
+ "spin 0.9.8",
+ "untrusted 0.9.0",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "ringbuffer-spsc"
 version = "0.1.9"
@@ -2993,8 +3018,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8"
 dependencies = [
  "log",
- "ring",
- "rustls-webpki",
+ "ring 0.16.20",
+ "rustls-webpki 0.101.5",
  "sct",
 ]
@@ -3005,7 +3030,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
 dependencies = [
  "openssl-probe",
- "rustls-pemfile",
+ "rustls-pemfile 1.0.3",
+ "schannel",
+ "security-framework",
+]
+
+[[package]]
+name = "rustls-native-certs"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
+dependencies = [
+ "openssl-probe",
+ "rustls-pemfile 2.0.0",
+ "rustls-pki-types",
  "schannel",
  "security-framework",
 ]
@@ -3019,14 +3057,41 @@ dependencies = [
  "base64 0.21.4",
 ]
 
+[[package]]
+name = "rustls-pemfile"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4"
+dependencies = [
+ "base64 0.21.4",
+ "rustls-pki-types",
+]
+
+[[package]]
+name = "rustls-pki-types"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eb0a1f9b9efec70d32e6d6aa3e58ebd88c3754ec98dfe9145c63cf54cc829b83"
+
 [[package]]
 name = "rustls-webpki"
 version = "0.101.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "45a27e3b59326c16e23d30aeb7a36a24cc0d29e71d68ff611cdfb4a01d013bed"
 dependencies = [
- "ring",
- "untrusted",
+ "ring 0.16.20",
+ "untrusted 0.7.1",
+]
+
+[[package]]
+name = "rustls-webpki"
+version = "0.102.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "de2635c8bc2b88d367767c5de8ea1d8db9af3f6219eba28442242d9ab81d1b89"
+dependencies = [
+ "ring 0.17.6",
+ "rustls-pki-types",
+ "untrusted 0.9.0",
 ]
 
@@ -3089,8 +3154,8 @@ version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
 dependencies = [
- "ring",
- "untrusted",
+ "ring 0.16.20",
+ "untrusted 0.7.1",
 ]
 
@@ -3368,7 +3433,7 @@ version = "1.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "38aabbeafa6f6dead8cebf246fe9fae1f9215c8d29b3a69f93bd62a9e4a3dcd6"
 dependencies = [
- "event-listener",
+ "event-listener 2.5.3",
 ]
 
@@ -3986,6 +4051,12 @@ version = "0.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
 
+[[package]]
+name = "untrusted"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
+
 [[package]]
 name = "unzip-n"
 version = "0.1.2"
@@ -4225,9 +4296,12 @@ dependencies = [
 
 [[package]]
 name = "webpki-roots"
-version = "0.25.2"
+version = "0.26.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc"
+checksum = "0de2cfda980f21be5a7ed2eadb3e6fe074d56022bea2cdeb1a62eb220fc04188"
+dependencies = [
+ "rustls-pki-types",
+]
 
 [[package]]
 name = "win-sys"
@@ -4495,7 +4569,7 @@ dependencies = [
  "base64 0.21.4",
  "const_format",
  "env_logger",
- "event-listener",
+ "event-listener 4.0.0",
  "flume",
  "form_urlencoded",
  "futures",
@@ -4702,9 +4776,9 @@ dependencies = [
  "log",
  "quinn",
  "rustls",
- "rustls-native-certs",
- "rustls-pemfile",
- "rustls-webpki",
+ "rustls-native-certs 0.7.0",
+ "rustls-pemfile 2.0.0",
+ "rustls-webpki 0.102.0",
  "secrecy",
  "zenoh-config",
  "zenoh-core",
@@ -4762,8 +4836,8 @@ dependencies = [
  "futures",
  "log",
  "rustls",
- "rustls-pemfile",
- "rustls-webpki",
+ "rustls-pemfile 2.0.0",
+ "rustls-webpki 0.102.0",
  "secrecy",
  "webpki-roots",
  "zenoh-config",
@@ -4991,7 +5065,7 @@ name = "zenoh-sync"
 version = "0.11.0-dev"
 dependencies = [
  "async-std",
- "event-listener",
+ "event-listener 4.0.0",
  "flume",
  "futures",
  "tokio",
= [ "futures", "log", "rustls", - "rustls-pemfile", - "rustls-webpki", + "rustls-pemfile 2.0.0", + "rustls-webpki 0.102.0", "secrecy", "webpki-roots", "zenoh-config", @@ -4991,7 +5065,7 @@ name = "zenoh-sync" version = "0.11.0-dev" dependencies = [ "async-std", - "event-listener", + "event-listener 4.0.0", "flume", "futures", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 4cba5d4dd2..7216d3075f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,9 +84,9 @@ const_format = "0.2.30" crc = "3.0.1" criterion = "0.5" derive_more = "0.99.17" -derive-new = "0.5.9" +derive-new = "0.6.0" env_logger = "0.10.0" -event-listener = "2.5.3" +event-listener = "4.0.0" flume = "0.11" form_urlencoded = "1.1.0" futures = "0.3.25" @@ -108,7 +108,7 @@ log = "0.4.17" lz4_flex = "0.11" nix = { version = "0.27", features = ["fs"] } num_cpus = "1.15.0" -ordered-float = "3.4.0" +ordered-float = "4.1.1" panic-message = "0.3.0" paste = "1.0.12" petgraph = "0.6.3" @@ -125,8 +125,9 @@ ringbuffer-spsc = "0.1.9" rsa = "0.9" rustc_version = "0.4.0" rustls = { version = "0.21.5", features = ["dangerous_configuration"] } -rustls-native-certs = "0.6.2" -rustls-pemfile = "1.0.2" +rustls-native-certs = "0.7.0" +rustls-pemfile = "2.0.0" +rustls-webpki = "0.102.0" schemars = "0.8.12" secrecy = { version = "0.8.0", features = ["serde", "alloc"] } serde = { version = "1.0.154", default-features = false, features = [ @@ -154,8 +155,7 @@ uuid = { version = "1.3.0", default-features = false, features = [ ] } # Default features are disabled due to usage in no_std crates validated_struct = "2.1.0" vec_map = "0.8.2" -rustls-webpki = "0.101.4" -webpki-roots = "0.25" +webpki-roots = "0.26.0" winapi = { version = "0.3.9", features = ["iphlpapi"] } z-serial = "0.2.1" zenoh-ext = { version = "0.11.0-dev", path = "zenoh-ext" } diff --git a/commons/zenoh-sync/src/condition.rs b/commons/zenoh-sync/src/condition.rs index 7606936c67..bae030abbb 100644 --- a/commons/zenoh-sync/src/condition.rs +++ b/commons/zenoh-sync/src/condition.rs @@ -13,9 +13,9 @@ // use async_std::sync::MutexGuard as AysncMutexGuard; use event_listener::{Event, EventListener}; -use std::sync::MutexGuard; +use std::{pin::Pin, sync::MutexGuard}; -pub type ConditionWaiter = EventListener; +pub type ConditionWaiter = Pin>; /// This is a Condition Variable similar to that provided by POSIX. /// As for POSIX condition variables, this assumes that a mutex is /// properly used to coordinate behaviour. In other terms there should diff --git a/examples/examples/z_pub.rs b/examples/examples/z_pub.rs index 54563df1d8..aebca309ad 100644 --- a/examples/examples/z_pub.rs +++ b/examples/examples/z_pub.rs @@ -44,7 +44,7 @@ struct Args { #[arg(short, long, default_value = "demo/example/zenoh-rs-pub")] /// The key expression to write to. key: KeyExpr<'static>, - #[arg(short, long, default_value = "Put from Rust")] + #[arg(short, long, default_value = "Pub from Rust!")] /// The value to write. value: String, #[command(flatten)] diff --git a/examples/examples/z_put.rs b/examples/examples/z_put.rs index 9b625be552..a38f0c7f01 100644 --- a/examples/examples/z_put.rs +++ b/examples/examples/z_put.rs @@ -35,7 +35,7 @@ struct Args { #[arg(short, long, default_value = "demo/example/zenoh-rs-put")] /// The key expression to write to. key: KeyExpr<'static>, - #[arg(short, long, default_value = "Put from Rust")] + #[arg(short, long, default_value = "Put from Rust!")] /// The value to write. 
diff --git a/commons/zenoh-sync/src/condition.rs b/commons/zenoh-sync/src/condition.rs
index 7606936c67..bae030abbb 100644
--- a/commons/zenoh-sync/src/condition.rs
+++ b/commons/zenoh-sync/src/condition.rs
@@ -13,9 +13,9 @@ //
 // use async_std::sync::MutexGuard as AysncMutexGuard;
 use event_listener::{Event, EventListener};
-use std::sync::MutexGuard;
+use std::{pin::Pin, sync::MutexGuard};
 
-pub type ConditionWaiter = EventListener;
+pub type ConditionWaiter = Pin<Box<EventListener>>;
 /// This is a Condition Variable similar to that provided by POSIX.
 /// As for POSIX condition variables, this assumes that a mutex is
 /// properly used to coordinate behaviour. In other terms there should
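Review note on the hunk above: in event-listener 4.x an `EventListener` is a self-referential future that no longer implements `Unpin`, so it must be pinned before it can be polled or stored; boxing it is the simplest way to keep one in a type alias, hence `Pin<Box<EventListener>>`. A minimal standalone sketch of the pattern (not zenoh code):

```rust
use std::pin::Pin;
use event_listener::{Event, EventListener};

// Pinned, heap-allocated listener, matching the new ConditionWaiter alias.
fn make_waiter(event: &Event) -> Pin<Box<EventListener>> {
    Box::pin(event.listen())
}

async fn demo() {
    let event = Event::new();
    let waiter = make_waiter(&event);
    event.notify(1); // wake up to one registered listener
    waiter.await;    // completes once the notification is received
}
```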
diff --git a/examples/examples/z_pub.rs b/examples/examples/z_pub.rs
index 54563df1d8..aebca309ad 100644
--- a/examples/examples/z_pub.rs
+++ b/examples/examples/z_pub.rs
@@ -44,7 +44,7 @@ struct Args {
     #[arg(short, long, default_value = "demo/example/zenoh-rs-pub")]
     /// The key expression to write to.
     key: KeyExpr<'static>,
-    #[arg(short, long, default_value = "Put from Rust")]
+    #[arg(short, long, default_value = "Pub from Rust!")]
     /// The value to write.
     value: String,
     #[command(flatten)]
diff --git a/examples/examples/z_put.rs b/examples/examples/z_put.rs
index 9b625be552..a38f0c7f01 100644
--- a/examples/examples/z_put.rs
+++ b/examples/examples/z_put.rs
@@ -35,7 +35,7 @@ struct Args {
     #[arg(short, long, default_value = "demo/example/zenoh-rs-put")]
     /// The key expression to write to.
     key: KeyExpr<'static>,
-    #[arg(short, long, default_value = "Put from Rust")]
+    #[arg(short, long, default_value = "Put from Rust!")]
     /// The value to write.
     value: String,
     #[command(flatten)]
diff --git a/examples/examples/z_queryable.rs b/examples/examples/z_queryable.rs
index 5738c67f6c..54b9858cf0 100644
--- a/examples/examples/z_queryable.rs
+++ b/examples/examples/z_queryable.rs
@@ -89,7 +89,7 @@ struct Args {
     #[arg(short, long, default_value = "demo/example/zenoh-rs-queryable")]
     /// The key expression matching queries to reply to.
     key: KeyExpr<'static>,
-    #[arg(short, long, default_value = "Queryable from Rust")]
+    #[arg(short, long, default_value = "Queryable from Rust!")]
     /// The value to reply to queries.
     value: String,
     #[arg(long)]
diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs
index 70bd3ee769..2b1c59ad23 100644
--- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs
+++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs
@@ -23,6 +23,8 @@ use async_std::sync::Mutex as AsyncMutex;
 use async_std::task;
 use async_std::task::JoinHandle;
 use async_trait::async_trait;
+use rustls::{Certificate, PrivateKey};
+use rustls_pemfile::Item;
 use std::collections::HashMap;
 use std::fmt;
 use std::io::BufReader;
@@ -35,7 +37,7 @@ use zenoh_link_commons::{
     LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender,
 };
 use zenoh_protocol::core::{EndPoint, Locator};
-use zenoh_result::{bail, zerror, ZResult};
+use zenoh_result::{bail, zerror, ZError, ZResult};
 use zenoh_sync::Signal;
 
 pub struct LinkUnicastQuic {
@@ -261,14 +263,16 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic {
             rustls_native_certs::load_native_certs()
                 .map_err(|e| zerror!("Invalid QUIC CA certificate file: {}", e))?
                 .drain(..)
-                .map(|x| rustls::Certificate(x.0))
+                .map(|x| rustls::Certificate(x.to_vec()))
                 .collect::<Vec<rustls::Certificate>>()
         } else {
             rustls_pemfile::certs(&mut BufReader::new(f.as_slice()))
-                .map_err(|e| zerror!("Invalid QUIC CA certificate file: {}", e))?
-                .drain(..)
-                .map(rustls::Certificate)
-                .collect::<Vec<rustls::Certificate>>()
+                .map(|result| {
+                    result
+                        .map_err(|err| zerror!("Invalid QUIC CA certificate file: {}", err))
+                        .map(|der| Certificate(der.to_vec()))
+                })
+                .collect::<Result<Vec<Certificate>, ZError>>()?
         };
         for c in certificates.iter() {
             root_cert_store.add(c).map_err(|e| zerror!("{}", e))?;
@@ -347,10 +351,12 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic {
             bail!("No QUIC CA certificate has been provided.");
         };
         let certificates = rustls_pemfile::certs(&mut BufReader::new(f.as_slice()))
-            .map_err(|e| zerror!("Invalid QUIC CA certificate file: {}", e))?
-            .drain(..)
-            .map(rustls::Certificate)
-            .collect();
+            .map(|result| {
+                result
+                    .map_err(|err| zerror!("Invalid QUIC CA certificate file: {}", err))
+                    .map(|der| Certificate(der.to_vec()))
+            })
+            .collect::<Result<Vec<Certificate>, ZError>>()?;
 
         // Private keys
         let f = if let Some(value) = epconf.get(TLS_SERVER_PRIVATE_KEY_RAW) {
            value.as_bytes().to_vec()
         } else if let Some(b64_key) = epconf.get(TLS_SERVER_PRIVATE_KEY_BASE64) {
             base64_decode(b64_key)?
         } else if let Some(value) = epconf.get(TLS_SERVER_PRIVATE_KEY_FILE) {
             async_std::fs::read(value)
                 .await
                 .map_err(|e| zerror!("Invalid QUIC CA private key file: {}", e))?
         } else {
             bail!("No QUIC CA private key has been provided.");
         };
-        let private_key = rustls::PrivateKey(
-            rustls_pemfile::read_all(&mut BufReader::new(f.as_slice()))
-                .map_err(|e| zerror!("Invalid QUIC CA private key file: {}", e))?
-                .iter()
-                .filter_map(|x| match x {
-                    rustls_pemfile::Item::RSAKey(k)
-                    | rustls_pemfile::Item::PKCS8Key(k)
-                    | rustls_pemfile::Item::ECKey(k) => Some(k.to_vec()),
-                    _ => None,
-                })
-                .take(1)
-                .next()
-                .ok_or_else(|| zerror!("No QUIC CA private key has been provided."))?,
-        );
+        let items: Vec<Item> = rustls_pemfile::read_all(&mut BufReader::new(f.as_slice()))
+            .map(|result| {
+                result.map_err(|err| zerror!("Invalid QUIC CA private key file: {}", err))
+            })
+            .collect::<Result<Vec<Item>, ZError>>()?;
+
+        let private_key = items
+            .into_iter()
+            .filter_map(|x| match x {
+                rustls_pemfile::Item::Pkcs1Key(k) => Some(k.secret_pkcs1_der().to_vec()),
+                rustls_pemfile::Item::Pkcs8Key(k) => Some(k.secret_pkcs8_der().to_vec()),
+                rustls_pemfile::Item::Sec1Key(k) => Some(k.secret_sec1_der().to_vec()),
+                _ => None,
+            })
+            .take(1)
+            .next()
+            .ok_or_else(|| zerror!("No QUIC CA private key has been provided."))
+            .map(PrivateKey)?;
 
         // Server config
         let mut server_crypto = rustls::ServerConfig::builder()
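Review note: both QUIC hunks adapt to the rustls-pemfile 2.0 API, where `certs` and `read_all` return an iterator of per-item `Result`s instead of a single `Result<Vec<_>>`, and key items are strongly typed (`Pkcs1Key`/`Pkcs8Key`/`Sec1Key` replacing the old `RSAKey`/`PKCS8Key`/`ECKey`). A standalone sketch of the same pattern, using plain `io::Result` instead of zenoh's error types:

```rust
use std::io::{self, BufReader};
use rustls_pemfile::{read_all, Item};

// Collect all PEM items, failing on the first malformed block, then pick the
// first private key of any supported encoding (PKCS#1, PKCS#8, or SEC1).
fn first_private_key_der(pem: &[u8]) -> io::Result<Option<Vec<u8>>> {
    let items = read_all(&mut BufReader::new(pem)).collect::<io::Result<Vec<Item>>>()?;
    Ok(items.into_iter().find_map(|item| match item {
        Item::Pkcs1Key(k) => Some(k.secret_pkcs1_der().to_vec()),
        Item::Pkcs8Key(k) => Some(k.secret_pkcs8_der().to_vec()),
        Item::Sec1Key(k) => Some(k.secret_sec1_der().to_vec()),
        _ => None,
    }))
}
```

The per-item `Result` is what lets the new code report exactly which PEM block is invalid instead of failing the whole file with one opaque error.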
zsplit_mut { + ($slice:expr, $config:expr) => {{ + match ($config.is_streamed, $config.has_header()) { + (true, true) => { + let (l, s) = $slice.split_at_mut(L_LEN); + let (h, p) = s.split_at_mut(H_LEN); + (l, h, p) + } + (true, false) => { + let (l, p) = $slice.split_at_mut(L_LEN); + (l, &mut [], p) + } + (false, true) => { + let (h, p) = $slice.split_at_mut(H_LEN); + (&mut [], h, p) + } + (false, false) => (&mut [], &mut [], $slice), } }}; } // Batch config -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct BatchConfig { pub mtu: BatchSize, + pub is_streamed: bool, #[cfg(feature = "transport_compression")] pub is_compression: bool, } +impl Default for BatchConfig { + fn default() -> Self { + BatchConfig { + mtu: BatchSize::MAX, + is_streamed: false, + #[cfg(feature = "transport_compression")] + is_compression: false, + } + } +} + impl BatchConfig { - fn header(&self) -> BatchHeader { - #[allow(unused_mut)] // No need for mut when "transport_compression" is disabled - let mut h = 0; + const fn has_header(&self) -> bool { + #[cfg(not(feature = "transport_compression"))] + { + false + } + #[cfg(feature = "transport_compression")] + { + self.is_compression + } + } + + fn header(&self) -> Option { + #[cfg(not(feature = "transport_compression"))] + { + None + } #[cfg(feature = "transport_compression")] - if self.is_compression { - h |= BatchHeader::COMPRESSION; + { + self.is_compression + .then_some(BatchHeader::new(BatchHeader::COMPRESSION)) + } + } + + pub fn max_buffer_size(&self) -> usize { + let mut len = self.mtu as usize; + if self.is_streamed { + len += BatchSize::BITS as usize / 8; } - BatchHeader::new(h) + len } } // Batch header #[repr(transparent)] #[derive(Copy, Clone, Debug)] -pub struct BatchHeader(Option); +pub struct BatchHeader(u8); impl BatchHeader { + const SIZE: usize = 1; #[cfg(feature = "transport_compression")] - const INDEX: usize = 0; - #[cfg(feature = "transport_compression")] - const COMPRESSION: u8 = 1; - - fn new(h: u8) -> Self { - Self(NonZeroU8::new(h)) - } + const COMPRESSION: u8 = 1; // 1 << 0 #[cfg(feature = "transport_compression")] - const fn is_empty(&self) -> bool { - self.0.is_none() + const fn new(h: u8) -> Self { + Self(h) } - const fn get(&self) -> Option { + const fn as_u8(&self) -> u8 { self.0 } @@ -90,8 +153,7 @@ impl BatchHeader { #[cfg(feature = "transport_compression")] #[inline(always)] pub fn is_compression(&self) -> bool { - self.0 - .is_some_and(|h| imsg::has_flag(h.get(), Self::COMPRESSION)) + imsg::has_flag(self.as_u8(), Self::COMPRESSION) } } @@ -113,7 +175,6 @@ impl WBatchStats { #[derive(Debug)] pub enum Finalize { Batch, - #[cfg(feature = "transport_compression")] Buffer, } @@ -143,7 +204,7 @@ pub struct WBatch { // The batch codec pub codec: Zenoh080Batch, // It contains 1 byte as additional header, e.g. to signal the batch is compressed - pub header: BatchHeader, + pub config: BatchConfig, // Statistics related to this batch #[cfg(feature = "stats")] pub stats: WBatchStats, @@ -152,9 +213,9 @@ pub struct WBatch { impl WBatch { pub fn new(config: BatchConfig) -> Self { let mut batch = Self { - buffer: BBuf::with_capacity(config.mtu as usize), + buffer: BBuf::with_capacity(config.max_buffer_size()), codec: Zenoh080Batch::new(), - header: config.header(), + config, #[cfg(feature = "stats")] stats: WBatchStats::default(), }; @@ -174,7 +235,8 @@ impl WBatch { /// Get the total number of bytes that have been serialized on the [`WBatch`][WBatch]. 
@@ -643,23 +662,45 @@ impl TlsClientConfig {
 
         let certs: Vec<Certificate> =
             rustls_pemfile::certs(&mut Cursor::new(&tls_client_certificate))
-                .map_err(|e| zerror!(e))
-                .map(|mut certs| certs.drain(..).map(Certificate).collect())?;
+                .map(|result| {
+                    result
+                        .map_err(|err| zerror!("Error processing client certificate: {err}."))
+                        .map(|der| Certificate(der.to_vec()))
+                })
+                .collect::<Result<Vec<Certificate>, ZError>>()?;
 
         let mut keys: Vec<PrivateKey> =
             rustls_pemfile::rsa_private_keys(&mut Cursor::new(&tls_client_private_key))
-                .map_err(|e| zerror!(e))
-                .map(|mut keys| keys.drain(..).map(PrivateKey).collect())?;
+                .map(|result| {
+                    result
+                        .map_err(|err| zerror!("Error processing client key: {err}."))
+                        .map(|key| PrivateKey(key.secret_pkcs1_der().to_vec()))
+                })
+                .collect::<Result<Vec<PrivateKey>, ZError>>()?;
 
         if keys.is_empty() {
             keys = rustls_pemfile::pkcs8_private_keys(&mut Cursor::new(&tls_client_private_key))
-                .map_err(|e| zerror!(e))
-                .map(|mut keys| keys.drain(..).map(PrivateKey).collect())?;
+                .map(|result| {
+                    result
+                        .map_err(|err| zerror!("Error processing client key: {err}."))
+                        .map(|key| PrivateKey(key.secret_pkcs8_der().to_vec()))
+                })
+                .collect::<Result<Vec<PrivateKey>, ZError>>()?;
         }
 
         if keys.is_empty() {
-            bail!("No private key found");
+            keys = rustls_pemfile::ec_private_keys(&mut Cursor::new(&tls_client_private_key))
+                .map(|result| {
+                    result
+                        .map_err(|err| zerror!("Error processing client key: {err}."))
+                        .map(|key| PrivateKey(key.secret_sec1_der().to_vec()))
+                })
+                .collect::<Result<Vec<PrivateKey>, ZError>>()?;
+        }
+
+        if keys.is_empty() {
+            bail!("No private key found for TLS client.");
         }
 
         let builder = ClientConfig::builder()
@@ -765,57 +806,63 @@ fn load_trust_anchors(config: &Config<'_>) -> ZResult<Option<RootCertStore>> {
     let mut root_cert_store = RootCertStore::empty();
     if let Some(value) = config.get(TLS_ROOT_CA_CERTIFICATE_RAW) {
         let mut pem = BufReader::new(value.as_bytes());
-        let certs = rustls_pemfile::certs(&mut pem)?;
-        let trust_anchors = certs.iter().map(|cert| {
-            let ta = TrustAnchor::try_from_cert_der(&cert[..]).unwrap();
-            OwnedTrustAnchor::from_subject_spki_name_constraints(
-                ta.subject,
-                ta.spki,
-                ta.name_constraints,
-            )
-        });
+        let trust_anchors = process_pem(&mut pem)?;
         root_cert_store.add_trust_anchors(trust_anchors.into_iter());
         return Ok(Some(root_cert_store));
     }
+
     if let Some(b64_certificate) = config.get(TLS_ROOT_CA_CERTIFICATE_BASE64) {
         let certificate_pem = base64_decode(b64_certificate)?;
         let mut pem = BufReader::new(certificate_pem.as_slice());
-        let certs = rustls_pemfile::certs(&mut pem)?;
-        let trust_anchors = certs.iter().map(|cert| {
-            let ta = TrustAnchor::try_from_cert_der(&cert[..]).unwrap();
-            OwnedTrustAnchor::from_subject_spki_name_constraints(
-                ta.subject,
-                ta.spki,
-                ta.name_constraints,
-            )
-        });
+        let trust_anchors = process_pem(&mut pem)?;
         root_cert_store.add_trust_anchors(trust_anchors.into_iter());
         return Ok(Some(root_cert_store));
     }
+
     if let Some(filename) = config.get(TLS_ROOT_CA_CERTIFICATE_FILE) {
         let mut pem = BufReader::new(File::open(filename)?);
-        let certs = rustls_pemfile::certs(&mut pem)?;
-        let trust_anchors = certs.iter().map(|cert| {
-            let ta = TrustAnchor::try_from_cert_der(&cert[..]).unwrap();
-            OwnedTrustAnchor::from_subject_spki_name_constraints(
-                ta.subject,
-                ta.spki,
-                ta.name_constraints,
-            )
-        });
+        let trust_anchors = process_pem(&mut pem)?;
         root_cert_store.add_trust_anchors(trust_anchors.into_iter());
         return Ok(Some(root_cert_store));
     }
     Ok(None)
 }
 
+fn process_pem(pem: &mut dyn io::BufRead) -> ZResult<Vec<OwnedTrustAnchor>> {
+    let certs: Vec<CertificateDer> = rustls_pemfile::certs(pem)
+        .map(|result| result.map_err(|err| zerror!("Error processing PEM certificates: {err}.")))
+        .collect::<Result<Vec<CertificateDer>, ZError>>()?;
+
+    let trust_anchors: Vec<TrustAnchor> = certs
+        .into_iter()
+        .map(|cert| {
+            anchor_from_trusted_cert(&cert)
+                .map_err(|err| zerror!("Error processing trust anchor: {err}."))
+                .map(|trust_anchor| trust_anchor.to_owned())
+        })
+        .collect::<Result<Vec<TrustAnchor>, ZError>>()?;
+
+    let owned_trust_anchors: Vec<OwnedTrustAnchor> = trust_anchors
+        .into_iter()
+        .map(|ta| {
+            OwnedTrustAnchor::from_subject_spki_name_constraints(
+                ta.subject.to_vec(),
+                ta.subject_public_key_info.to_vec(),
+                ta.name_constraints.map(|x| x.to_vec()),
+            )
+        })
+        .collect();
+
+    Ok(owned_trust_anchors)
+}
+
 fn load_default_webpki_certs() -> RootCertStore {
     let mut root_cert_store = RootCertStore::empty();
     root_cert_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| {
         OwnedTrustAnchor::from_subject_spki_name_constraints(
-            ta.subject,
-            ta.spki,
-            ta.name_constraints,
+            ta.subject.to_vec(),
+            ta.subject_public_key_info.to_vec(),
+            ta.name_constraints.clone().map(|x| x.to_vec()),
         )
     }));
     root_cert_store
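Review note: `process_pem` deduplicates what was previously copy-pasted three times, and moves from `TrustAnchor::try_from_cert_der` (rustls-webpki 0.101) to `anchor_from_trusted_cert` (0.102), which returns a `TrustAnchor` borrowing from the certificate — hence the `.to_owned()` before the DER buffer goes out of scope. It also propagates parse errors instead of `.unwrap()`-ing them. A sketch of the core conversion, assuming the same `webpki` alias for rustls-webpki 0.102 used in this crate:

```rust
use webpki::{
    anchor_from_trusted_cert,
    types::{CertificateDer, TrustAnchor},
};

// Convert one DER certificate into an owned trust anchor, propagating
// parse failures instead of panicking like the old `.unwrap()` did.
fn to_owned_anchor(cert: &CertificateDer<'_>) -> Result<TrustAnchor<'static>, webpki::Error> {
    anchor_from_trusted_cert(cert).map(|ta| ta.to_owned())
}
```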
diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs
index d81abe5748..4139a65a05 100644
--- a/io/zenoh-transport/src/common/batch.rs
+++ b/io/zenoh-transport/src/common/batch.rs
@@ -11,7 +11,7 @@
 // Contributors:
 //   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
 //
-use std::num::{NonZeroU8, NonZeroUsize};
+use std::num::NonZeroUsize;
 use zenoh_buffers::{
     buffer::Buffer,
     reader::{DidntRead, HasReader},
@@ -26,62 +26,125 @@ use zenoh_protocol::{
     network::NetworkMessage,
     transport::{fragment::FragmentHeader, frame::FrameHeader, BatchSize, TransportMessage},
 };
-use zenoh_result::ZResult;
+use zenoh_result::{zerror, ZResult};
 #[cfg(feature = "transport_compression")]
-use {std::sync::Arc, zenoh_protocol::common::imsg, zenoh_result::zerror};
+use {std::sync::Arc, zenoh_protocol::common::imsg};
+
+const L_LEN: usize = (BatchSize::BITS / 8) as usize;
+const H_LEN: usize = BatchHeader::SIZE;
 
 // Split the inner buffer into (length, header, payload) immutable slices
-#[cfg(feature = "transport_compression")]
 macro_rules! zsplit {
-    ($slice:expr, $header:expr) => {{
-        match $header.get() {
-            Some(_) => $slice.split_at(BatchHeader::INDEX + 1),
-            None => (&[], $slice),
+    ($slice:expr, $config:expr) => {{
+        match ($config.is_streamed, $config.has_header()) {
+            (true, true) => {
+                let (l, s) = $slice.split_at(L_LEN);
+                let (h, p) = s.split_at(H_LEN);
+                (l, h, p)
+            }
+            (true, false) => {
+                let (l, p) = $slice.split_at(L_LEN);
+                (l, &[], p)
+            }
+            (false, true) => {
+                let (h, p) = $slice.split_at(H_LEN);
+                (&[], h, p)
+            }
+            (false, false) => (&[], &[], $slice),
         }
     }};
 }
 
+// Split the inner buffer into (length, header, payload) mutable slices
+macro_rules! zsplit_mut {
+    ($slice:expr, $config:expr) => {{
+        match ($config.is_streamed, $config.has_header()) {
+            (true, true) => {
+                let (l, s) = $slice.split_at_mut(L_LEN);
+                let (h, p) = s.split_at_mut(H_LEN);
+                (l, h, p)
+            }
+            (true, false) => {
+                let (l, p) = $slice.split_at_mut(L_LEN);
+                (l, &mut [], p)
+            }
+            (false, true) => {
+                let (h, p) = $slice.split_at_mut(H_LEN);
+                (&mut [], h, p)
+            }
+            (false, false) => (&mut [], &mut [], $slice),
+        }
+    }};
+}
+
 // Batch config
-#[derive(Copy, Clone, Debug)]
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
 pub struct BatchConfig {
     pub mtu: BatchSize,
+    pub is_streamed: bool,
     #[cfg(feature = "transport_compression")]
     pub is_compression: bool,
 }
 
+impl Default for BatchConfig {
+    fn default() -> Self {
+        BatchConfig {
+            mtu: BatchSize::MAX,
+            is_streamed: false,
+            #[cfg(feature = "transport_compression")]
+            is_compression: false,
+        }
+    }
+}
+
 impl BatchConfig {
-    fn header(&self) -> BatchHeader {
-        #[allow(unused_mut)] // No need for mut when "transport_compression" is disabled
-        let mut h = 0;
+    const fn has_header(&self) -> bool {
+        #[cfg(not(feature = "transport_compression"))]
+        {
+            false
+        }
+        #[cfg(feature = "transport_compression")]
+        {
+            self.is_compression
+        }
+    }
+
+    fn header(&self) -> Option<BatchHeader> {
+        #[cfg(not(feature = "transport_compression"))]
+        {
+            None
+        }
         #[cfg(feature = "transport_compression")]
-        if self.is_compression {
-            h |= BatchHeader::COMPRESSION;
+        {
+            self.is_compression
+                .then_some(BatchHeader::new(BatchHeader::COMPRESSION))
+        }
+    }
+
+    pub fn max_buffer_size(&self) -> usize {
+        let mut len = self.mtu as usize;
+        if self.is_streamed {
+            len += BatchSize::BITS as usize / 8;
         }
-        BatchHeader::new(h)
+        len
     }
 }
 
 // Batch header
 #[repr(transparent)]
 #[derive(Copy, Clone, Debug)]
-pub struct BatchHeader(Option<NonZeroU8>);
+pub struct BatchHeader(u8);
 
 impl BatchHeader {
+    const SIZE: usize = 1;
     #[cfg(feature = "transport_compression")]
-    const INDEX: usize = 0;
-    #[cfg(feature = "transport_compression")]
-    const COMPRESSION: u8 = 1;
-
-    fn new(h: u8) -> Self {
-        Self(NonZeroU8::new(h))
-    }
+    const COMPRESSION: u8 = 1; // 1 << 0
 
     #[cfg(feature = "transport_compression")]
-    const fn is_empty(&self) -> bool {
-        self.0.is_none()
+    const fn new(h: u8) -> Self {
+        Self(h)
     }
 
-    const fn get(&self) -> Option<NonZeroU8> {
+    const fn as_u8(&self) -> u8 {
         self.0
     }
 
@@ -90,8 +153,7 @@ impl BatchHeader {
     #[cfg(feature = "transport_compression")]
     #[inline(always)]
     pub fn is_compression(&self) -> bool {
-        self.0
-            .is_some_and(|h| imsg::has_flag(h.get(), Self::COMPRESSION))
+        imsg::has_flag(self.as_u8(), Self::COMPRESSION)
     }
 }
 
@@ -113,7 +175,6 @@ impl WBatchStats {
 #[derive(Debug)]
 pub enum Finalize {
     Batch,
-    #[cfg(feature = "transport_compression")]
     Buffer,
 }
 
@@ -143,7 +204,7 @@ pub struct WBatch {
     // The batch codec
     pub codec: Zenoh080Batch,
     // It contains 1 byte as additional header, e.g. to signal the batch is compressed
-    pub header: BatchHeader,
+    pub config: BatchConfig,
     // Statistics related to this batch
     #[cfg(feature = "stats")]
     pub stats: WBatchStats,
@@ -152,9 +213,9 @@ impl WBatch {
     pub fn new(config: BatchConfig) -> Self {
         let mut batch = Self {
-            buffer: BBuf::with_capacity(config.mtu as usize),
+            buffer: BBuf::with_capacity(config.max_buffer_size()),
             codec: Zenoh080Batch::new(),
-            header: config.header(),
+            config,
             #[cfg(feature = "stats")]
             stats: WBatchStats::default(),
         };
@@ -174,7 +235,8 @@ impl WBatch {
     /// Get the total number of bytes that have been serialized on the [`WBatch`][WBatch].
     #[inline(always)]
     pub fn len(&self) -> BatchSize {
-        self.buffer.len() as BatchSize
+        let (_l, _h, p) = Self::split(self.buffer.as_slice(), &self.config);
+        p.len() as BatchSize
     }
 
     /// Clear the [`WBatch`][WBatch] memory buffer and related internal state.
@@ -186,10 +248,7 @@
         {
             self.stats.clear();
         }
-        if let Some(h) = self.header.get() {
-            let mut writer = self.buffer.writer();
-            let _ = writer.write_u8(h.get());
-        }
+        Self::init(&mut self.buffer, &self.config);
     }
 
     /// Get a `&[u8]` to access the internal memory buffer, usually for transmitting it on the network.
@@ -198,37 +257,70 @@
         self.buffer.as_slice()
     }
 
+    fn init(buffer: &mut BBuf, config: &BatchConfig) {
+        let mut writer = buffer.writer();
+        if config.is_streamed {
+            let _ = writer.write_exact(&BatchSize::MIN.to_be_bytes());
+        }
+        if let Some(h) = config.header() {
+            let _ = writer.write_u8(h.as_u8());
+        }
+    }
+
     // Split (length, header, payload) internal buffer slice
     #[inline(always)]
-    #[cfg(feature = "transport_compression")]
-    fn split(&self) -> (&[u8], &[u8]) {
-        zsplit!(self.buffer.as_slice(), self.header)
+    fn split<'a>(buffer: &'a [u8], config: &BatchConfig) -> (&'a [u8], &'a [u8], &'a [u8]) {
+        zsplit!(buffer, config)
+    }
+
+    // Split (length, header, payload) internal buffer slice
+    #[inline(always)]
+    fn split_mut<'a>(
+        buffer: &'a mut [u8],
+        config: &BatchConfig,
+    ) -> (&'a mut [u8], &'a mut [u8], &'a mut [u8]) {
+        zsplit_mut!(buffer, config)
     }
 
-    pub fn finalize(
-        &mut self,
-        #[cfg(feature = "transport_compression")] buffer: Option<&mut BBuf>,
-    ) -> ZResult<Finalize> {
+    pub fn finalize(&mut self, mut buffer: Option<&mut BBuf>) -> ZResult<Finalize> {
+        #[allow(unused_mut)]
+        let mut res = Finalize::Batch;
+
         #[cfg(feature = "transport_compression")]
-        if self.header.is_compression() {
-            let buffer = buffer.ok_or_else(|| zerror!("Support buffer not provided"))?;
-            buffer.clear();
-            return self.compress(buffer);
+        if let Some(h) = self.config.header() {
+            if h.is_compression() {
+                let buffer = buffer
+                    .as_mut()
+                    .ok_or_else(|| zerror!("Support buffer not provided"))?;
+                res = self.compress(buffer)?;
+            }
         }
 
-        Ok(Finalize::Batch)
+        if self.config.is_streamed {
+            let buff = match res {
+                Finalize::Batch => self.buffer.as_mut_slice(),
+                Finalize::Buffer => buffer
+                    .as_mut()
+                    .ok_or_else(|| zerror!("Support buffer not provided"))?
+                    .as_mut_slice(),
+            };
+            let (length, header, payload) = Self::split_mut(buff, &self.config);
+            let len: BatchSize = (header.len() as BatchSize) + (payload.len() as BatchSize);
+            length.copy_from_slice(&len.to_le_bytes());
+        }
+
+        Ok(res)
     }
 
     #[cfg(feature = "transport_compression")]
     fn compress(&mut self, support: &mut BBuf) -> ZResult<Finalize> {
         // Write the initial bytes for the batch
-        let mut writer = support.writer();
-        if let Some(h) = self.header.get() {
-            let _ = writer.write_u8(h.get());
-        }
+        support.clear();
+        Self::init(support, &self.config);
 
         // Compress the actual content
-        let (_header, payload) = self.split();
+        let (_length, _header, payload) = Self::split(self.buffer.as_slice(), &self.config);
+        let mut writer = support.writer();
         writer
             .with_slot(writer.remaining(), |b| {
                 lz4_flex::block::compress_into(payload, b).unwrap_or(0)
@@ -240,11 +332,8 @@
             Ok(Finalize::Buffer)
         } else {
             // Keep the original uncompressed buffer and unset the compression flag from the header
-            let h = self
-                .buffer
-                .as_mut_slice()
-                .get_mut(BatchHeader::INDEX)
-                .ok_or_else(|| zerror!("Header not present"))?;
+            let (_l, h, _p) = Self::split_mut(self.buffer.as_mut_slice(), &self.config);
+            let h = h.first_mut().ok_or_else(|| zerror!("Empty BatchHeader"))?;
             *h &= !BatchHeader::COMPRESSION;
             Ok(Finalize::Batch)
         }
@@ -300,21 +389,19 @@ pub struct RBatch {
     buffer: ZSlice,
     // The batch codec
     codec: Zenoh080Batch,
-    // It contains 1 byte as additional header, e.g. to signal the batch is compressed
-    #[cfg(feature = "transport_compression")]
-    header: BatchHeader,
+    // The batch config
+    config: BatchConfig,
 }
 
 impl RBatch {
-    pub fn new<T>(#[allow(unused_variables)] config: BatchConfig, buffer: T) -> Self
+    pub fn new<T>(config: BatchConfig, buffer: T) -> Self
     where
         T: Into<ZSlice>,
     {
         Self {
             buffer: buffer.into(),
             codec: Zenoh080Batch::new(),
-            #[cfg(feature = "transport_compression")]
-            header: config.header(),
+            config,
         }
     }
 
@@ -329,9 +416,8 @@ impl RBatch {
     // Split (length, header, payload) internal buffer slice
     #[inline(always)]
-    #[cfg(feature = "transport_compression")]
-    fn split(&self) -> (&[u8], &[u8]) {
-        zsplit!(self.buffer.as_slice(), self.header)
+    fn split<'a>(buffer: &'a [u8], config: &BatchConfig) -> (&'a [u8], &'a [u8], &'a [u8]) {
+        zsplit!(buffer, config)
     }
 
@@ -339,41 +425,44 @@
     pub fn initialize<C, T>(&mut self, #[allow(unused_variables)] buff: C) -> ZResult<()>
     where
         C: Fn() -> T + Copy,
         T: ZSliceBuffer + 'static,
     {
+        #[allow(unused_variables)]
+        let (l, h, p) = Self::split(self.buffer.as_slice(), &self.config);
+
         #[cfg(feature = "transport_compression")]
-        if !self.header.is_empty() {
-            let h = *self
-                .buffer
-                .get(BatchHeader::INDEX)
-                .ok_or_else(|| zerror!("Batch header not present"))?;
-            let header = BatchHeader::new(h);
-
-            if header.is_compression() {
-                self.decompress(buff)?;
-            } else {
-                self.buffer = self
-                    .buffer
-                    .subslice(BatchHeader::INDEX + 1, self.buffer.len())
-                    .ok_or_else(|| zerror!("Invalid batch length"))?;
+        {
+            if self.config.has_header() {
+                let b = *h
+                    .first()
+                    .ok_or_else(|| zerror!("Batch header not present"))?;
+                let header = BatchHeader::new(b);
+
+                if header.is_compression() {
+                    let zslice = self.decompress(p, buff)?;
+                    self.buffer = zslice;
+                    return Ok(());
+                }
             }
         }
 
+        self.buffer = self
+            .buffer
+            .subslice(l.len() + h.len(), self.buffer.len())
+            .ok_or_else(|| zerror!("Invalid batch length"))?;
+
         Ok(())
     }
 
     #[cfg(feature = "transport_compression")]
-    fn decompress<T>(&mut self, mut buff: impl FnMut() -> T) -> ZResult<()>
+    fn decompress<T>(&self, payload: &[u8], mut buff: impl FnMut() -> T) -> ZResult<ZSlice>
     where
         T: ZSliceBuffer + 'static,
     {
-        let (_h, p) = self.split();
-
         let mut into = (buff)();
-        let n = lz4_flex::block::decompress_into(p, into.as_mut_slice())
+        let n = lz4_flex::block::decompress_into(payload, into.as_mut_slice())
             .map_err(|_| zerror!("Decompression error"))?;
-        self.buffer = ZSlice::make(Arc::new(into), 0, n)
+        let zslice = ZSlice::make(Arc::new(into), 0, n)
             .map_err(|_| zerror!("Invalid decompression buffer length"))?;
-
-        Ok(())
+        Ok(zslice)
     }
 }
 
@@ -392,6 +481,18 @@ impl Decode<TransportMessage> for &mut RBatch {
     }
 }
 
+impl Decode<(TransportMessage, BatchSize)> for &mut RBatch {
+    type Error = DidntRead;
+
+    fn decode(self) -> Result<(TransportMessage, BatchSize), Self::Error> {
+        let len = self.buffer.len() as BatchSize;
+        let mut reader = self.buffer.reader();
+        let msg = self.codec.read(&mut reader)?;
+        let end = self.buffer.len() as BatchSize;
+        Ok((msg, len - end))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::vec;
@@ -399,6 +500,7 @@
     use super::*;
     use rand::Rng;
     use zenoh_buffers::ZBuf;
+    use zenoh_core::zcondfeat;
    use zenoh_protocol::{
         core::{CongestionControl, Encoding, Priority, Reliability, WireExpr},
         network::{ext, Push},
@@ -422,6 +524,7 @@
         for msg_in in msg_ins {
             let config = BatchConfig {
                 mtu: BatchSize::MAX,
+                is_streamed: rng.gen_bool(0.5),
                 #[cfg(feature = "transport_compression")]
                 is_compression: rng.gen_bool(0.5),
             };
@@ -429,20 +532,17 @@
             wbatch.encode(&msg_in).unwrap();
             println!("Encoded WBatch: {:?}", wbatch);
 
-            #[cfg(feature = "transport_compression")]
-            let mut buffer = config.is_compression.then_some(BBuf::with_capacity(
-                lz4_flex::block::get_maximum_output_size(wbatch.as_slice().len()),
-            ));
+            let mut buffer = zcondfeat!(
+                "transport_compression",
+                config.is_compression.then_some(BBuf::with_capacity(
+                    lz4_flex::block::get_maximum_output_size(wbatch.as_slice().len()),
+                )),
+                None
+            );
 
-            let res = wbatch
-                .finalize(
-                    #[cfg(feature = "transport_compression")]
-                    buffer.as_mut(),
-                )
-                .unwrap();
+            let res = wbatch.finalize(buffer.as_mut()).unwrap();
             let bytes = match res {
                 Finalize::Batch => wbatch.as_slice(),
-                #[cfg(feature = "transport_compression")]
                 Finalize::Buffer => buffer.as_mut().unwrap().as_slice(),
             };
             println!("Finalized WBatch: {:02x?}", bytes);
@@ -465,6 +565,7 @@ fn serialization_batch() {
         let config = BatchConfig {
             mtu: BatchSize::MAX,
+            is_streamed: false,
             #[cfg(feature = "transport_compression")]
             is_compression: false,
         };
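Review note: the batch buffer now has three regions — an optional 2-byte length prefix (streamed links), an optional 1-byte compression header, then the payload — and `len()` reports only the payload. A standalone model of the layout that `zsplit!`/`zsplit_mut!` encode (types simplified; this is not the crate's API):

```rust
type BatchSize = u16;
const L_LEN: usize = (BatchSize::BITS / 8) as usize; // 2-byte length prefix
const H_LEN: usize = 1;                              // 1-byte batch header

// Mirror of `zsplit!`: carve an encoded batch into (length, header, payload).
fn split(buf: &[u8], is_streamed: bool, has_header: bool) -> (&[u8], &[u8], &[u8]) {
    match (is_streamed, has_header) {
        (true, true) => {
            let (l, rest) = buf.split_at(L_LEN);
            let (h, p) = rest.split_at(H_LEN);
            (l, h, p)
        }
        (true, false) => {
            let (l, p) = buf.split_at(L_LEN);
            (l, &[], p)
        }
        (false, true) => {
            let (h, p) = buf.split_at(H_LEN);
            (&[], h, p)
        }
        (false, false) => (&[], &[], buf),
    }
}

fn main() {
    // Streamed + compressed batch: 2-byte LE length (header + payload = 4),
    // 1-byte header, 3-byte payload.
    let batch = [0x04, 0x00, 0x01, 0xaa, 0xbb, 0xcc];
    let (l, h, p) = split(&batch, true, true);
    assert_eq!((l.len(), h.len(), p.len()), (2, 1, 3));
}
```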
diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs
index c27c33f0ee..2e3af61d64 100644
--- a/io/zenoh-transport/src/common/pipeline.rs
+++ b/io/zenoh-transport/src/common/pipeline.rs
@@ -482,10 +482,7 @@ impl StageOut {
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub(crate) struct TransmissionPipelineConf {
-    pub(crate) is_streamed: bool,
-    #[cfg(feature = "transport_compression")]
-    pub(crate) is_compression: bool,
-    pub(crate) batch_size: BatchSize,
+    pub(crate) batch: BatchConfig,
     pub(crate) queue_size: [usize; Priority::NUM],
     pub(crate) backoff: Duration,
 }
 
 impl Default for TransmissionPipelineConf {
     fn default() -> Self {
         Self {
-            is_streamed: false,
-            #[cfg(feature = "transport_compression")]
-            is_compression: false,
-            batch_size: BatchSize::MAX,
+            batch: BatchConfig {
+                mtu: BatchSize::MAX,
+                is_streamed: false,
+                #[cfg(feature = "transport_compression")]
+                is_compression: false,
+            },
             queue_size: [1; Priority::NUM],
             backoff: Duration::from_micros(1),
         }
     }
@@ -533,12 +532,7 @@ impl TransmissionPipeline {
         let (mut s_ref_w, s_ref_r) = RingBuffer::<WBatch>::init();
         // Fill the refill ring buffer with batches
         for _ in 0..*num {
-            let bc = BatchConfig {
-                mtu: config.batch_size,
-                #[cfg(feature = "transport_compression")]
-                is_compression: config.is_compression,
-            };
-            let batch = WBatch::new(bc);
+            let batch = WBatch::new(config.batch);
             assert!(s_ref_w.push(batch).is_none());
         }
         // Create the channel for notifying that new batches are in the refill ring buffer
@@ -736,10 +730,12 @@ mod tests {
     const TIMEOUT: Duration = Duration::from_secs(60);
 
     const CONFIG: TransmissionPipelineConf = TransmissionPipelineConf {
-        is_streamed: true,
-        #[cfg(feature = "transport_compression")]
-        is_compression: true,
-        batch_size: BatchSize::MAX,
+        batch: BatchConfig {
+            mtu: BatchSize::MAX,
+            is_streamed: true,
+            #[cfg(feature = "transport_compression")]
+            is_compression: true,
+        },
         queue_size: [1; Priority::NUM],
         backoff: Duration::from_micros(1),
     };
@@ -875,7 +871,7 @@ mod tests {
         // Make sure to put only one message per batch: set the payload size
         // to half of the batch in such a way the serialized zenoh message
         // will be larger than half of the batch size (header + payload).
-        let payload_size = (CONFIG.batch_size / 2) as usize;
+        let payload_size = (CONFIG.batch.mtu / 2) as usize;
 
         // Send reliable messages
         let key = "test".into();
diff --git a/io/zenoh-transport/src/multicast/establishment.rs b/io/zenoh-transport/src/multicast/establishment.rs
index e31ab05d30..cec09ebdf2 100644
--- a/io/zenoh-transport/src/multicast/establishment.rs
+++ b/io/zenoh-transport/src/multicast/establishment.rs
@@ -12,7 +12,7 @@
 //   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
 //
 use crate::{
-    common::seq_num,
+    common::{batch::BatchConfig, seq_num},
     multicast::{
         link::{TransportLinkMulticast, TransportLinkMulticastConfig},
         transport::TransportMulticastInner,
@@ -62,9 +62,12 @@ pub(crate) async fn open_link(
     // Create the transport
     let locator = link.get_dst().to_owned();
     let config = TransportLinkMulticastConfig {
-        mtu: link.get_mtu(),
-        #[cfg(feature = "transport_compression")]
-        is_compression: manager.config.multicast.is_compression,
+        batch: BatchConfig {
+            mtu: link.get_mtu(),
+            #[cfg(feature = "transport_compression")]
+            is_compression: manager.config.multicast.is_compression,
+            ..Default::default()
+        },
     };
     let link = TransportLinkMulticast::new(link, config);
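Review note: the `mtu`/`is_streamed`/`is_compression` triplet that was duplicated across pipeline, unicast, and multicast configs now lives in one nested `BatchConfig`. One practical consequence is visible in the rx tasks later in this diff: receive buffers are sized from `max_buffer_size()` rather than `mtu`, since a streamed batch can occupy `mtu` bytes *plus* its 2-byte length prefix. A standalone model (field names mirror the diff; not the crate API):

```rust
type BatchSize = u16;

struct BatchConfig {
    mtu: BatchSize,
    is_streamed: bool,
}

impl BatchConfig {
    // A streamed batch carries a length prefix in front of the payload,
    // so the worst-case buffer is mtu + size_of::<BatchSize>().
    fn max_buffer_size(&self) -> usize {
        let mut len = self.mtu as usize;
        if self.is_streamed {
            len += (BatchSize::BITS / 8) as usize;
        }
        len
    }
}

fn main() {
    let c = BatchConfig { mtu: BatchSize::MAX, is_streamed: true };
    assert_eq!(c.max_buffer_size(), u16::MAX as usize + 2);
}
```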
format!("{:?}", o_link); diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index afc12bc87d..5b4da7365b 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -14,9 +14,8 @@ use crate::common::batch::{BatchConfig, Decode, Encode, Finalize, RBatch, WBatch}; use std::fmt; use std::sync::Arc; -#[cfg(feature = "transport_compression")] -use zenoh_buffers::BBuf; -use zenoh_buffers::{ZSlice, ZSliceBuffer}; +use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer}; +use zenoh_core::zcondfeat; use zenoh_link::{Link, LinkUnicast}; use zenoh_protocol::transport::{BatchSize, Close, TransportMessage}; use zenoh_result::{zerror, ZResult}; @@ -31,11 +30,7 @@ pub(crate) enum TransportLinkUnicastDirection { pub(crate) struct TransportLinkUnicastConfig { // Inbound / outbound pub(crate) direction: TransportLinkUnicastDirection, - // MTU - pub(crate) mtu: BatchSize, - // Compression is active on the link - #[cfg(feature = "transport_compression")] - pub(crate) is_compression: bool, + pub(crate) batch: BatchConfig, } #[derive(Clone, PartialEq, Eq)] @@ -46,25 +41,23 @@ pub(crate) struct TransportLinkUnicast { impl TransportLinkUnicast { pub(crate) fn new(link: LinkUnicast, mut config: TransportLinkUnicastConfig) -> Self { - config.mtu = link.get_mtu().min(config.mtu); + config.batch.mtu = link.get_mtu().min(config.batch.mtu); Self { link, config } } - const fn batch_config(&self) -> BatchConfig { - BatchConfig { - mtu: self.config.mtu, - #[cfg(feature = "transport_compression")] - is_compression: self.config.is_compression, - } - } - pub(crate) fn tx(&self) -> TransportLinkUnicastTx { TransportLinkUnicastTx { inner: self.clone(), - #[cfg(feature = "transport_compression")] - buffer: self.config.is_compression.then_some(BBuf::with_capacity( - lz4_flex::block::get_maximum_output_size(self.config.mtu as usize), - )), + buffer: zcondfeat!( + "transport_compression", + self.config + .batch + .is_compression + .then_some(BBuf::with_capacity( + lz4_flex::block::get_maximum_output_size(self.config.batch.mtu as usize), + )), + None + ), } } @@ -128,7 +121,6 @@ impl From for Link { pub(crate) struct TransportLinkUnicastTx { pub(crate) inner: TransportLinkUnicast, - #[cfg(feature = "transport_compression")] pub(crate) buffer: Option, } @@ -139,15 +131,11 @@ impl TransportLinkUnicastTx { // log::trace!("WBatch: {:?}", batch); let res = batch - .finalize( - #[cfg(feature = "transport_compression")] - self.buffer.as_mut(), - ) + .finalize(self.buffer.as_mut()) .map_err(|_| zerror!("{ERR}{self}"))?; let bytes = match res { Finalize::Batch => batch.as_slice(), - #[cfg(feature = "transport_compression")] Finalize::Buffer => self .buffer .as_ref() @@ -158,14 +146,6 @@ impl TransportLinkUnicastTx { // log::trace!("WBytes: {:02x?}", bytes); // Send the message on the link - if self.inner.link.is_streamed() { - let len: BatchSize = bytes - .len() - .try_into() - .map_err(|_| zerror!("Invalid batch length"))?; - let len = len.to_le_bytes(); - self.inner.link.write_all(&len).await?; - } self.inner.link.write_all(bytes).await?; Ok(()) @@ -175,7 +155,7 @@ impl TransportLinkUnicastTx { const ERR: &str = "Write error on link: "; // Create the batch for serializing the message - let mut batch = WBatch::new(self.inner.batch_config()); + let mut batch = WBatch::new(self.inner.config.batch); batch.encode(msg).map_err(|_| zerror!("{ERR}{self}"))?; let len = batch.len() as usize; self.send_batch(&mut batch).await?; @@ -191,14 +171,11 @@ impl fmt::Display for 
diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs
index 112b471b9e..a3e5651bdb 100644
--- a/io/zenoh-transport/src/unicast/establishment/accept.rs
+++ b/io/zenoh-transport/src/unicast/establishment/accept.rs
@@ -14,6 +14,7 @@
 #[cfg(feature = "shared-memory")]
 use crate::unicast::shared_memory_unicast::Challenge;
 use crate::{
+    common::batch::BatchConfig,
     unicast::{
         establishment::{
             compute_sn, ext, finalize_transport, AcceptFsm, Cookie, InputFinalize, Zenoh080Cookie,
@@ -586,11 +587,15 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {
 
 pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) -> ZResult<()> {
     let mtu = link.get_mtu();
+    let is_streamed = link.is_streamed();
     let config = TransportLinkUnicastConfig {
-        mtu,
         direction: TransportLinkUnicastDirection::Inbound,
-        #[cfg(feature = "transport_compression")]
-        is_compression: false,
+        batch: BatchConfig {
+            mtu,
+            is_streamed,
+            #[cfg(feature = "transport_compression")]
+            is_compression: false,
+        },
     };
     let mut link = TransportLinkUnicast::new(link.clone(), config);
     let mut fsm = AcceptLink {
@@ -705,10 +710,13 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager)
     };
 
     let a_config = TransportLinkUnicastConfig {
-        mtu: state.transport.batch_size,
         direction: TransportLinkUnicastDirection::Inbound,
-        #[cfg(feature = "transport_compression")]
-        is_compression: state.link.ext_compression.is_compression(),
+        batch: BatchConfig {
+            mtu: state.transport.batch_size,
+            is_streamed,
+            #[cfg(feature = "transport_compression")]
+            is_compression: state.link.ext_compression.is_compression(),
+        },
     };
     let a_link = TransportLinkUnicast::new(link.link.clone(), a_config);
     let s_link = format!("{:?}", a_link);
diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs
index 4c1314dd29..6e10509d69 100644
--- a/io/zenoh-transport/src/unicast/establishment/open.rs
+++ b/io/zenoh-transport/src/unicast/establishment/open.rs
@@ -14,6 +14,7 @@
 #[cfg(feature = "shared-memory")]
 use crate::unicast::shared_memory_unicast::Challenge;
 use crate::{
+    common::batch::BatchConfig,
     unicast::{
         establishment::{compute_sn, ext, finalize_transport, InputFinalize, OpenFsm},
         link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection},
@@ -511,11 +512,15 @@ pub(crate) async fn open_link(
     link: LinkUnicast,
     manager: &TransportManager,
 ) -> ZResult<TransportUnicast> {
+    let is_streamed = link.is_streamed();
     let config = TransportLinkUnicastConfig {
         direction: TransportLinkUnicastDirection::Outbound,
-        mtu: link.get_mtu(),
-        #[cfg(feature = "transport_compression")]
-        is_compression: false, // Perform the exchange Init/Open exchange with no compression
+        batch: BatchConfig {
+            mtu: link.get_mtu(),
+            is_streamed,
+            #[cfg(feature = "transport_compression")]
+            is_compression: false, // Perform the Init/Open exchange with no compression
+        },
     };
     let mut link = TransportLinkUnicast::new(link, config);
     let mut fsm = OpenLink {
@@ -537,7 +542,7 @@ pub(crate) async fn open_link(
             .config
             .batch_size
             .min(batch_size::UNICAST)
-            .min(link.config.mtu),
+            .min(link.config.batch.mtu),
         resolution: manager.config.resolution,
         ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos),
         #[cfg(feature = "transport_multilink")]
@@ -616,10 +621,13 @@ pub(crate) async fn open_link(
     };
 
     let o_config = TransportLinkUnicastConfig {
-        mtu: state.transport.batch_size,
         direction: TransportLinkUnicastDirection::Outbound,
-        #[cfg(feature = "transport_compression")]
-        is_compression: state.link.ext_compression.is_compression(),
+        batch: BatchConfig {
+            mtu: state.transport.batch_size,
+            is_streamed,
+            #[cfg(feature = "transport_compression")]
+            is_compression: state.link.ext_compression.is_compression(),
+        },
     };
     let o_link = TransportLinkUnicast::new(link.link.clone(), o_config);
     let s_link = format!("{:?}", o_link);
diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs
index afc12bc87d..5b4da7365b 100644
--- a/io/zenoh-transport/src/unicast/link.rs
+++ b/io/zenoh-transport/src/unicast/link.rs
@@ -14,9 +14,8 @@
 use crate::common::batch::{BatchConfig, Decode, Encode, Finalize, RBatch, WBatch};
 use std::fmt;
 use std::sync::Arc;
-#[cfg(feature = "transport_compression")]
-use zenoh_buffers::BBuf;
-use zenoh_buffers::{ZSlice, ZSliceBuffer};
+use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer};
+use zenoh_core::zcondfeat;
 use zenoh_link::{Link, LinkUnicast};
 use zenoh_protocol::transport::{BatchSize, Close, TransportMessage};
 use zenoh_result::{zerror, ZResult};
@@ -31,11 +30,7 @@ pub(crate) enum TransportLinkUnicastDirection {
 pub(crate) struct TransportLinkUnicastConfig {
     // Inbound / outbound
     pub(crate) direction: TransportLinkUnicastDirection,
-    // MTU
-    pub(crate) mtu: BatchSize,
-    // Compression is active on the link
-    #[cfg(feature = "transport_compression")]
-    pub(crate) is_compression: bool,
+    pub(crate) batch: BatchConfig,
 }
 
 #[derive(Clone, PartialEq, Eq)]
@@ -46,25 +41,23 @@ pub(crate) struct TransportLinkUnicast {
 
 impl TransportLinkUnicast {
     pub(crate) fn new(link: LinkUnicast, mut config: TransportLinkUnicastConfig) -> Self {
-        config.mtu = link.get_mtu().min(config.mtu);
+        config.batch.mtu = link.get_mtu().min(config.batch.mtu);
         Self { link, config }
     }
 
-    const fn batch_config(&self) -> BatchConfig {
-        BatchConfig {
-            mtu: self.config.mtu,
-            #[cfg(feature = "transport_compression")]
-            is_compression: self.config.is_compression,
-        }
-    }
-
     pub(crate) fn tx(&self) -> TransportLinkUnicastTx {
         TransportLinkUnicastTx {
             inner: self.clone(),
-            #[cfg(feature = "transport_compression")]
-            buffer: self.config.is_compression.then_some(BBuf::with_capacity(
-                lz4_flex::block::get_maximum_output_size(self.config.mtu as usize),
-            )),
+            buffer: zcondfeat!(
+                "transport_compression",
+                self.config
+                    .batch
+                    .is_compression
+                    .then_some(BBuf::with_capacity(
+                        lz4_flex::block::get_maximum_output_size(self.config.batch.mtu as usize),
+                    )),
+                None
+            ),
         }
     }
 
@@ -128,7 +121,6 @@ impl From<TransportLinkUnicast> for Link {
 
 pub(crate) struct TransportLinkUnicastTx {
     pub(crate) inner: TransportLinkUnicast,
-    #[cfg(feature = "transport_compression")]
     pub(crate) buffer: Option<BBuf>,
 }
 
@@ -139,15 +131,11 @@ impl TransportLinkUnicastTx {
         // log::trace!("WBatch: {:?}", batch);
 
         let res = batch
-            .finalize(
-                #[cfg(feature = "transport_compression")]
-                self.buffer.as_mut(),
-            )
+            .finalize(self.buffer.as_mut())
             .map_err(|_| zerror!("{ERR}{self}"))?;
 
         let bytes = match res {
             Finalize::Batch => batch.as_slice(),
-            #[cfg(feature = "transport_compression")]
             Finalize::Buffer => self
                 .buffer
                 .as_ref()
 
         // log::trace!("WBytes: {:02x?}", bytes);
 
         // Send the message on the link
-        if self.inner.link.is_streamed() {
-            let len: BatchSize = bytes
-                .len()
-                .try_into()
-                .map_err(|_| zerror!("Invalid batch length"))?;
-            let len = len.to_le_bytes();
-            self.inner.link.write_all(&len).await?;
-        }
         self.inner.link.write_all(bytes).await?;
 
         Ok(())
@@ -175,7 +155,7 @@ impl TransportLinkUnicastTx {
         const ERR: &str = "Write error on link: ";
 
         // Create the batch for serializing the message
-        let mut batch = WBatch::new(self.inner.batch_config());
+        let mut batch = WBatch::new(self.inner.config.batch);
         batch.encode(msg).map_err(|_| zerror!("{ERR}{self}"))?;
         let len = batch.len() as usize;
         self.send_batch(&mut batch).await?;
@@ -191,14 +171,11 @@ impl fmt::Display for TransportLinkUnicastTx {
 
 impl fmt::Debug for TransportLinkUnicastTx {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        let mut s = f.debug_struct("TransportLinkUnicastRx");
-        s.field("link", &self.inner.link)
-            .field("config", &self.inner.config);
-        #[cfg(feature = "transport_compression")]
-        {
-            s.field("buffer", &self.buffer.as_ref().map(|b| b.capacity()));
-        }
-        s.finish()
+        f.debug_struct("TransportLinkUnicastRx")
+            .field("link", &self.inner.link)
+            .field("config", &self.inner.config)
+            .field("buffer", &self.buffer.as_ref().map(|b| b.capacity()))
+            .finish()
     }
 }
 
@@ -219,15 +196,15 @@ impl TransportLinkUnicastRx {
             // Read and decode the message length
             let mut len = BatchSize::MIN.to_le_bytes();
             self.inner.link.read_exact(&mut len).await?;
-            let len = BatchSize::from_le_bytes(len) as usize;
+            let l = BatchSize::from_le_bytes(len) as usize;
 
             // Read the bytes
             let slice = into
                 .as_mut_slice()
-                .get_mut(..len)
+                .get_mut(len.len()..len.len() + l)
                 .ok_or_else(|| zerror!("{ERR}{self}. Invalid batch length or buffer size."))?;
             self.inner.link.read_exact(slice).await?;
-            len
+            len.len() + l
         } else {
             // Read the bytes
             self.inner.link.read(into.as_mut_slice()).await?
@@ -237,7 +214,7 @@ impl TransportLinkUnicastRx {
         let buffer = ZSlice::make(Arc::new(into), 0, end)
             .map_err(|_| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?;
 
-        let mut batch = RBatch::new(self.inner.batch_config(), buffer);
+        let mut batch = RBatch::new(self.inner.config.batch, buffer);
         batch
             .initialize(buff)
             .map_err(|e| zerror!("{ERR}{self}. {e}."))?;
@@ -248,7 +225,7 @@ impl TransportLinkUnicastRx {
     pub async fn recv(&mut self) -> ZResult<TransportMessage> {
-        let mtu = self.inner.config.mtu as usize;
+        let mtu = self.inner.config.batch.mtu as usize;
         let mut batch = self
             .recv_batch(|| zenoh_buffers::vec::uninit(mtu).into_boxed_slice())
             .await?;
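Review note: with framing moved into `WBatch::finalize`, the unicast TX path no longer writes the 2-byte length separately (one `write_all` instead of two, avoiding an extra small write on streamed links), and `recv_batch` now reads the prefix into the same pooled buffer, keeping the payload at its on-wire offset so `RBatch` can split it with the shared layout logic. The wire format is unchanged; a standalone sketch of both directions (not the crate API):

```rust
type BatchSize = u16;

// Encode side: the 2-byte prefix is reserved up front and patched in
// finalize() with the little-endian length of header + payload.
fn finalize_streamed(frame: &mut [u8]) {
    let len = (frame.len() - 2) as BatchSize;
    frame[..2].copy_from_slice(&len.to_le_bytes());
}

// Decode side: read the prefix first to learn how many bytes follow.
fn read_len(prefix: [u8; 2]) -> usize {
    BatchSize::from_le_bytes(prefix) as usize
}

fn main() {
    let mut frame = vec![0, 0, b'z', b'e', b'n'];
    finalize_streamed(&mut frame);
    assert_eq!(read_len([frame[0], frame[1]]), 3);
}
```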
diff --git a/io/zenoh-transport/src/unicast/lowlatency/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs
index 437e9c4fa4..4cfbbee115 100644
--- a/io/zenoh-transport/src/unicast/lowlatency/link.rs
+++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs
@@ -216,7 +216,7 @@ async fn rx_task_stream(
     }
 
     // The pool of buffers
-    let mtu = link.config.mtu as usize;
+    let mtu = link.config.batch.mtu as usize;
     let mut n = rx_buffer_size / mtu;
     if rx_buffer_size % mtu != 0 {
         n += 1;
@@ -248,7 +248,7 @@ async fn rx_task_dgram(
     rx_buffer_size: usize,
 ) -> ZResult<()> {
     // The pool of buffers
-    let mtu = link.config.mtu as usize;
+    let mtu = link.config.batch.max_buffer_size();
     let mut n = rx_buffer_size / mtu;
     if rx_buffer_size % mtu != 0 {
         n += 1;
diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs
index 74db7f751e..aba680bc43 100644
--- a/io/zenoh-transport/src/unicast/universal/link.rs
+++ b/io/zenoh-transport/src/unicast/universal/link.rs
@@ -16,7 +16,7 @@ use super::transport::TransportUnicastUniversal;
 use crate::common::stats::TransportStats;
 use crate::{
     common::{
-        batch::RBatch,
+        batch::{BatchConfig, RBatch},
         pipeline::{
             TransmissionPipeline, TransmissionPipelineConf, TransmissionPipelineConsumer,
             TransmissionPipelineProducer,
@@ -71,10 +71,12 @@ impl TransportLinkUnicastUniversal {
     ) {
         if self.handle_tx.is_none() {
             let config = TransmissionPipelineConf {
-                is_streamed: self.link.link.is_streamed(),
-                #[cfg(feature = "transport_compression")]
-                is_compression: self.link.config.is_compression,
-                batch_size: self.link.config.mtu,
+                batch: BatchConfig {
+                    mtu: self.link.config.batch.mtu,
+                    is_streamed: self.link.link.is_streamed(),
+                    #[cfg(feature = "transport_compression")]
+                    is_compression: self.link.config.batch.is_compression,
+                },
                 queue_size: self.transport.manager.config.queue_size,
                 backoff: self.transport.manager.config.queue_backoff,
             };
@@ -257,7 +259,7 @@ async fn rx_task(
     }
 
     // The pool of buffers
-    let mtu = link.inner.config.mtu as usize;
+    let mtu = link.inner.config.batch.max_buffer_size();
     let mut n = rx_buffer_size / mtu;
     if rx_buffer_size % mtu != 0 {
         n += 1;
diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
index f486f25f3c..16f5fd4a36 100644
--- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
@@ -550,18 +550,6 @@ impl StorageService {
         let mut storage = self.storage.lock().await;
         match storage.get(stripped_key, q.parameters()).await {
             Ok(stored_data) => {
-                // if key is not available, return Error
-                if stored_data.is_empty() {
-                    log::info!("Requested key `{}` not found", q.key_expr());
-                    if let Err(e) = q.reply(Err("Key not found".into())).res().await {
-                        log::warn!(
-                            "Storage {} raised an error replying a query: {}",
-                            self.name,
-                            e
-                        )
-                    }
-                    return;
-                }
                 for entry in stored_data {
                     let sample = Sample::new(q.key_expr().clone(), entry.value)
                         .with_timestamp(entry.timestamp);
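Review note on the storage hunk: removing the eager "Key not found" error reply means a GET on a key with no stored data now simply completes with zero replies (the `for` loop over `stored_data` runs zero times). A hypothetical caller-side sketch of the observable difference, written against a zenoh 0.11-style API (names and calls here are assumptions for illustration, not part of this diff):

```rust
use zenoh::prelude::r#async::*;

// Detect "not found" by observing zero replies instead of matching
// an explicit error reply from the storage.
async fn lookup(session: &Session, key: &str) {
    let replies = session.get(key).res().await.unwrap();
    let mut found = false;
    while let Ok(reply) = replies.recv_async().await {
        found = true;
        if let Ok(sample) = reply.sample {
            println!("{} => {}", sample.key_expr, sample.value);
        }
    }
    if !found {
        println!("no stored data for {key}"); // formerly an error reply
    }
}
```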