Skip to content

Commit

Permalink
Tests are now repassing
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Oct 3, 2023
1 parent 1d2caa8 commit faa781c
Show file tree
Hide file tree
Showing 35 changed files with 136 additions and 111 deletions.
3 changes: 2 additions & 1 deletion commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,10 @@ impl Default for QoSConf {
}
}

#[allow(clippy::derivable_impls)]
impl Default for CompressionConf {
fn default() -> Self {
Self { enabled: true }
Self { enabled: false }
}
}

Expand Down
10 changes: 4 additions & 6 deletions io/zenoh-transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
mod common;
mod manager;
mod multicast;
mod primitives;
pub mod manager;
pub mod multicast;
pub mod primitives;
pub mod unicast;

#[cfg(feature = "stats")]
Expand All @@ -29,13 +29,11 @@ pub use common::stats;
#[cfg(feature = "shared-memory")]
mod shm;

use crate::{multicast::TransportMulticast, unicast::TransportUnicast};
pub use manager::*;
pub use multicast::*;
pub use primitives::*;
use serde::Serialize;
use std::any::Any;
use std::sync::Arc;
pub use unicast::*;
use zenoh_link::Link;
use zenoh_protocol::core::{WhatAmI, ZenohId};
use zenoh_protocol::network::NetworkMessage;
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-transport/src/multicast/establishment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
//
use crate::{
common::seq_num,
multicast::{transport::TransportMulticastInner, TransportMulticast},
TransportConfigMulticast, TransportManager,
multicast::{transport::TransportMulticastInner, TransportConfigMulticast, TransportMulticast},
TransportManager,
};
use rand::Rng;
use std::sync::Arc;
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-transport/src/multicast/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use super::link::{TransportLinkMulticast, TransportLinkMulticastConfig};
#[cfg(feature = "stats")]
use crate::stats::TransportStats;
use crate::{
TransportConfigMulticast, TransportManager, TransportMulticastEventHandler, TransportPeer,
TransportPeerEventHandler,
multicast::{TransportConfigMulticast, TransportMulticastEventHandler},
TransportManager, TransportPeer, TransportPeerEventHandler,
};
use async_trait::async_trait;
use std::{
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-transport/src/unicast/establishment/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use crate::{
Zenoh080Cookie,
},
link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection},
TransportConfigUnicast,
},
TransportConfigUnicast, TransportManager,
TransportManager,
};
use async_std::sync::Mutex;
use async_trait::async_trait;
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-transport/src/unicast/establishment/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ use crate::{
unicast::{
establishment::{close_link, compute_sn, ext, finalize_transport, InputFinalize, OpenFsm},
link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection},
TransportConfigUnicast, TransportUnicast,
},
TransportConfigUnicast, TransportManager, TransportUnicast,
TransportManager,
};
use async_trait::async_trait;
use std::time::Duration;
Expand Down
28 changes: 15 additions & 13 deletions io/zenoh-transport/src/unicast/lowlatency/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub(crate) async fn send_with_link(
#[cfg(feature = "stats")] stats: &Arc<TransportStats>,
) -> ZResult<()> {
let len;
if link.is_streamed() {
if link.link.is_streamed() {
let mut buffer = vec![0, 0, 0, 0];
let codec = Zenoh080::new();
let mut writer = buffer.writer();
Expand All @@ -47,7 +47,7 @@ pub(crate) async fn send_with_link(

buffer[0..4].copy_from_slice(&le);

link.write_all(&buffer).await?;
link.link.write_all(&buffer).await?;
} else {
let mut buffer = vec![];
let codec = Zenoh080::new();
Expand All @@ -60,7 +60,7 @@ pub(crate) async fn send_with_link(
{
len = buffer.len() as u32;
}
link.write_all(&buffer).await?;
link.link.write_all(&buffer).await?;
}
log::trace!("Sent: {:?}", msg);

Expand Down Expand Up @@ -208,15 +208,15 @@ async fn rx_task_stream(
async fn read(link: &TransportLinkUnicast, buffer: &mut [u8]) -> ZResult<usize> {
// 16 bits for reading the batch length
let mut length = [0_u8, 0_u8, 0_u8, 0_u8];
link.read_exact(&mut length).await?;
link.link.read_exact(&mut length).await?;
let n = u32::from_le_bytes(length) as usize;

link.read_exact(&mut buffer[0..n]).await?;
link.link.read_exact(&mut buffer[0..n]).await?;
Ok(n)
}

// The pool of buffers
let mtu = link.get_mtu().min(rx_batch_size) as usize;
let mtu = link.link.get_mtu().min(rx_batch_size) as usize;
let mut n = rx_buffer_size / mtu;
if rx_buffer_size % mtu != 0 {
n += 1;
Expand All @@ -237,7 +237,7 @@ async fn rx_task_stream(

// Deserialize all the messages from the current ZBuf
let zslice = ZSlice::make(Arc::new(buffer), 0, bytes).unwrap();
transport.read_messages(zslice, &link).await?;
transport.read_messages(zslice, &link.link).await?;
}
}

Expand All @@ -249,7 +249,7 @@ async fn rx_task_dgram(
rx_buffer_size: usize,
) -> ZResult<()> {
// The pool of buffers
let mtu = link.get_mtu().min(rx_batch_size) as usize;
let mtu = link.link.get_mtu().min(rx_batch_size) as usize;
let mut n = rx_buffer_size / mtu;
if rx_buffer_size % mtu != 0 {
n += 1;
Expand All @@ -261,17 +261,19 @@ async fn rx_task_dgram(
let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc());

// Async read from the underlying link
let bytes =
link.read(&mut buffer).timeout(lease).await.map_err(|_| {
zerror!("{}: expired after {} milliseconds", link, lease.as_millis())
})??;
let bytes = link
.link
.read(&mut buffer)
.timeout(lease)
.await
.map_err(|_| zerror!("{}: expired after {} milliseconds", link, lease.as_millis()))??;

#[cfg(feature = "stats")]
transport.stats.inc_rx_bytes(bytes);

// Deserialize all the messages from the current ZBuf
let zslice = ZSlice::make(Arc::new(buffer), 0, bytes).unwrap();
transport.read_messages(zslice, &link).await?;
transport.read_messages(zslice, &link.link).await?;
}
}

Expand Down
13 changes: 7 additions & 6 deletions io/zenoh-transport/src/unicast/lowlatency/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
use super::link::send_with_link;
#[cfg(feature = "stats")]
use crate::stats::TransportStats;
use crate::transport_unicast_inner::TransportUnicastTrait;
use crate::unicast::link::{TransportLinkUnicast, TransportLinkUnicastConfig};
use crate::TransportConfigUnicast;
use crate::TransportManager;
use crate::{TransportExecutor, TransportPeerEventHandler};
use crate::{
unicast::{
link::TransportLinkUnicast, transport_unicast_inner::TransportUnicastTrait,
TransportConfigUnicast,
},
TransportExecutor, TransportManager, TransportPeerEventHandler,
};
use async_executor::Task;
#[cfg(feature = "transport_unixpipe")]
use async_std::sync::RwLockUpgradableReadGuard;
Expand All @@ -35,7 +37,6 @@ use zenoh_core::{zasynclock, zasyncread, zread, zwrite};
use zenoh_link::unixpipe::UNIXPIPE_LOCATOR_PREFIX;
#[cfg(feature = "transport_unixpipe")]
use zenoh_link::Link;
use zenoh_link::LinkUnicast;
use zenoh_protocol::core::{WhatAmI, ZenohId};
use zenoh_protocol::network::NetworkMessage;
use zenoh_protocol::transport::TransportBodyLowLatency;
Expand Down
32 changes: 18 additions & 14 deletions io/zenoh-transport/src/unicast/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ use crate::unicast::establishment::ext::auth::Auth;
#[cfg(feature = "transport_multilink")]
use crate::unicast::establishment::ext::multilink::MultiLink;
use crate::{
// lowlatency::transport::TransportUnicastLowlatency,
transport_unicast_inner::TransportUnicastTrait,
unicast::{link::TransportLinkUnicast, TransportConfigUnicast, TransportUnicast},
universal::transport::TransportUnicastUniversal,
unicast::{
link::TransportLinkUnicast, lowlatency::transport::TransportUnicastLowlatency,
transport_unicast_inner::TransportUnicastTrait,
universal::transport::TransportUnicastUniversal, TransportConfigUnicast, TransportUnicast,
},
TransportManager,
};
use async_std::{prelude::FutureExt, sync::Mutex, task};
Expand Down Expand Up @@ -100,7 +101,7 @@ pub struct TransportManagerBuilderUnicast {
pub(super) authenticator: Auth,
pub(super) is_lowlatency: bool,
#[cfg(feature = "transport_compression")]
pub(super) is_compress: bool,
pub(super) is_compression: bool,
}

impl TransportManagerBuilderUnicast {
Expand Down Expand Up @@ -158,8 +159,8 @@ impl TransportManagerBuilderUnicast {
}

#[cfg(feature = "transport_compression")]
pub fn compression(mut self, is_compress: bool) -> Self {
self.is_compress = is_compress;
pub fn compression(mut self, is_compression: bool) -> Self {
self.is_compression = is_compression;
self
}

Expand Down Expand Up @@ -188,6 +189,10 @@ impl TransportManagerBuilderUnicast {
{
self = self.authenticator(Auth::from_config(config).await?);
}
#[cfg(feature = "transport_compression")]
{
self = self.compression(*config.transport().compression().enabled());
}

Ok(self)
}
Expand Down Expand Up @@ -215,7 +220,7 @@ impl TransportManagerBuilderUnicast {
is_compressed: self.is_compressed,
is_lowlatency: self.is_lowlatency,
#[cfg(feature = "transport_compression")]
is_compression: self.is_compress,
is_compression: self.is_compression,
};

let state = TransportManagerStateUnicast {
Expand Down Expand Up @@ -261,7 +266,7 @@ impl Default for TransportManagerBuilderUnicast {
authenticator: Auth::default(),
is_lowlatency: *transport.lowlatency(),
#[cfg(feature = "transport_compression")]
is_compress: *compression.enabled(),
is_compression: *compression.enabled(),
}
}
}
Expand Down Expand Up @@ -455,10 +460,9 @@ impl TransportManager {
let a_t = {
if config.is_lowlatency {
log::debug!("Will use LowLatency transport!");
// TransportUnicastLowlatency::make(self.clone(), config.clone(), link)
// .map_err(|e| (e, Some(close::reason::INVALID)))
// .map(|v| Arc::new(v) as Arc<dyn TransportUnicastTrait>)?
panic!(); // @TODO
TransportUnicastLowlatency::make(self.clone(), config.clone(), link)
.map_err(|e| (e, Some(close::reason::INVALID)))
.map(|v| Arc::new(v) as Arc<dyn TransportUnicastTrait>)?
} else {
log::debug!("Will use Universal transport!");
let t: Arc<dyn TransportUnicastTrait> =
Expand Down Expand Up @@ -591,7 +595,7 @@ impl TransportManager {
}

// A new link is available
log::trace!("New link waiting... {}", link);
log::trace!("Accepting link... {}", link);
*guard += 1;
drop(guard);

Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-transport/src/unicast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
pub mod establishment;
pub(crate) mod link;
// pub(crate) mod lowlatency;
pub(crate) mod lowlatency;
pub(crate) mod manager;
pub(crate) mod transport_unicast_inner;
pub(crate) mod universal;
Expand Down
4 changes: 1 addition & 3 deletions io/zenoh-transport/src/unicast/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use crate::{unicast::TransportManagerBuilderUnicast, TransportManager};
use zenoh_core::zcondfeat;

use crate::{TransportManager, TransportManagerBuilderUnicast};

pub fn make_transport_manager_builder(
#[cfg(feature = "transport_multilink")] max_links: usize,
#[cfg(feature = "shared-memory")] with_shm: bool,
Expand Down
4 changes: 1 addition & 3 deletions io/zenoh-transport/src/unicast/universal/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use async_std::task::JoinHandle;
use std::{sync::Arc, time::Duration};
use zenoh_buffers::ZSlice;
use zenoh_protocol::transport::{BatchSize, KeepAlive, TransportMessage};
use zenoh_result::{bail, zerror, ZResult};
use zenoh_result::{zerror, ZResult};
use zenoh_sync::{RecyclingObjectPool, Signal};

#[derive(Clone)]
Expand Down Expand Up @@ -270,8 +270,6 @@ async fn rx_task(
let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice());

while !signal.is_triggered() {
// Retrieve one buffer
let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc());
// Retrieve one buffer
let batch = RBatch::new(
rx_batch_size,
Expand Down
24 changes: 15 additions & 9 deletions io/zenoh-transport/src/unicast/universal/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
use crate::stats::TransportStats;
use crate::{
common::priority::{TransportPriorityRx, TransportPriorityTx},
transport_unicast_inner::TransportUnicastTrait,
unicast::link::{TransportLinkUnicast, TransportLinkUnicastDirection},
universal::link::TransportLinkUnicastUniversal,
TransportConfigUnicast, TransportExecutor, TransportManager, TransportPeerEventHandler,
unicast::{
link::{TransportLinkUnicast, TransportLinkUnicastDirection},
transport_unicast_inner::TransportUnicastTrait,
universal::link::TransportLinkUnicastUniversal,
TransportConfigUnicast,
},
TransportExecutor, TransportManager, TransportPeerEventHandler,
};
use async_std::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
use async_trait::async_trait;
Expand All @@ -37,19 +40,22 @@ use zenoh_result::{bail, zerror, ZResult};

macro_rules! zlinkget {
($guard:expr, $link:expr) => {
$guard.iter().find(|tl| &tl.link == $link)
// Compare LinkUnicast link to not compare TransportLinkUnicast direction
$guard.iter().find(|tl| &tl.link.link == &$link.link)
};
}

macro_rules! zlinkgetmut {
($guard:expr, $link:expr) => {
$guard.iter_mut().find(|tl| &tl.link == $link)
// Compare LinkUnicast link to not compare TransportLinkUnicast direction
$guard.iter_mut().find(|tl| &tl.link.link == &$link.link)
};
}

macro_rules! zlinkindex {
($guard:expr, $link:expr) => {
$guard.iter().position(|tl| &tl.link == $link)
// Compare LinkUnicast link to not compare TransportLinkUnicast direction
$guard.iter().position(|tl| &tl.link.link == &$link.link)
};
}

Expand Down Expand Up @@ -438,9 +444,9 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
}
None => {
bail!(
"Can not start Link TX {} with peer: {}",
"Can not start Link TX {} with ZID: {}",
link,
self.config.zid
self.config.zid,
)
}
}
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-transport/tests/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use zenoh_protocol::{
};
use zenoh_result::ZResult;
use zenoh_transport::{
TransportEventHandler, TransportManager, TransportMulticast, TransportMulticastEventHandler,
TransportPeer, TransportPeerEventHandler, TransportUnicast,
multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler,
TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler,
};

const TIMEOUT: Duration = Duration::from_secs(60);
Expand Down
Loading

0 comments on commit faa781c

Please sign in to comment.