From faa781cdafe7a656c2eb569d2d5c69eb1d41a27c Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Tue, 3 Oct 2023 18:09:50 +0200 Subject: [PATCH] Tests are now repassing --- commons/zenoh-config/src/defaults.rs | 3 +- io/zenoh-transport/src/lib.rs | 10 +++--- .../src/multicast/establishment.rs | 4 +-- io/zenoh-transport/src/multicast/transport.rs | 4 +-- .../src/unicast/establishment/accept.rs | 3 +- .../src/unicast/establishment/open.rs | 3 +- .../src/unicast/lowlatency/link.rs | 28 ++++++++-------- .../src/unicast/lowlatency/transport.rs | 13 ++++---- io/zenoh-transport/src/unicast/manager.rs | 32 +++++++++++-------- io/zenoh-transport/src/unicast/mod.rs | 2 +- .../src/unicast/test_helpers.rs | 4 +-- .../src/unicast/universal/link.rs | 4 +-- .../src/unicast/universal/transport.rs | 24 ++++++++------ io/zenoh-transport/tests/endpoints.rs | 4 +-- .../tests/multicast_transport.rs | 4 +-- .../tests/transport_whitelist.rs | 4 +-- .../tests/unicast_authenticator.rs | 27 ++++++++++------ .../tests/unicast_concurrent.rs | 4 +-- .../tests/unicast_intermittent.rs | 7 ++-- io/zenoh-transport/tests/unicast_multilink.rs | 6 ++-- io/zenoh-transport/tests/unicast_openclose.rs | 7 ++-- .../tests/unicast_priorities.rs | 8 ++--- .../tests/unicast_simultaneous.rs | 4 +-- io/zenoh-transport/tests/unicast_transport.rs | 7 ++-- zenoh/src/admin.rs | 4 +-- zenoh/src/key_expr.rs | 2 +- zenoh/src/net/routing/face.rs | 2 +- zenoh/src/net/routing/network.rs | 2 +- zenoh/src/net/routing/router.rs | 6 ++-- zenoh/src/net/runtime/adminspace.rs | 2 +- zenoh/src/net/runtime/mod.rs | 5 +-- zenoh/src/net/tests/tables.rs | 2 +- zenoh/src/publication.rs | 2 +- zenoh/src/queryable.rs | 2 +- zenoh/src/session.rs | 2 +- 35 files changed, 136 insertions(+), 111 deletions(-) diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 16d876591f..1cef93220c 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -126,9 +126,10 @@ impl Default for QoSConf { } } +#[allow(clippy::derivable_impls)] impl Default for CompressionConf { fn default() -> Self { - Self { enabled: true } + Self { enabled: false } } } diff --git a/io/zenoh-transport/src/lib.rs b/io/zenoh-transport/src/lib.rs index 05240710f6..f5c1e8b13e 100644 --- a/io/zenoh-transport/src/lib.rs +++ b/io/zenoh-transport/src/lib.rs @@ -18,9 +18,9 @@ //! //! [Click here for Zenoh's documentation](../zenoh/index.html) mod common; -mod manager; -mod multicast; -mod primitives; +pub mod manager; +pub mod multicast; +pub mod primitives; pub mod unicast; #[cfg(feature = "stats")] @@ -29,13 +29,11 @@ pub use common::stats; #[cfg(feature = "shared-memory")] mod shm; +use crate::{multicast::TransportMulticast, unicast::TransportUnicast}; pub use manager::*; -pub use multicast::*; -pub use primitives::*; use serde::Serialize; use std::any::Any; use std::sync::Arc; -pub use unicast::*; use zenoh_link::Link; use zenoh_protocol::core::{WhatAmI, ZenohId}; use zenoh_protocol::network::NetworkMessage; diff --git a/io/zenoh-transport/src/multicast/establishment.rs b/io/zenoh-transport/src/multicast/establishment.rs index fc4ad21da3..d81f49640d 100644 --- a/io/zenoh-transport/src/multicast/establishment.rs +++ b/io/zenoh-transport/src/multicast/establishment.rs @@ -13,8 +13,8 @@ // use crate::{ common::seq_num, - multicast::{transport::TransportMulticastInner, TransportMulticast}, - TransportConfigMulticast, TransportManager, + multicast::{transport::TransportMulticastInner, TransportConfigMulticast, TransportMulticast}, + TransportManager, }; use rand::Rng; use std::sync::Arc; diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index c4412447cf..7b06057118 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -16,8 +16,8 @@ use super::link::{TransportLinkMulticast, TransportLinkMulticastConfig}; #[cfg(feature = "stats")] use crate::stats::TransportStats; use crate::{ - TransportConfigMulticast, TransportManager, TransportMulticastEventHandler, TransportPeer, - TransportPeerEventHandler, + multicast::{TransportConfigMulticast, TransportMulticastEventHandler}, + TransportManager, TransportPeer, TransportPeerEventHandler, }; use async_trait::async_trait; use std::{ diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index f00afa0c1c..1f3c6e07d0 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -20,8 +20,9 @@ use crate::{ Zenoh080Cookie, }, link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection}, + TransportConfigUnicast, }, - TransportConfigUnicast, TransportManager, + TransportManager, }; use async_std::sync::Mutex; use async_trait::async_trait; diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index cb2b62953c..df9cfea357 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -17,8 +17,9 @@ use crate::{ unicast::{ establishment::{close_link, compute_sn, ext, finalize_transport, InputFinalize, OpenFsm}, link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection}, + TransportConfigUnicast, TransportUnicast, }, - TransportConfigUnicast, TransportManager, TransportUnicast, + TransportManager, }; use async_trait::async_trait; use std::time::Duration; diff --git a/io/zenoh-transport/src/unicast/lowlatency/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs index 22b2fa8f8a..fc3f3e6186 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/link.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs @@ -34,7 +34,7 @@ pub(crate) async fn send_with_link( #[cfg(feature = "stats")] stats: &Arc, ) -> ZResult<()> { let len; - if link.is_streamed() { + if link.link.is_streamed() { let mut buffer = vec![0, 0, 0, 0]; let codec = Zenoh080::new(); let mut writer = buffer.writer(); @@ -47,7 +47,7 @@ pub(crate) async fn send_with_link( buffer[0..4].copy_from_slice(&le); - link.write_all(&buffer).await?; + link.link.write_all(&buffer).await?; } else { let mut buffer = vec![]; let codec = Zenoh080::new(); @@ -60,7 +60,7 @@ pub(crate) async fn send_with_link( { len = buffer.len() as u32; } - link.write_all(&buffer).await?; + link.link.write_all(&buffer).await?; } log::trace!("Sent: {:?}", msg); @@ -208,15 +208,15 @@ async fn rx_task_stream( async fn read(link: &TransportLinkUnicast, buffer: &mut [u8]) -> ZResult { // 16 bits for reading the batch length let mut length = [0_u8, 0_u8, 0_u8, 0_u8]; - link.read_exact(&mut length).await?; + link.link.read_exact(&mut length).await?; let n = u32::from_le_bytes(length) as usize; - link.read_exact(&mut buffer[0..n]).await?; + link.link.read_exact(&mut buffer[0..n]).await?; Ok(n) } // The pool of buffers - let mtu = link.get_mtu().min(rx_batch_size) as usize; + let mtu = link.link.get_mtu().min(rx_batch_size) as usize; let mut n = rx_buffer_size / mtu; if rx_buffer_size % mtu != 0 { n += 1; @@ -237,7 +237,7 @@ async fn rx_task_stream( // Deserialize all the messages from the current ZBuf let zslice = ZSlice::make(Arc::new(buffer), 0, bytes).unwrap(); - transport.read_messages(zslice, &link).await?; + transport.read_messages(zslice, &link.link).await?; } } @@ -249,7 +249,7 @@ async fn rx_task_dgram( rx_buffer_size: usize, ) -> ZResult<()> { // The pool of buffers - let mtu = link.get_mtu().min(rx_batch_size) as usize; + let mtu = link.link.get_mtu().min(rx_batch_size) as usize; let mut n = rx_buffer_size / mtu; if rx_buffer_size % mtu != 0 { n += 1; @@ -261,17 +261,19 @@ async fn rx_task_dgram( let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc()); // Async read from the underlying link - let bytes = - link.read(&mut buffer).timeout(lease).await.map_err(|_| { - zerror!("{}: expired after {} milliseconds", link, lease.as_millis()) - })??; + let bytes = link + .link + .read(&mut buffer) + .timeout(lease) + .await + .map_err(|_| zerror!("{}: expired after {} milliseconds", link, lease.as_millis()))??; #[cfg(feature = "stats")] transport.stats.inc_rx_bytes(bytes); // Deserialize all the messages from the current ZBuf let zslice = ZSlice::make(Arc::new(buffer), 0, bytes).unwrap(); - transport.read_messages(zslice, &link).await?; + transport.read_messages(zslice, &link.link).await?; } } diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index 53ed01c9e8..cd87c6b1c9 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -15,11 +15,13 @@ use super::link::send_with_link; #[cfg(feature = "stats")] use crate::stats::TransportStats; -use crate::transport_unicast_inner::TransportUnicastTrait; -use crate::unicast::link::{TransportLinkUnicast, TransportLinkUnicastConfig}; -use crate::TransportConfigUnicast; -use crate::TransportManager; -use crate::{TransportExecutor, TransportPeerEventHandler}; +use crate::{ + unicast::{ + link::TransportLinkUnicast, transport_unicast_inner::TransportUnicastTrait, + TransportConfigUnicast, + }, + TransportExecutor, TransportManager, TransportPeerEventHandler, +}; use async_executor::Task; #[cfg(feature = "transport_unixpipe")] use async_std::sync::RwLockUpgradableReadGuard; @@ -35,7 +37,6 @@ use zenoh_core::{zasynclock, zasyncread, zread, zwrite}; use zenoh_link::unixpipe::UNIXPIPE_LOCATOR_PREFIX; #[cfg(feature = "transport_unixpipe")] use zenoh_link::Link; -use zenoh_link::LinkUnicast; use zenoh_protocol::core::{WhatAmI, ZenohId}; use zenoh_protocol::network::NetworkMessage; use zenoh_protocol::transport::TransportBodyLowLatency; diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index 8624c372e9..3e1375b5a1 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -18,10 +18,11 @@ use crate::unicast::establishment::ext::auth::Auth; #[cfg(feature = "transport_multilink")] use crate::unicast::establishment::ext::multilink::MultiLink; use crate::{ - // lowlatency::transport::TransportUnicastLowlatency, - transport_unicast_inner::TransportUnicastTrait, - unicast::{link::TransportLinkUnicast, TransportConfigUnicast, TransportUnicast}, - universal::transport::TransportUnicastUniversal, + unicast::{ + link::TransportLinkUnicast, lowlatency::transport::TransportUnicastLowlatency, + transport_unicast_inner::TransportUnicastTrait, + universal::transport::TransportUnicastUniversal, TransportConfigUnicast, TransportUnicast, + }, TransportManager, }; use async_std::{prelude::FutureExt, sync::Mutex, task}; @@ -100,7 +101,7 @@ pub struct TransportManagerBuilderUnicast { pub(super) authenticator: Auth, pub(super) is_lowlatency: bool, #[cfg(feature = "transport_compression")] - pub(super) is_compress: bool, + pub(super) is_compression: bool, } impl TransportManagerBuilderUnicast { @@ -158,8 +159,8 @@ impl TransportManagerBuilderUnicast { } #[cfg(feature = "transport_compression")] - pub fn compression(mut self, is_compress: bool) -> Self { - self.is_compress = is_compress; + pub fn compression(mut self, is_compression: bool) -> Self { + self.is_compression = is_compression; self } @@ -188,6 +189,10 @@ impl TransportManagerBuilderUnicast { { self = self.authenticator(Auth::from_config(config).await?); } + #[cfg(feature = "transport_compression")] + { + self = self.compression(*config.transport().compression().enabled()); + } Ok(self) } @@ -215,7 +220,7 @@ impl TransportManagerBuilderUnicast { is_compressed: self.is_compressed, is_lowlatency: self.is_lowlatency, #[cfg(feature = "transport_compression")] - is_compression: self.is_compress, + is_compression: self.is_compression, }; let state = TransportManagerStateUnicast { @@ -261,7 +266,7 @@ impl Default for TransportManagerBuilderUnicast { authenticator: Auth::default(), is_lowlatency: *transport.lowlatency(), #[cfg(feature = "transport_compression")] - is_compress: *compression.enabled(), + is_compression: *compression.enabled(), } } } @@ -455,10 +460,9 @@ impl TransportManager { let a_t = { if config.is_lowlatency { log::debug!("Will use LowLatency transport!"); - // TransportUnicastLowlatency::make(self.clone(), config.clone(), link) - // .map_err(|e| (e, Some(close::reason::INVALID))) - // .map(|v| Arc::new(v) as Arc)? - panic!(); // @TODO + TransportUnicastLowlatency::make(self.clone(), config.clone(), link) + .map_err(|e| (e, Some(close::reason::INVALID))) + .map(|v| Arc::new(v) as Arc)? } else { log::debug!("Will use Universal transport!"); let t: Arc = @@ -591,7 +595,7 @@ impl TransportManager { } // A new link is available - log::trace!("New link waiting... {}", link); + log::trace!("Accepting link... {}", link); *guard += 1; drop(guard); diff --git a/io/zenoh-transport/src/unicast/mod.rs b/io/zenoh-transport/src/unicast/mod.rs index 578d4f1e02..3385cbed6a 100644 --- a/io/zenoh-transport/src/unicast/mod.rs +++ b/io/zenoh-transport/src/unicast/mod.rs @@ -13,7 +13,7 @@ // pub mod establishment; pub(crate) mod link; -// pub(crate) mod lowlatency; +pub(crate) mod lowlatency; pub(crate) mod manager; pub(crate) mod transport_unicast_inner; pub(crate) mod universal; diff --git a/io/zenoh-transport/src/unicast/test_helpers.rs b/io/zenoh-transport/src/unicast/test_helpers.rs index 403384c851..42ed6db927 100644 --- a/io/zenoh-transport/src/unicast/test_helpers.rs +++ b/io/zenoh-transport/src/unicast/test_helpers.rs @@ -11,11 +11,9 @@ // Contributors: // ZettaScale Zenoh Team, // - +use crate::{unicast::TransportManagerBuilderUnicast, TransportManager}; use zenoh_core::zcondfeat; -use crate::{TransportManager, TransportManagerBuilderUnicast}; - pub fn make_transport_manager_builder( #[cfg(feature = "transport_multilink")] max_links: usize, #[cfg(feature = "shared-memory")] with_shm: bool, diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 8317816767..b64282e958 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -32,7 +32,7 @@ use async_std::task::JoinHandle; use std::{sync::Arc, time::Duration}; use zenoh_buffers::ZSlice; use zenoh_protocol::transport::{BatchSize, KeepAlive, TransportMessage}; -use zenoh_result::{bail, zerror, ZResult}; +use zenoh_result::{zerror, ZResult}; use zenoh_sync::{RecyclingObjectPool, Signal}; #[derive(Clone)] @@ -270,8 +270,6 @@ async fn rx_task( let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice()); while !signal.is_triggered() { - // Retrieve one buffer - let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc()); // Retrieve one buffer let batch = RBatch::new( rx_batch_size, diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index 43e411fc7f..48510c3952 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -15,10 +15,13 @@ use crate::stats::TransportStats; use crate::{ common::priority::{TransportPriorityRx, TransportPriorityTx}, - transport_unicast_inner::TransportUnicastTrait, - unicast::link::{TransportLinkUnicast, TransportLinkUnicastDirection}, - universal::link::TransportLinkUnicastUniversal, - TransportConfigUnicast, TransportExecutor, TransportManager, TransportPeerEventHandler, + unicast::{ + link::{TransportLinkUnicast, TransportLinkUnicastDirection}, + transport_unicast_inner::TransportUnicastTrait, + universal::link::TransportLinkUnicastUniversal, + TransportConfigUnicast, + }, + TransportExecutor, TransportManager, TransportPeerEventHandler, }; use async_std::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}; use async_trait::async_trait; @@ -37,19 +40,22 @@ use zenoh_result::{bail, zerror, ZResult}; macro_rules! zlinkget { ($guard:expr, $link:expr) => { - $guard.iter().find(|tl| &tl.link == $link) + // Compare LinkUnicast link to not compare TransportLinkUnicast direction + $guard.iter().find(|tl| &tl.link.link == &$link.link) }; } macro_rules! zlinkgetmut { ($guard:expr, $link:expr) => { - $guard.iter_mut().find(|tl| &tl.link == $link) + // Compare LinkUnicast link to not compare TransportLinkUnicast direction + $guard.iter_mut().find(|tl| &tl.link.link == &$link.link) }; } macro_rules! zlinkindex { ($guard:expr, $link:expr) => { - $guard.iter().position(|tl| &tl.link == $link) + // Compare LinkUnicast link to not compare TransportLinkUnicast direction + $guard.iter().position(|tl| &tl.link.link == &$link.link) }; } @@ -438,9 +444,9 @@ impl TransportUnicastTrait for TransportUnicastUniversal { } None => { bail!( - "Can not start Link TX {} with peer: {}", + "Can not start Link TX {} with ZID: {}", link, - self.config.zid + self.config.zid, ) } } diff --git a/io/zenoh-transport/tests/endpoints.rs b/io/zenoh-transport/tests/endpoints.rs index e372e9e013..2ac2084552 100644 --- a/io/zenoh-transport/tests/endpoints.rs +++ b/io/zenoh-transport/tests/endpoints.rs @@ -21,8 +21,8 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; use zenoh_transport::{ - TransportEventHandler, TransportManager, TransportMulticast, TransportMulticastEventHandler, - TransportPeer, TransportPeerEventHandler, TransportUnicast, + multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, + TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; const TIMEOUT: Duration = Duration::from_secs(60); diff --git a/io/zenoh-transport/tests/multicast_transport.rs b/io/zenoh-transport/tests/multicast_transport.rs index 28f69ef3b7..c686db4866 100644 --- a/io/zenoh-transport/tests/multicast_transport.rs +++ b/io/zenoh-transport/tests/multicast_transport.rs @@ -42,8 +42,8 @@ mod tests { }; use zenoh_result::ZResult; use zenoh_transport::{ - TransportEventHandler, TransportManager, TransportMulticast, - TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, TransportUnicast, + multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, + TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; const TIMEOUT: Duration = Duration::from_secs(60); diff --git a/io/zenoh-transport/tests/transport_whitelist.rs b/io/zenoh-transport/tests/transport_whitelist.rs index 5279dcff21..5a929ed18c 100644 --- a/io/zenoh-transport/tests/transport_whitelist.rs +++ b/io/zenoh-transport/tests/transport_whitelist.rs @@ -21,8 +21,8 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; use zenoh_transport::{ - TransportEventHandler, TransportManager, TransportMulticast, TransportMulticastEventHandler, - TransportPeer, TransportPeerEventHandler, TransportUnicast, + multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, + TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; const TIMEOUT: Duration = Duration::from_secs(60); diff --git a/io/zenoh-transport/tests/unicast_authenticator.rs b/io/zenoh-transport/tests/unicast_authenticator.rs index b22d7875fd..51e78d4ee8 100644 --- a/io/zenoh-transport/tests/unicast_authenticator.rs +++ b/io/zenoh-transport/tests/unicast_authenticator.rs @@ -21,11 +21,12 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; use zenoh_transport::{ - unicast::establishment::ext::auth::Auth, TransportMulticast, TransportMulticastEventHandler, + multicast::TransportMulticast, unicast::establishment::ext::auth::Auth, + TransportMulticastEventHandler, }; use zenoh_transport::{ - DummyTransportPeerEventHandler, TransportEventHandler, TransportPeer, - TransportPeerEventHandler, TransportUnicast, + unicast::TransportUnicast, DummyTransportPeerEventHandler, TransportEventHandler, + TransportPeer, TransportPeerEventHandler, }; const TIMEOUT: Duration = Duration::from_secs(60); @@ -109,9 +110,13 @@ impl TransportEventHandler for SHClientAuthenticator { #[cfg(feature = "auth_pubkey")] async fn auth_pubkey(endpoint: &EndPoint, lowlatency_transport: bool) { use rsa::{BigUint, RsaPrivateKey, RsaPublicKey}; - use zenoh_transport::test_helpers::make_basic_transport_manager_builder; - use zenoh_transport::unicast::establishment::ext::auth::AuthPubKey; - use zenoh_transport::TransportManager; + use zenoh_transport::{ + unicast::{ + establishment::ext::auth::AuthPubKey, + test_helpers::make_basic_transport_manager_builder, + }, + TransportManager, + }; // Create the transport transport manager for the client 01 let client01_id = ZenohId::try_from([2]).unwrap(); @@ -411,9 +416,13 @@ async fn auth_pubkey(endpoint: &EndPoint, lowlatency_transport: bool) { #[cfg(feature = "auth_usrpwd")] async fn auth_usrpwd(endpoint: &EndPoint, lowlatency_transport: bool) { - use zenoh_transport::test_helpers::make_basic_transport_manager_builder; - use zenoh_transport::unicast::establishment::ext::auth::AuthUsrPwd; - use zenoh_transport::TransportManager; + use zenoh_transport::{ + unicast::{ + establishment::ext::auth::AuthUsrPwd, + test_helpers::make_basic_transport_manager_builder, + }, + TransportManager, + }; /* [CLIENT] */ let client01_id = ZenohId::try_from([2]).unwrap(); diff --git a/io/zenoh-transport/tests/unicast_concurrent.rs b/io/zenoh-transport/tests/unicast_concurrent.rs index 11f5e46ca7..64516f6f26 100644 --- a/io/zenoh-transport/tests/unicast_concurrent.rs +++ b/io/zenoh-transport/tests/unicast_concurrent.rs @@ -33,8 +33,8 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; use zenoh_transport::{ - TransportEventHandler, TransportManager, TransportMulticast, TransportMulticastEventHandler, - TransportPeer, TransportPeerEventHandler, TransportUnicast, + multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, + TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; const MSG_COUNT: usize = 1_000; diff --git a/io/zenoh-transport/tests/unicast_intermittent.rs b/io/zenoh-transport/tests/unicast_intermittent.rs index 01ee0e3751..4c7934309b 100644 --- a/io/zenoh-transport/tests/unicast_intermittent.rs +++ b/io/zenoh-transport/tests/unicast_intermittent.rs @@ -33,10 +33,11 @@ use zenoh_protocol::{ zenoh::Put, }; use zenoh_result::ZResult; -use zenoh_transport::test_helpers::make_transport_manager_builder; use zenoh_transport::{ - DummyTransportPeerEventHandler, TransportEventHandler, TransportManager, TransportMulticast, - TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, TransportUnicast, + multicast::TransportMulticast, + unicast::{test_helpers::make_transport_manager_builder, TransportUnicast}, + DummyTransportPeerEventHandler, TransportEventHandler, TransportManager, + TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; const MSG_SIZE: usize = 8; diff --git a/io/zenoh-transport/tests/unicast_multilink.rs b/io/zenoh-transport/tests/unicast_multilink.rs index 182408f75b..cd8a48565a 100644 --- a/io/zenoh-transport/tests/unicast_multilink.rs +++ b/io/zenoh-transport/tests/unicast_multilink.rs @@ -20,9 +20,9 @@ mod tests { use zenoh_protocol::core::{WhatAmI, ZenohId}; use zenoh_result::ZResult; use zenoh_transport::{ - DummyTransportPeerEventHandler, TransportEventHandler, TransportManager, - TransportMulticast, TransportMulticastEventHandler, TransportPeer, - TransportPeerEventHandler, TransportUnicast, + multicast::TransportMulticast, unicast::TransportUnicast, DummyTransportPeerEventHandler, + TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, + TransportPeerEventHandler, }; const TIMEOUT: Duration = Duration::from_secs(60); diff --git a/io/zenoh-transport/tests/unicast_openclose.rs b/io/zenoh-transport/tests/unicast_openclose.rs index f361f6f684..76a63cc6e0 100644 --- a/io/zenoh-transport/tests/unicast_openclose.rs +++ b/io/zenoh-transport/tests/unicast_openclose.rs @@ -18,9 +18,10 @@ use zenoh_link::EndPoint; use zenoh_protocol::core::{WhatAmI, ZenohId}; use zenoh_result::ZResult; use zenoh_transport::{ - test_helpers::make_transport_manager_builder, DummyTransportPeerEventHandler, - TransportEventHandler, TransportManager, TransportMulticast, TransportMulticastEventHandler, - TransportPeer, TransportPeerEventHandler, TransportUnicast, + multicast::TransportMulticast, + unicast::{test_helpers::make_transport_manager_builder, TransportUnicast}, + DummyTransportPeerEventHandler, TransportEventHandler, TransportManager, + TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; const TIMEOUT: Duration = Duration::from_secs(60); diff --git a/io/zenoh-transport/tests/unicast_priorities.rs b/io/zenoh-transport/tests/unicast_priorities.rs index 7d8b70b4d3..07f8e43bcb 100644 --- a/io/zenoh-transport/tests/unicast_priorities.rs +++ b/io/zenoh-transport/tests/unicast_priorities.rs @@ -35,8 +35,8 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; use zenoh_transport::{ - TransportEventHandler, TransportManager, TransportMulticast, TransportMulticastEventHandler, - TransportPeer, TransportPeerEventHandler, TransportUnicast, + multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, + TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; const TIMEOUT: Duration = Duration::from_secs(60); @@ -102,8 +102,8 @@ impl TransportEventHandler for SHRouter { fn new_multicast( &self, - _transport: zenoh_transport::TransportMulticast, - ) -> ZResult> { + _transport: TransportMulticast, + ) -> ZResult> { panic!(); } } diff --git a/io/zenoh-transport/tests/unicast_simultaneous.rs b/io/zenoh-transport/tests/unicast_simultaneous.rs index 3de47aba03..dad4b6f775 100644 --- a/io/zenoh-transport/tests/unicast_simultaneous.rs +++ b/io/zenoh-transport/tests/unicast_simultaneous.rs @@ -32,8 +32,8 @@ mod tests { }; use zenoh_result::ZResult; use zenoh_transport::{ - TransportEventHandler, TransportManager, TransportMulticast, - TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, TransportUnicast, + multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, + TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; const TIMEOUT: Duration = Duration::from_secs(60); diff --git a/io/zenoh-transport/tests/unicast_transport.rs b/io/zenoh-transport/tests/unicast_transport.rs index e01d9d0130..617ebe4fbe 100644 --- a/io/zenoh-transport/tests/unicast_transport.rs +++ b/io/zenoh-transport/tests/unicast_transport.rs @@ -35,10 +35,11 @@ use zenoh_protocol::{ zenoh::Put, }; use zenoh_result::ZResult; -use zenoh_transport::test_helpers::make_transport_manager_builder; use zenoh_transport::{ - TransportEventHandler, TransportManager, TransportMulticast, TransportMulticastEventHandler, - TransportPeer, TransportPeerEventHandler, TransportUnicast, + multicast::TransportMulticast, + unicast::{test_helpers::make_transport_manager_builder, TransportUnicast}, + TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, + TransportPeerEventHandler, }; // These keys and certificates below are purposedly generated to run TLS and mTLS tests. diff --git a/zenoh/src/admin.rs b/zenoh/src/admin.rs index 56772797ce..a8aad9c809 100644 --- a/zenoh/src/admin.rs +++ b/zenoh/src/admin.rs @@ -122,14 +122,14 @@ impl TransportEventHandler for Handler { fn new_unicast( &self, peer: zenoh_transport::TransportPeer, - _transport: zenoh_transport::TransportUnicast, + _transport: zenoh_transport::unicast::TransportUnicast, ) -> ZResult> { self.new_peer(peer) } fn new_multicast( &self, - _transport: zenoh_transport::TransportMulticast, + _transport: zenoh_transport::multicast::TransportMulticast, ) -> ZResult> { Ok(Arc::new(self.clone())) } diff --git a/zenoh/src/key_expr.rs b/zenoh/src/key_expr.rs index ad41c30457..47bf6e776f 100644 --- a/zenoh/src/key_expr.rs +++ b/zenoh/src/key_expr.rs @@ -26,7 +26,7 @@ use zenoh_protocol::{ network::{declare, DeclareBody, Mapping, UndeclareKeyExpr}, }; use zenoh_result::ZResult; -use zenoh_transport::Primitives; +use zenoh_transport::primitives::Primitives; use crate::{prelude::Selector, Session, Undeclarable}; diff --git a/zenoh/src/net/routing/face.rs b/zenoh/src/net/routing/face.rs index d84f173d26..f2ee0094ba 100644 --- a/zenoh/src/net/routing/face.rs +++ b/zenoh/src/net/routing/face.rs @@ -25,7 +25,7 @@ use zenoh_protocol::{ }; #[cfg(feature = "stats")] use zenoh_transport::stats::TransportStats; -use zenoh_transport::{Primitives, TransportMulticast}; +use zenoh_transport::{multicast::TransportMulticast, primitives::Primitives}; pub struct FaceState { pub(super) id: usize, diff --git a/zenoh/src/net/routing/network.rs b/zenoh/src/net/routing/network.rs index 3af1e0a87c..0fb9f36120 100644 --- a/zenoh/src/net/routing/network.rs +++ b/zenoh/src/net/routing/network.rs @@ -27,7 +27,7 @@ use zenoh_protocol::common::ZExtBody; use zenoh_protocol::core::{WhatAmI, WhatAmIMatcher, ZenohId}; use zenoh_protocol::network::oam::id::OAM_LINKSTATE; use zenoh_protocol::network::{oam, NetworkBody, NetworkMessage, Oam}; -use zenoh_transport::TransportUnicast; +use zenoh_transport::unicast::TransportUnicast; #[derive(Clone)] struct Details { diff --git a/zenoh/src/net/routing/router.rs b/zenoh/src/net/routing/router.rs index dbf687ba79..ac283f9bf0 100644 --- a/zenoh/src/net/routing/router.rs +++ b/zenoh/src/net/routing/router.rs @@ -37,8 +37,10 @@ use zenoh_protocol::network::{Mapping, NetworkBody, NetworkMessage}; #[cfg(feature = "stats")] use zenoh_transport::stats::TransportStats; use zenoh_transport::{ - DeMux, DummyPrimitives, McastMux, Mux, Primitives, TransportMulticast, TransportPeer, - TransportPeerEventHandler, TransportUnicast, + multicast::TransportMulticast, + primitives::{DeMux, DummyPrimitives, McastMux, Mux, Primitives}, + unicast::TransportUnicast, + TransportPeer, TransportPeerEventHandler, }; // use zenoh_collections::Timer; use zenoh_core::zconfigurable; diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 0eb099a098..ea017fbc5e 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -38,7 +38,7 @@ use zenoh_protocol::{ zenoh::{PushBody, RequestBody}, }; use zenoh_result::ZResult; -use zenoh_transport::{Primitives, TransportUnicast}; +use zenoh_transport::{primitives::Primitives, unicast::TransportUnicast}; pub struct AdminContext { runtime: Runtime, diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index 92d369e998..f9486ea59c 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -42,8 +42,9 @@ use zenoh_protocol::network::{NetworkBody, NetworkMessage}; use zenoh_result::{bail, ZResult}; use zenoh_sync::get_mut_unchecked; use zenoh_transport::{ - DeMux, TransportEventHandler, TransportManager, TransportMulticast, - TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, TransportUnicast, + multicast::TransportMulticast, primitives::DeMux, unicast::TransportUnicast, + TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, + TransportPeerEventHandler, }; pub struct RuntimeState { diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 5dadf8d8a9..933a2e46a4 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -27,7 +27,7 @@ use zenoh_protocol::network::declare::subscriber::ext::SubscriberInfo; use zenoh_protocol::network::declare::Mode; use zenoh_protocol::network::{ext, Declare, DeclareBody, DeclareKeyExpr}; use zenoh_protocol::zenoh::{PushBody, Put}; -use zenoh_transport::{DummyPrimitives, Primitives}; +use zenoh_transport::primitives::{DummyPrimitives, Primitives}; #[test] fn base_test() { diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 071eb97da9..72a4ee3c82 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -14,7 +14,7 @@ //! Publishing primitives. -use crate::net::transport::Primitives; +use crate::net::transport::primitives::Primitives; use crate::prelude::*; use crate::sample::DataInfo; use crate::Encoding; diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index ed0560d759..9c45487f01 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -31,7 +31,7 @@ use zenoh_protocol::network::{response, Mapping, RequestId, Response, ResponseFi use zenoh_protocol::zenoh::reply::ext::ConsolidationType; use zenoh_protocol::zenoh::{self, ResponseBody}; use zenoh_result::ZResult; -use zenoh_transport::Primitives; +use zenoh_transport::primitives::Primitives; pub(crate) struct QueryInner { /// The key expression of this Query. diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 744f21965f..a1a5f9f030 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -22,7 +22,7 @@ use crate::key_expr::KeyExprInner; use crate::liveliness::{Liveliness, LivelinessTokenState}; use crate::net::routing::face::Face; use crate::net::runtime::Runtime; -use crate::net::transport::Primitives; +use crate::net::transport::primitives::Primitives; use crate::prelude::Locality; use crate::prelude::{KeyExpr, Parameters}; use crate::publication::*;