From 68aadaf77302f66364bc5a718e2007c3187059ab Mon Sep 17 00:00:00 2001 From: Dmitrii Bannov <104833606+yellowhatter@users.noreply.github.com> Date: Thu, 14 Dec 2023 16:59:57 +0300 Subject: [PATCH] Refactor transport trait (#610) --- Cargo.lock | 2 +- commons/zenoh-shm/src/lib.rs | 9 +- io/zenoh-link-commons/src/lib.rs | 12 + .../zenoh-link-unixpipe/src/unix/unicast.rs | 13 +- io/zenoh-transport/src/common/seq_num.rs | 31 ++ .../src/unicast/establishment/accept.rs | 45 +-- .../src/unicast/establishment/mod.rs | 58 +-- .../src/unicast/establishment/open.rs | 37 +- io/zenoh-transport/src/unicast/link.rs | 107 ++++- .../src/unicast/lowlatency/link.rs | 23 +- .../src/unicast/lowlatency/transport.rs | 185 +++------ io/zenoh-transport/src/unicast/manager.rs | 368 ++++++++++++------ io/zenoh-transport/src/unicast/mod.rs | 24 +- .../src/unicast/transport_unicast_inner.rs | 44 ++- .../src/unicast/universal/link.rs | 151 +++---- .../src/unicast/universal/rx.rs | 72 ++-- .../src/unicast/universal/transport.rs | 261 ++++++------- .../src/unicast/universal/tx.rs | 4 +- io/zenoh-transport/tests/unicast_shm.rs | 24 +- io/zenoh-transport/tests/unicast_transport.rs | 1 + zenoh/tests/routing.rs | 2 +- 21 files changed, 800 insertions(+), 673 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f2fcc8b187..7ff6cbd6ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2913,7 +2913,7 @@ dependencies = [ "libc", "spin 0.9.8", "untrusted 0.9.0", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] diff --git a/commons/zenoh-shm/src/lib.rs b/commons/zenoh-shm/src/lib.rs index 62e90ba208..61a7ea9be3 100644 --- a/commons/zenoh-shm/src/lib.rs +++ b/commons/zenoh-shm/src/lib.rs @@ -308,11 +308,10 @@ impl SharedMemoryManager { { Ok(m) => m, Err(ShmemError::LinkExists) => { - log::trace!("SharedMemory already exists, opening it"); - ShmemConf::new() - .flink(path.clone()) - .open() - .map_err(|e| ShmError(zerror!("Unable to open SharedMemoryManager: {}", e)))? + return Err(ShmError(zerror!( + "Unable to open SharedMemoryManager: SharedMemory already exists" + )) + .into()) } Err(e) => { return Err(ShmError(zerror!("Unable to open SharedMemoryManager: {}", e)).into()) diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 114990726a..790f4792a4 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -99,3 +99,15 @@ impl From for Link { Link::from(&link) } } + +impl PartialEq for Link { + fn eq(&self, other: &LinkUnicast) -> bool { + self.src == *other.get_src() && self.dst == *other.get_dst() + } +} + +impl PartialEq for Link { + fn eq(&self, other: &LinkMulticast) -> bool { + self.src == *other.get_src() && self.dst == *other.get_dst() + } +} diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index 72d7859326..156698d195 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -19,18 +19,20 @@ use async_std::fs::remove_file; use async_std::task::JoinHandle; use async_trait::async_trait; use filepath::FilePath; +use nix::libc; use nix::unistd::unlink; use rand::Rng; use std::cell::UnsafeCell; use std::collections::HashMap; use std::fmt; -use std::fs::File; +use std::fs::{File, OpenOptions}; use std::io::{Read, Write}; +use std::os::unix::fs::OpenOptionsExt; use std::sync::Arc; use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_protocol::core::{EndPoint, Locator}; -use unix_named_pipe::{create, open_read, open_write}; +use unix_named_pipe::{create, open_write}; use zenoh_link_commons::{ ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, @@ -161,7 +163,12 @@ impl PipeR { } fn open_unique_pipe_for_read(path: &str) -> ZResult { - let read = open_read(path)?; + let read = OpenOptions::new() + .read(true) + .write(true) + .custom_flags(libc::O_NONBLOCK) + .open(path)?; + #[cfg(not(target_os = "macos"))] read.try_lock(FileLockMode::Exclusive)?; Ok(read) diff --git a/io/zenoh-transport/src/common/seq_num.rs b/io/zenoh-transport/src/common/seq_num.rs index 159fd56712..f286d14741 100644 --- a/io/zenoh-transport/src/common/seq_num.rs +++ b/io/zenoh-transport/src/common/seq_num.rs @@ -121,6 +121,37 @@ impl SeqNum { Ok((gap != 0) && ((gap & !(self.mask >> 1)) == 0)) } + /// Checks to see if two sequence number are in a precedence relationship, + /// while taking into account roll backs AND do update the sn value if check succeed. + /// + /// Two case are considered: + /// + /// ## Case 1: sna < snb + /// + /// In this case *sna* precedes *snb* iff (snb - sna) <= semi_int where + /// semi_int is defined as half the sequence number resolution. + /// In other terms, sna precedes snb iff there are less than half + /// the length for the interval that separates them. + /// + /// ## Case 2: sna > snb + /// + /// In this case *sna* precedes *snb* iff (sna - snb) > semi_int. + /// + /// # Arguments + /// + /// * `value` - The sequence number which should be checked for precedence relation. + pub(crate) fn roll(&mut self, value: TransportSn) -> ZResult { + if (value & !self.mask) != 0 { + bail!("The sequence number value must be smaller than the resolution"); + } + let gap = value.wrapping_sub(self.value) & self.mask; + if (gap != 0) && ((gap & !(self.mask >> 1)) == 0) { + self.value = value; + return Ok(true); + } + Ok(false) + } + /// Computes the modulo gap between two sequence numbers. #[cfg(test)] // @TODO: remove #[cfg(test)] once reliability is implemented pub(crate) fn gap(&self, value: TransportSn) -> ZResult { diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index a3e5651bdb..72e676f6ec 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -16,10 +16,11 @@ use crate::unicast::shared_memory_unicast::Challenge; use crate::{ common::batch::BatchConfig, unicast::{ - establishment::{ - compute_sn, ext, finalize_transport, AcceptFsm, Cookie, InputFinalize, Zenoh080Cookie, + establishment::{compute_sn, ext, AcceptFsm, Cookie, Zenoh080Cookie}, + link::{ + LinkUnicastWithOpenAck, TransportLinkUnicast, TransportLinkUnicastConfig, + TransportLinkUnicastDirection, }, - link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection}, TransportConfigUnicast, }, TransportManager, @@ -585,7 +586,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { } } -pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) -> ZResult<()> { +pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -> ZResult<()> { let mtu = link.get_mtu(); let is_streamed = link.is_streamed(); let config = TransportLinkUnicastConfig { @@ -597,7 +598,7 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) is_compression: false, }, }; - let mut link = TransportLinkUnicast::new(link.clone(), config); + let mut link = TransportLinkUnicast::new(link, config); let mut fsm = AcceptLink { link: &mut link, prng: &manager.prng, @@ -718,31 +719,17 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) is_compression: state.link.ext_compression.is_compression(), }, }; - let a_link = TransportLinkUnicast::new(link.link.clone(), a_config); + let a_link = link.reconfigure(a_config); let s_link = format!("{:?}", a_link); - let transport = step!(manager.init_transport_unicast(config, a_link).await); - - // Send the open_ack on the link - step!(link - .send(&oack_out.open_ack.into()) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))); - - // Sync the RX sequence number - let _ = step!(transport - .get_inner() - .map_err(|e| (e, Some(close::reason::INVALID)))) - .sync(osyn_out.other_initial_sn) - .await; - - // Finalize the transport - let input = InputFinalize { - transport: transport.clone(), - other_lease: osyn_out.other_lease, - }; - step!(finalize_transport(&link, manager, input) - .await - .map_err(|e| (e, Some(close::reason::INVALID)))); + let a_link = LinkUnicastWithOpenAck::new(a_link, Some(oack_out.open_ack)); + let _transport = manager + .init_transport_unicast( + config, + a_link, + osyn_out.other_initial_sn, + osyn_out.other_lease, + ) + .await?; log::debug!( "New transport link accepted from {} to {}: {}.", diff --git a/io/zenoh-transport/src/unicast/establishment/mod.rs b/io/zenoh-transport/src/unicast/establishment/mod.rs index 523e6e9d22..f79aa826d0 100644 --- a/io/zenoh-transport/src/unicast/establishment/mod.rs +++ b/io/zenoh-transport/src/unicast/establishment/mod.rs @@ -16,21 +16,17 @@ pub(super) mod cookie; pub mod ext; pub(crate) mod open; -use super::{TransportPeer, TransportUnicast}; -use crate::{common::seq_num, unicast::link::TransportLinkUnicast, TransportManager}; +use crate::common::seq_num; use async_trait::async_trait; use cookie::*; use sha3::{ digest::{ExtendableOutput, Update, XofReader}, Shake128, }; -use std::time::Duration; -use zenoh_link::Link; use zenoh_protocol::{ core::{Field, Resolution, ZenohId}, transport::TransportSn, }; -use zenoh_result::ZResult; /*************************************/ /* TRAITS */ @@ -115,55 +111,3 @@ pub(super) fn compute_sn(zid1: ZenohId, zid2: ZenohId, resolution: Resolution) - hasher.finalize_xof().read(&mut array); TransportSn::from_le_bytes(array) & seq_num::get_mask(resolution.get(Field::FrameSN)) } - -pub(super) struct InputFinalize { - pub(super) transport: TransportUnicast, - pub(super) other_lease: Duration, -} -// Finalize the transport, notify the callback and start the link tasks -pub(super) async fn finalize_transport( - link: &TransportLinkUnicast, - manager: &TransportManager, - input: self::InputFinalize, -) -> ZResult<()> { - // Retrive the transport's transport - let transport = input.transport.get_inner()?; - - // Start the TX loop - let keep_alive = manager.config.unicast.lease / manager.config.unicast.keep_alive as u32; - transport.start_tx(link, &manager.tx_executor, keep_alive)?; - - // Assign a callback if the transport is new - // Keep the lock to avoid concurrent new_transport and closing/closed notifications - let a_guard = transport.get_alive().await; - if transport.get_callback().is_none() { - let peer = TransportPeer { - zid: transport.get_zid(), - whatami: transport.get_whatami(), - links: vec![Link::from(link)], - is_qos: transport.is_qos(), - #[cfg(feature = "shared-memory")] - is_shm: transport.is_shm(), - }; - // Notify the transport handler that there is a new transport and get back a callback - // NOTE: the read loop of the link the open message was sent on remains blocked - // until new_unicast() returns. The read_loop in the various links - // waits for any eventual transport to associate to. - let callback = manager - .config - .handler - .new_unicast(peer, input.transport.clone())?; - // Set the callback on the transport - transport.set_callback(callback); - } - if let Some(callback) = transport.get_callback() { - // Notify the transport handler there is a new link on this transport - callback.new_link(Link::from(link)); - } - drop(a_guard); - - // Start the RX loop - transport.start_rx(link, input.other_lease)?; - - Ok(()) -} diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index 6e10509d69..c3f1bfbb8a 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -16,8 +16,11 @@ use crate::unicast::shared_memory_unicast::Challenge; use crate::{ common::batch::BatchConfig, unicast::{ - establishment::{compute_sn, ext, finalize_transport, InputFinalize, OpenFsm}, - link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection}, + establishment::{compute_sn, ext, OpenFsm}, + link::{ + LinkUnicastWithOpenAck, TransportLinkUnicast, TransportLinkUnicastConfig, + TransportLinkUnicastDirection, + }, TransportConfigUnicast, TransportUnicast, }, TransportManager, @@ -629,27 +632,17 @@ pub(crate) async fn open_link( is_compression: state.link.ext_compression.is_compression(), }, }; - let o_link = TransportLinkUnicast::new(link.link.clone(), o_config); + let o_link = link.reconfigure(o_config); let s_link = format!("{:?}", o_link); - let transport = step!(manager.init_transport_unicast(config, o_link).await); - - // Sync the RX sequence number - let _ = step!(transport - .get_inner() - .map_err(|e| (e, Some(close::reason::INVALID)))) - .sync(oack_out.other_initial_sn) - .await; - - let output = InputFinalize { - transport, - other_lease: oack_out.other_lease, - }; - let transport = output.transport.clone(); - let res = finalize_transport(&link, manager, output).await; - if let Err(e) = res { - let _ = transport.close().await; - return Err(e); - } + let o_link = LinkUnicastWithOpenAck::new(o_link, None); + let transport = manager + .init_transport_unicast( + config, + o_link, + oack_out.other_initial_sn, + oack_out.other_lease, + ) + .await?; log::debug!( "New transport link opened from {} to {}: {}.", diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index 5b4da7365b..bd756d6396 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer}; use zenoh_core::zcondfeat; use zenoh_link::{Link, LinkUnicast}; -use zenoh_protocol::transport::{BatchSize, Close, TransportMessage}; +use zenoh_protocol::transport::{BatchSize, Close, OpenAck, TransportMessage}; use zenoh_result::{zerror, ZResult}; #[derive(Clone, Copy, PartialEq, Eq, Debug)] @@ -40,11 +40,23 @@ pub(crate) struct TransportLinkUnicast { } impl TransportLinkUnicast { - pub(crate) fn new(link: LinkUnicast, mut config: TransportLinkUnicastConfig) -> Self { + pub(crate) fn new(link: LinkUnicast, config: TransportLinkUnicastConfig) -> Self { + Self::init(link, config) + } + + pub(crate) fn reconfigure(self, new_config: TransportLinkUnicastConfig) -> Self { + Self::init(self.link, new_config) + } + + fn init(link: LinkUnicast, mut config: TransportLinkUnicastConfig) -> Self { config.batch.mtu = link.get_mtu().min(config.batch.mtu); Self { link, config } } + pub(crate) fn link(&self) -> Link { + (&self.link).into() + } + pub(crate) fn tx(&self) -> TransportLinkUnicastTx { TransportLinkUnicastTx { inner: self.clone(), @@ -63,7 +75,8 @@ impl TransportLinkUnicast { pub(crate) fn rx(&self) -> TransportLinkUnicastRx { TransportLinkUnicastRx { - inner: self.clone(), + link: self.link.clone(), + batch: self.config.batch, } } @@ -115,7 +128,13 @@ impl From<&TransportLinkUnicast> for Link { impl From for Link { fn from(link: TransportLinkUnicast) -> Self { - Link::from(link.link) + Link::from(&link.link) + } +} + +impl PartialEq for TransportLinkUnicast { + fn eq(&self, other: &Link) -> bool { + &other.src == self.link.get_src() && &other.dst == self.link.get_dst() } } @@ -180,7 +199,8 @@ impl fmt::Debug for TransportLinkUnicastTx { } pub(crate) struct TransportLinkUnicastRx { - pub(crate) inner: TransportLinkUnicast, + pub(crate) link: LinkUnicast, + pub(crate) batch: BatchConfig, } impl TransportLinkUnicastRx { @@ -192,10 +212,10 @@ impl TransportLinkUnicastRx { const ERR: &str = "Read error from link: "; let mut into = (buff)(); - let end = if self.inner.link.is_streamed() { + let end = if self.link.is_streamed() { // Read and decode the message length let mut len = BatchSize::MIN.to_le_bytes(); - self.inner.link.read_exact(&mut len).await?; + self.link.read_exact(&mut len).await?; let l = BatchSize::from_le_bytes(len) as usize; // Read the bytes @@ -203,18 +223,18 @@ impl TransportLinkUnicastRx { .as_mut_slice() .get_mut(len.len()..len.len() + l) .ok_or_else(|| zerror!("{ERR}{self}. Invalid batch length or buffer size."))?; - self.inner.link.read_exact(slice).await?; + self.link.read_exact(slice).await?; len.len() + l } else { // Read the bytes - self.inner.link.read(into.as_mut_slice()).await? + self.link.read(into.as_mut_slice()).await? }; // log::trace!("RBytes: {:02x?}", &into.as_slice()[0..end]); let buffer = ZSlice::make(Arc::new(into), 0, end) .map_err(|_| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?; - let mut batch = RBatch::new(self.inner.config.batch, buffer); + let mut batch = RBatch::new(self.batch, buffer); batch .initialize(buff) .map_err(|e| zerror!("{ERR}{self}. {e}."))?; @@ -225,7 +245,7 @@ impl TransportLinkUnicastRx { } pub async fn recv(&mut self) -> ZResult { - let mtu = self.inner.config.batch.mtu as usize; + let mtu = self.batch.mtu as usize; let mut batch = self .recv_batch(|| zenoh_buffers::vec::uninit(mtu).into_boxed_slice()) .await?; @@ -238,15 +258,74 @@ impl TransportLinkUnicastRx { impl fmt::Display for TransportLinkUnicastRx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.inner) + write!(f, "{}:{:?}", self.link, self.batch) } } impl fmt::Debug for TransportLinkUnicastRx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("TransportLinkUnicastRx") - .field("link", &self.inner.link) - .field("config", &self.inner.config) + .field("link", &self.link) + .field("config", &self.batch) .finish() } } + +pub(crate) struct MaybeOpenAck { + link: TransportLinkUnicastTx, + open_ack: Option, +} + +impl MaybeOpenAck { + pub(crate) fn new(link: &TransportLinkUnicast, open_ack: Option) -> Self { + Self { + link: link.tx(), + open_ack, + } + } + + pub(crate) async fn send_open_ack(mut self) -> ZResult<()> { + if let Some(msg) = self.open_ack { + return self.link.send(&msg.into()).await.map(|_| {}); + } + Ok(()) + } + + pub(crate) fn link(&self) -> Link { + self.link.inner.link() + } +} + +#[derive(PartialEq, Eq)] +pub(crate) struct LinkUnicastWithOpenAck { + link: TransportLinkUnicast, + ack: Option, +} + +impl LinkUnicastWithOpenAck { + pub(crate) fn new(link: TransportLinkUnicast, ack: Option) -> Self { + Self { link, ack } + } + + pub(crate) fn inner_config(&self) -> &TransportLinkUnicastConfig { + &self.link.config + } + + pub(crate) fn unpack(self) -> (TransportLinkUnicast, MaybeOpenAck) { + let ack = MaybeOpenAck::new(&self.link, self.ack); + (self.link, ack) + } + + pub(crate) fn fail(self) -> TransportLinkUnicast { + self.link + } +} + +impl fmt::Display for LinkUnicastWithOpenAck { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.ack.as_ref() { + Some(ack) => write!(f, "{}({:?})", self.link, ack), + None => write!(f, "{}", self.link), + } + } +} diff --git a/io/zenoh-transport/src/unicast/lowlatency/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs index 4cfbbee115..6a382f5960 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/link.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs @@ -14,6 +14,7 @@ use super::transport::TransportUnicastLowlatency; #[cfg(feature = "stats")] use crate::stats::TransportStats; +use crate::unicast::link::TransportLinkUnicastRx; use crate::{unicast::link::TransportLinkUnicast, TransportExecutor}; use async_std::task; use async_std::{prelude::FutureExt, sync::RwLock}; @@ -77,8 +78,9 @@ impl TransportUnicastLowlatency { pub(super) async fn send_async(&self, msg: TransportMessageLowLatency) -> ZResult<()> { let guard = zasyncwrite!(self.link); + let link = guard.as_ref().ok_or_else(|| zerror!("No link"))?; send_with_link( - &guard, + link, msg, #[cfg(feature = "stats")] &self.stats, @@ -132,7 +134,7 @@ impl TransportUnicastLowlatency { let c_transport = self.clone(); let handle = task::spawn(async move { let guard = zasyncread!(c_transport.link); - let link = guard.clone(); + let link = guard.as_ref().unwrap().rx(); drop(guard); let rx_buffer_size = c_transport.manager.config.link_rx_buffer_size; @@ -173,7 +175,7 @@ impl TransportUnicastLowlatency { /* TASKS */ /*************************************/ async fn keepalive_task( - link: Arc>, + link: Arc>>, keep_alive: Duration, #[cfg(feature = "stats")] stats: Arc, ) -> ZResult<()> { @@ -185,8 +187,9 @@ async fn keepalive_task( }; let guard = zasyncwrite!(link); + let link = guard.as_ref().ok_or_else(|| zerror!("No link"))?; let _ = send_with_link( - &guard, + link, keepailve, #[cfg(feature = "stats")] &stats, @@ -197,12 +200,12 @@ async fn keepalive_task( } async fn rx_task_stream( - link: TransportLinkUnicast, + link: TransportLinkUnicastRx, transport: TransportUnicastLowlatency, lease: Duration, rx_buffer_size: usize, ) -> ZResult<()> { - async fn read(link: &TransportLinkUnicast, buffer: &mut [u8]) -> ZResult { + async fn read(link: &TransportLinkUnicastRx, buffer: &mut [u8]) -> ZResult { // 16 bits for reading the batch length let mut length = [0_u8, 0_u8, 0_u8, 0_u8]; link.link.read_exact(&mut length).await?; @@ -216,7 +219,7 @@ async fn rx_task_stream( } // The pool of buffers - let mtu = link.config.batch.mtu as usize; + let mtu = link.batch.mtu as usize; let mut n = rx_buffer_size / mtu; if rx_buffer_size % mtu != 0 { n += 1; @@ -242,13 +245,13 @@ async fn rx_task_stream( } async fn rx_task_dgram( - link: TransportLinkUnicast, + link: TransportLinkUnicastRx, transport: TransportUnicastLowlatency, lease: Duration, rx_buffer_size: usize, ) -> ZResult<()> { // The pool of buffers - let mtu = link.config.batch.max_buffer_size(); + let mtu = link.batch.max_buffer_size(); let mut n = rx_buffer_size / mtu; if rx_buffer_size % mtu != 0 { n += 1; @@ -277,7 +280,7 @@ async fn rx_task_dgram( } async fn rx_task( - link: TransportLinkUnicast, + link: TransportLinkUnicastRx, transport: TransportUnicastLowlatency, lease: Duration, rx_buffer_size: usize, diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index d2d64a0310..afc7d3c849 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -11,31 +11,23 @@ // Contributors: // ZettaScale Zenoh Team, // -#[cfg(feature = "transport_unixpipe")] -use super::link::send_with_link; #[cfg(feature = "stats")] use crate::stats::TransportStats; use crate::{ unicast::{ - link::TransportLinkUnicast, transport_unicast_inner::TransportUnicastTrait, + link::{LinkUnicastWithOpenAck, TransportLinkUnicast}, + transport_unicast_inner::{AddLinkResult, TransportUnicastTrait}, TransportConfigUnicast, }, - TransportExecutor, TransportManager, TransportPeerEventHandler, + TransportManager, TransportPeerEventHandler, }; use async_executor::Task; -#[cfg(feature = "transport_unixpipe")] -use async_std::sync::RwLockUpgradableReadGuard; use async_std::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock}; use async_std::task::JoinHandle; use async_trait::async_trait; use std::sync::{Arc, RwLock as SyncRwLock}; use std::time::Duration; -#[cfg(feature = "transport_unixpipe")] -use zenoh_core::zasyncread_upgradable; use zenoh_core::{zasynclock, zasyncread, zasyncwrite, zread, zwrite}; -#[cfg(feature = "transport_unixpipe")] -use zenoh_link::unixpipe::UNIXPIPE_LOCATOR_PREFIX; -#[cfg(feature = "transport_unixpipe")] use zenoh_link::Link; use zenoh_protocol::network::NetworkMessage; use zenoh_protocol::transport::TransportBodyLowLatency; @@ -45,8 +37,6 @@ use zenoh_protocol::{ core::{WhatAmI, ZenohId}, transport::close, }; -#[cfg(not(feature = "transport_unixpipe"))] -use zenoh_result::bail; use zenoh_result::{zerror, ZResult}; /*************************************/ @@ -59,7 +49,7 @@ pub(crate) struct TransportUnicastLowlatency { // Transport config pub(super) config: TransportConfigUnicast, // The link associated to the transport - pub(super) link: Arc>, + pub(super) link: Arc>>, // The callback pub(super) callback: Arc>>>, // Mutex for notification @@ -68,7 +58,7 @@ pub(crate) struct TransportUnicastLowlatency { #[cfg(feature = "stats")] pub(super) stats: Arc, - // The flags to stop TX/RX tasks + // The handles for TX/RX tasks pub(crate) handle_keepalive: Arc>>>, pub(crate) handle_rx: Arc>>>, } @@ -77,23 +67,20 @@ impl TransportUnicastLowlatency { pub fn make( manager: TransportManager, config: TransportConfigUnicast, - link: TransportLinkUnicast, - ) -> ZResult { + ) -> Arc { #[cfg(feature = "stats")] let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone()))); - let t = TransportUnicastLowlatency { + Arc::new(TransportUnicastLowlatency { manager, config, - link: Arc::new(RwLock::new(link)), + link: Arc::new(RwLock::new(None)), callback: Arc::new(SyncRwLock::new(None)), alive: Arc::new(AsyncMutex::new(false)), #[cfg(feature = "stats")] stats, handle_keepalive: Arc::new(RwLock::new(None)), handle_rx: Arc::new(RwLock::new(None)), - }; - - Ok(t) + }) as Arc } /*************************************/ @@ -142,9 +129,9 @@ impl TransportUnicastLowlatency { // Close and drop the link self.stop_keepalive().await; self.stop_rx().await; - let _ = zasyncwrite!(self.link) - .close(Some(close::reason::GENERIC)) - .await; + if let Some(val) = zasyncwrite!(self.link).as_ref() { + let _ = val.close(Some(close::reason::GENERIC)).await; + } // Notify the callback that we have closed the transport if let Some(cb) = callback.as_ref() { @@ -153,6 +140,20 @@ impl TransportUnicastLowlatency { Ok(()) } + + async fn sync(&self, _initial_sn_rx: TransportSn) -> ZResult<()> { + // Mark the transport as alive + let mut a_guard = zasynclock!(self.alive); + if *a_guard { + let e = zerror!("Transport already synched with peer: {}", self.config.zid); + log::trace!("{}", e); + return Err(e.into()); + } + + *a_guard = true; + + Ok(()) + } } #[async_trait] @@ -161,17 +162,19 @@ impl TransportUnicastTrait for TransportUnicastLowlatency { /* ACCESSORS */ /*************************************/ fn set_callback(&self, callback: Arc) { - let mut guard = zwrite!(self.callback); - *guard = Some(callback); + *zwrite!(self.callback) = Some(callback); } async fn get_alive(&self) -> AsyncMutexGuard<'_, bool> { zasynclock!(self.alive) } - fn get_links(&self) -> Vec { + fn get_links(&self) -> Vec { let guard = async_std::task::block_on(async { zasyncread!(self.link) }); - [guard.clone()].to_vec() + if let Some(val) = guard.as_ref() { + return [val.link()].to_vec(); + } + vec![] } fn get_zid(&self) -> ZenohId { @@ -211,111 +214,49 @@ impl TransportUnicastTrait for TransportUnicastLowlatency { self.internal_schedule(msg) } - fn start_tx( - &self, - _link: &TransportLinkUnicast, - executor: &TransportExecutor, - keep_alive: Duration, - ) -> ZResult<()> { - self.start_keepalive(executor, keep_alive); - Ok(()) - } - - fn start_rx(&self, _link: &TransportLinkUnicast, lease: Duration) -> ZResult<()> { - self.internal_start_rx(lease); - Ok(()) - } - /*************************************/ /* LINK */ /*************************************/ - async fn add_link(&self, link: TransportLinkUnicast) -> ZResult<()> { + async fn add_link( + &self, + link: LinkUnicastWithOpenAck, + other_initial_sn: TransportSn, + other_lease: Duration, + ) -> AddLinkResult { log::trace!("Adding link: {}", link); - #[cfg(not(feature = "transport_unixpipe"))] - bail!( - "Can not add Link {} with peer {}: link already exists and only unique link is supported!", - link, - self.config.zid, - ); + let _ = self.sync(other_initial_sn).await; - #[cfg(feature = "transport_unixpipe")] - { - let guard = zasyncread_upgradable!(self.link); - - let existing_unixpipe = - guard.link.get_dst().protocol().as_str() == UNIXPIPE_LOCATOR_PREFIX; - let new_unixpipe = link.link.get_dst().protocol().as_str() == UNIXPIPE_LOCATOR_PREFIX; - match (existing_unixpipe, new_unixpipe) { - (false, true) => { - // LowLatency transport suports only a single link, but code here also handles upgrade from non-unixpipe link to unixpipe link! - log::trace!( - "Upgrading {} LowLatency transport's link from {} to {}", - self.config.zid, - guard, - link - ); - - // Prepare and send close message on old link - { - let close = TransportMessageLowLatency { - body: TransportBodyLowLatency::Close(Close { - reason: 0, - session: false, - }), - }; - let _ = send_with_link( - &guard, - close, - #[cfg(feature = "stats")] - &self.stats, - ) - .await; - }; - // Notify the callback - if let Some(callback) = zread!(self.callback).as_ref() { - callback.del_link(Link::from(guard.clone())); - } - - // Set the new link - let mut write_guard = RwLockUpgradableReadGuard::upgrade(guard).await; - *write_guard = link; - - Ok(()) - } - _ => { - let e = zerror!( - "Can not add Link {} with peer {}: link already exists and only unique link is supported!", - link, - self.config.zid, - ); - Err(e.into()) - } - } + let mut guard = zasyncwrite!(self.link); + if guard.is_some() { + return Err(( + zerror!("Lowlatency transport cannot support more than one link!").into(), + link.fail(), + close::reason::GENERIC, + )); } - } - - /*************************************/ - /* INITIATION */ - /*************************************/ - async fn sync(&self, _initial_sn_rx: TransportSn) -> ZResult<()> { - // Mark the transport as alive - let mut a_guard = zasynclock!(self.alive); - if *a_guard { - let e = zerror!("Transport already synched with peer: {}", self.config.zid); - log::trace!("{}", e); - return Err(e.into()); - } - - *a_guard = true; - - Ok(()) + let (link, ack) = link.unpack(); + *guard = Some(link); + drop(guard); + + // create a callback to start the link + let start_link = Box::new(move || { + // start keepalive task + let keep_alive = + self.manager.config.unicast.lease / self.manager.config.unicast.keep_alive as u32; + self.start_keepalive(&self.manager.tx_executor, keep_alive); + + // start RX task + self.internal_start_rx(other_lease); + }); + + return Ok((start_link, ack)); } /*************************************/ /* TERMINATION */ /*************************************/ - async fn close_link(&self, link: &TransportLinkUnicast, reason: u8) -> ZResult<()> { + async fn close_link(&self, link: Link, reason: u8) -> ZResult<()> { log::trace!("Closing link {} with peer: {}", link, self.config.zid); self.finalize(reason).await } diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index da064e8f5b..2328e78a76 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -13,19 +13,25 @@ // #[cfg(feature = "shared-memory")] use super::shared_memory_unicast::SharedMemoryUnicast; +use super::{link::LinkUnicastWithOpenAck, transport_unicast_inner::InitTransportResult}; #[cfg(feature = "transport_auth")] use crate::unicast::establishment::ext::auth::Auth; #[cfg(feature = "transport_multilink")] use crate::unicast::establishment::ext::multilink::MultiLink; use crate::{ unicast::{ - link::TransportLinkUnicast, lowlatency::transport::TransportUnicastLowlatency, - transport_unicast_inner::TransportUnicastTrait, - universal::transport::TransportUnicastUniversal, TransportConfigUnicast, TransportUnicast, + lowlatency::transport::TransportUnicastLowlatency, + transport_unicast_inner::{InitTransportError, TransportUnicastTrait}, + universal::transport::TransportUnicastUniversal, + TransportConfigUnicast, TransportUnicast, }, - TransportManager, + TransportManager, TransportPeer, +}; +use async_std::{ + prelude::FutureExt, + sync::{Mutex, MutexGuard}, + task, }; -use async_std::{prelude::FutureExt, sync::Mutex, task}; use std::{collections::HashMap, sync::Arc, time::Duration}; #[cfg(feature = "transport_compression")] use zenoh_config::CompressionUnicastConf; @@ -37,9 +43,9 @@ use zenoh_crypto::PseudoRng; use zenoh_link::*; use zenoh_protocol::{ core::{endpoint, ZenohId}, - transport::close, + transport::{close, TransportSn}, }; -use zenoh_result::{bail, zerror, Error, ZResult}; +use zenoh_result::{bail, zerror, ZResult}; /*************************************/ /* TRANSPORT CONFIG */ @@ -408,111 +414,254 @@ impl TransportManager { /*************************************/ /* TRANSPORT */ /*************************************/ - pub(super) async fn init_transport_unicast( + async fn init_existing_transport_unicast( &self, config: TransportConfigUnicast, - link: TransportLinkUnicast, - ) -> Result)> { - let mut guard = zasynclock!(self.state.unicast.transports); + link: LinkUnicastWithOpenAck, + other_initial_sn: TransportSn, + other_lease: Duration, + transport: Arc, + ) -> InitTransportResult { + let existing_config = transport.get_config(); + // Verify that fundamental parameters are correct. + // Ignore the non fundamental parameters like initial SN. + if *existing_config != config { + let e = zerror!( + "Transport with peer {} already exist. Invalid config: {:?}. Expected: {:?}.", + config.zid, + config, + existing_config + ); + log::trace!("{}", e); + return Err(InitTransportError::Link(( + e.into(), + link.fail(), + close::reason::INVALID, + ))); + } - // First verify if the transport already exists - match guard.get(&config.zid) { - Some(transport) => { - let existing_config = transport.get_config(); - // If it exists, verify that fundamental parameters like are correct. - // Ignore the non fundamental parameters like initial SN. - if *existing_config != config { - let e = zerror!( - "Transport with peer {} already exist. Invalid config: {:?}. Expected: {:?}.", - config.zid, - config, - existing_config - ); - log::trace!("{}", e); - return Err((e.into(), Some(close::reason::INVALID))); + // Add the link to the transport + let (start_tx_rx, ack) = transport + .add_link(link, other_initial_sn, other_lease) + .await + .map_err(InitTransportError::Link)?; + + // complete establish procedure + let c_link = ack.link(); + let c_t = transport.clone(); + ack.send_open_ack() + .await + .map_err(|e| InitTransportError::Transport((e, c_t, close::reason::GENERIC)))?; + + // notify transport's callback interface that there is a new link + Self::notify_new_link_unicast(&transport, c_link); + + start_tx_rx(); + + Ok(transport) + } + + fn notify_new_link_unicast(transport: &Arc, link: Link) { + if let Some(callback) = &transport.get_callback() { + callback.new_link(link); + } + } + + fn notify_new_transport_unicast( + &self, + transport: &Arc, + ) -> ZResult<()> { + // Assign a callback to the new transport + let peer = TransportPeer { + zid: transport.get_zid(), + whatami: transport.get_whatami(), + links: transport.get_links(), + is_qos: transport.get_config().is_qos, + #[cfg(feature = "shared-memory")] + is_shm: transport.is_shm(), + }; + // Notify the transport handler that there is a new transport and get back a callback + // NOTE: the read loop of the link the open message was sent on remains blocked + // until new_unicast() returns. The read_loop in the various links + // waits for any eventual transport to associate to. + let callback = self + .config + .handler + .new_unicast(peer, TransportUnicast(Arc::downgrade(transport)))?; + + // Set the callback on the transport + transport.set_callback(callback); + + Ok(()) + } + + pub(super) async fn init_new_transport_unicast( + &self, + config: TransportConfigUnicast, + link: LinkUnicastWithOpenAck, + other_initial_sn: TransportSn, + other_lease: Duration, + mut guard: MutexGuard<'_, HashMap>>, + ) -> InitTransportResult { + macro_rules! link_error { + ($s:expr, $reason:expr) => { + match $s { + Ok(output) => output, + Err(e) => { + return Err(InitTransportError::Link((e, link.fail(), $reason))); + } } + }; + } - // Add the link to the transport - transport - .add_link(link) - .await - .map_err(|e| (e, Some(close::reason::MAX_LINKS)))?; + // Verify that we haven't reached the transport number limit + if guard.len() >= self.config.unicast.max_sessions { + let e = zerror!( + "Max transports reached ({}). Denying new transport with peer: {}", + self.config.unicast.max_sessions, + config.zid + ); + log::trace!("{e}"); + return Err(InitTransportError::Link(( + e.into(), + link.fail(), + close::reason::INVALID, + ))); + } + + // Create the transport + let is_multilink = zcondfeat!("transport_multilink", config.multilink.is_some(), false); + + // Select and create transport implementation depending on the cfg and enabled features + let t = if config.is_lowlatency { + log::debug!("Will use LowLatency transport!"); + TransportUnicastLowlatency::make(self.clone(), config.clone()) + } else { + log::debug!("Will use Universal transport!"); + link_error!( + TransportUnicastUniversal::make(self.clone(), config.clone()), + close::reason::INVALID + ) + }; - Ok(TransportUnicast(Arc::downgrade(transport))) + // Add the link to the transport + let (start_tx_rx, ack) = match t.add_link(link, other_initial_sn, other_lease).await { + Ok(val) => val, + Err(e) => { + let _ = t.close(e.2).await; + return Err(InitTransportError::Link(e)); } - None => { - // Then verify that we haven't reached the transport number limit - if guard.len() >= self.config.unicast.max_sessions { - let e = zerror!( - "Max transports reached ({}). Denying new transport with peer: {}", - self.config.unicast.max_sessions, - config.zid - ); - log::trace!("{}", e); - return Err((e.into(), Some(close::reason::INVALID))); - } + }; - // Create the transport - let is_multilink = - zcondfeat!("transport_multilink", config.multilink.is_some(), false); - - // select and create transport implementation depending on the cfg and enabled features - let a_t = { - if config.is_lowlatency { - log::debug!("Will use LowLatency transport!"); - TransportUnicastLowlatency::make(self.clone(), config.clone(), link) - .map_err(|e| (e, Some(close::reason::INVALID))) - .map(|v| Arc::new(v) as Arc)? - } else { - log::debug!("Will use Universal transport!"); - let t: Arc = - TransportUnicastUniversal::make(self.clone(), config.clone()) - .map_err(|e| (e, Some(close::reason::INVALID))) - .map(|v| Arc::new(v) as Arc)?; - // Add the link to the transport - t.add_link(link) - .await - .map_err(|e| (e, Some(close::reason::MAX_LINKS)))?; - t + macro_rules! transport_error { + ($s:expr, $reason:expr) => { + match $s { + Ok(output) => output, + Err(e) => { + return Err(InitTransportError::Transport((e, t.clone(), $reason))); } - }; - - // Add the transport transport to the list of active transports - let transport = TransportUnicast(Arc::downgrade(&a_t)); - guard.insert(config.zid, a_t); - - zcondfeat!( - "shared-memory", - { - log::debug!( - "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, shm: {}, multilink: {}, lowlatency: {}", - self.config.zid, - config.zid, - config.whatami, - config.sn_resolution, - config.tx_initial_sn, - config.is_qos, - config.is_shm, - is_multilink, - config.is_lowlatency - ); - }, - { - log::debug!( - "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, multilink: {}, lowlatency: {}", - self.config.zid, - config.zid, - config.whatami, - config.sn_resolution, - config.tx_initial_sn, - config.is_qos, - is_multilink, - config.is_lowlatency - ); - } - ); + } + }; + } + + // Complete establish procedure + let c_link = ack.link(); + transport_error!(ack.send_open_ack().await, close::reason::GENERIC); + + // Add the transport transport to the list of active transports + guard.insert(config.zid, t.clone()); + drop(guard); - Ok(transport) + // Notify manager's interface that there is a new transport + transport_error!( + self.notify_new_transport_unicast(&t), + close::reason::GENERIC + ); + + // Notify transport's callback interface that there is a new link + Self::notify_new_link_unicast(&t, c_link); + + start_tx_rx(); + + zcondfeat!( + "shared-memory", + { + log::debug!( + "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, shm: {}, multilink: {}, lowlatency: {}", + self.config.zid, + config.zid, + config.whatami, + config.sn_resolution, + config.tx_initial_sn, + config.is_qos, + config.is_shm, + is_multilink, + config.is_lowlatency + ); + }, + { + log::debug!( + "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, multilink: {}, lowlatency: {}", + self.config.zid, + config.zid, + config.whatami, + config.sn_resolution, + config.tx_initial_sn, + config.is_qos, + is_multilink, + config.is_lowlatency + ); + } + ); + + Ok(t) + } + + pub(super) async fn init_transport_unicast( + &self, + config: TransportConfigUnicast, + link: LinkUnicastWithOpenAck, + other_initial_sn: TransportSn, + other_lease: Duration, + ) -> ZResult { + // First verify if the transport already exists + let init_result = { + let guard = zasynclock!(self.state.unicast.transports); + match guard.get(&config.zid) { + Some(transport) => { + let transport = transport.clone(); + drop(guard); + self.init_existing_transport_unicast( + config, + link, + other_initial_sn, + other_lease, + transport, + ) + .await + } + None => { + self.init_new_transport_unicast( + config, + link, + other_initial_sn, + other_lease, + guard, + ) + .await + } + } + }; + + match init_result { + Ok(transport) => Ok(TransportUnicast(Arc::downgrade(&transport))), + Err(InitTransportError::Link((e, link, reason))) => { + let _ = link.close(Some(reason)).await; + Err(e) + } + Err(InitTransportError::Transport((e, transport, reason))) => { + let _ = transport.close(reason).await; + Err(e) } } } @@ -552,21 +701,13 @@ impl TransportManager { pub async fn get_transport_unicast(&self, peer: &ZenohId) -> Option { zasynclock!(self.state.unicast.transports) .get(peer) - .map(|t| { - // todo: I cannot find a way to make transport.into() work for TransportUnicastTrait - let weak = Arc::downgrade(t); - TransportUnicast(weak) - }) + .map(|t| TransportUnicast(Arc::downgrade(t))) } pub async fn get_transports_unicast(&self) -> Vec { zasynclock!(self.state.unicast.transports) .values() - .map(|t| { - // todo: I cannot find a way to make transport.into() work for TransportUnicastTrait - let weak = Arc::downgrade(t); - TransportUnicast(weak) - }) + .map(|t| TransportUnicast(Arc::downgrade(t))) .collect() } @@ -602,12 +743,11 @@ impl TransportManager { // Spawn a task to accept the link let c_manager = self.clone(); task::spawn(async move { - if let Err(e) = super::establishment::accept::accept_link(&link, &c_manager) + if let Err(e) = super::establishment::accept::accept_link(link, &c_manager) .timeout(c_manager.config.unicast.accept_timeout) .await { log::debug!("{}", e); - let _ = link.close().await; } let mut guard = zasynclock!(c_manager.state.unicast.incoming); *guard -= 1; diff --git a/io/zenoh-transport/src/unicast/mod.rs b/io/zenoh-transport/src/unicast/mod.rs index 3385cbed6a..55226f287c 100644 --- a/io/zenoh-transport/src/unicast/mod.rs +++ b/io/zenoh-transport/src/unicast/mod.rs @@ -101,11 +101,7 @@ impl TransportUnicast { let tp = TransportPeer { zid: transport.get_zid(), whatami: transport.get_whatami(), - links: transport - .get_links() - .into_iter() - .map(|l| l.into()) - .collect(), + links: transport.get_links(), is_qos: transport.is_qos(), #[cfg(feature = "shared-memory")] is_shm: transport.is_shm(), @@ -116,11 +112,7 @@ impl TransportUnicast { #[inline(always)] pub fn get_links(&self) -> ZResult> { let transport = self.get_inner()?; - Ok(transport - .get_links() - .into_iter() - .map(|l| l.into()) - .collect()) + Ok(transport.get_links()) } #[inline(always)] @@ -129,18 +121,6 @@ impl TransportUnicast { transport.schedule(message) } - #[inline(always)] - pub async fn close_link(&self, link: &Link) -> ZResult<()> { - let transport = self.get_inner()?; - let link = transport - .get_links() - .into_iter() - .find(|l| l.link.get_src() == &link.src && l.link.get_dst() == &link.dst) - .ok_or_else(|| zerror!("Invalid link"))?; - transport.close_link(&link, close::reason::GENERIC).await?; - Ok(()) - } - #[inline(always)] pub async fn close(&self) -> ZResult<()> { // Return Ok if the transport has already been closed diff --git a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs index 265607705b..92093959dd 100644 --- a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs +++ b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs @@ -14,11 +14,12 @@ use crate::{ unicast::{link::TransportLinkUnicast, TransportConfigUnicast}, - TransportExecutor, TransportPeerEventHandler, + TransportPeerEventHandler, }; use async_std::sync::MutexGuard as AsyncMutexGuard; use async_trait::async_trait; use std::{fmt::DebugStruct, sync::Arc, time::Duration}; +use zenoh_link::Link; use zenoh_protocol::{ core::{WhatAmI, ZenohId}, network::NetworkMessage, @@ -26,6 +27,19 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; +use super::link::{LinkUnicastWithOpenAck, MaybeOpenAck}; + +pub(crate) type LinkError = (zenoh_result::Error, TransportLinkUnicast, u8); +pub(crate) type TransportError = (zenoh_result::Error, Arc, u8); +pub(crate) enum InitTransportError { + Link(LinkError), + Transport(TransportError), +} + +pub(crate) type AddLinkResult<'a> = + Result<(Box, MaybeOpenAck), LinkError>; +pub(crate) type InitTransportResult = Result, InitTransportError>; + /*************************************/ /* UNICAST TRANSPORT TRAIT */ /*************************************/ @@ -35,11 +49,12 @@ pub(crate) trait TransportUnicastTrait: Send + Sync { /* ACCESSORS */ /*************************************/ fn set_callback(&self, callback: Arc); + async fn get_alive(&self) -> AsyncMutexGuard<'_, bool>; fn get_zid(&self) -> ZenohId; fn get_whatami(&self) -> WhatAmI; fn get_callback(&self) -> Option>; - fn get_links(&self) -> Vec; + fn get_links(&self) -> Vec; #[cfg(feature = "shared-memory")] fn is_shm(&self) -> bool; fn is_qos(&self) -> bool; @@ -50,33 +65,22 @@ pub(crate) trait TransportUnicastTrait: Send + Sync { /*************************************/ /* LINK */ /*************************************/ - async fn add_link(&self, link: TransportLinkUnicast) -> ZResult<()>; + async fn add_link( + &self, + link: LinkUnicastWithOpenAck, + other_initial_sn: TransportSn, + other_lease: Duration, + ) -> AddLinkResult; /*************************************/ /* TX */ /*************************************/ fn schedule(&self, msg: NetworkMessage) -> ZResult<()>; - fn start_tx( - &self, - link: &TransportLinkUnicast, - executor: &TransportExecutor, - keep_alive: Duration, - ) -> ZResult<()>; - - /*************************************/ - /* RX */ - /*************************************/ - fn start_rx(&self, link: &TransportLinkUnicast, lease: Duration) -> ZResult<()>; - - /*************************************/ - /* INITIATION */ - /*************************************/ - async fn sync(&self, _initial_sn_rx: TransportSn) -> ZResult<()>; /*************************************/ /* TERMINATION */ /*************************************/ - async fn close_link(&self, link: &TransportLinkUnicast, reason: u8) -> ZResult<()>; + async fn close_link(&self, link: Link, reason: u8) -> ZResult<()>; async fn close(&self, reason: u8) -> ZResult<()>; fn add_debug_fields<'a, 'b: 'a, 'c>( diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index aba680bc43..513cefc0a6 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -29,107 +29,122 @@ use crate::{ use async_std::prelude::FutureExt; use async_std::task; use async_std::task::JoinHandle; -use std::{sync::Arc, time::Duration}; +use std::{ + sync::{Arc, RwLock}, + time::Duration, +}; use zenoh_buffers::ZSliceBuffer; +use zenoh_core::zwrite; use zenoh_protocol::transport::{KeepAlive, TransportMessage}; use zenoh_result::{zerror, ZResult}; use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal}; +pub(super) struct Tasks { + // The handlers to stop TX/RX tasks + handle_tx: RwLock>>, + signal_rx: Signal, + handle_rx: RwLock>>, +} + #[derive(Clone)] pub(super) struct TransportLinkUnicastUniversal { // The underlying link pub(super) link: TransportLinkUnicast, // The transmission pipeline - pub(super) pipeline: Option, - // The transport this link is associated to - transport: TransportUnicastUniversal, - // The signals to stop TX/RX tasks - handle_tx: Option>>, - signal_rx: Signal, - handle_rx: Option>>, + pub(super) pipeline: TransmissionPipelineProducer, + // The task handling substruct + tasks: Arc, } impl TransportLinkUnicastUniversal { - pub(super) fn new(transport: TransportUnicastUniversal, link: TransportLinkUnicast) -> Self { - Self { - link, - pipeline: None, - transport, - handle_tx: None, + pub(super) fn new( + transport: &TransportUnicastUniversal, + link: TransportLinkUnicast, + priority_tx: &[TransportPriorityTx], + ) -> (Self, TransmissionPipelineConsumer) { + assert!(!priority_tx.is_empty()); + + let config = TransmissionPipelineConf { + batch: BatchConfig { + mtu: link.config.batch.mtu, + is_streamed: link.link.is_streamed(), + #[cfg(feature = "transport_compression")] + is_compression: link.config.batch.is_compression, + }, + queue_size: transport.manager.config.queue_size, + backoff: transport.manager.config.queue_backoff, + }; + + // The pipeline + let (producer, consumer) = TransmissionPipeline::make(config, priority_tx); + + let tasks = Arc::new(Tasks { + handle_tx: RwLock::new(None), signal_rx: Signal::new(), - handle_rx: None, - } + handle_rx: RwLock::new(None), + }); + + let result = Self { + link, + pipeline: producer, + tasks, + }; + + (result, consumer) } } impl TransportLinkUnicastUniversal { pub(super) fn start_tx( &mut self, + transport: TransportUnicastUniversal, + consumer: TransmissionPipelineConsumer, executor: &TransportExecutor, keep_alive: Duration, - priority_tx: &[TransportPriorityTx], ) { - if self.handle_tx.is_none() { - let config = TransmissionPipelineConf { - batch: BatchConfig { - mtu: self.link.config.batch.mtu, - is_streamed: self.link.link.is_streamed(), - #[cfg(feature = "transport_compression")] - is_compression: self.link.config.batch.is_compression, - }, - queue_size: self.transport.manager.config.queue_size, - backoff: self.transport.manager.config.queue_backoff, - }; - - // The pipeline - let (producer, consumer) = TransmissionPipeline::make(config, priority_tx); - self.pipeline = Some(producer); - + let mut guard = zwrite!(self.tasks.handle_tx); + if guard.is_none() { // Spawn the TX task - let c_link = self.link.clone(); - let c_transport = self.transport.clone(); + let mut tx = self.link.tx(); let handle = executor.spawn(async move { let res = tx_task( consumer, - c_link.tx(), + &mut tx, keep_alive, #[cfg(feature = "stats")] - c_transport.stats.clone(), + transport.stats.clone(), ) .await; if let Err(e) = res { log::debug!("{}", e); // Spawn a task to avoid a deadlock waiting for this same task // to finish in the close() joining its handle - task::spawn(async move { c_transport.del_link(&c_link).await }); + task::spawn(async move { transport.del_link(tx.inner.link()).await }); } }); - self.handle_tx = Some(Arc::new(handle)); + *guard = Some(handle); } } pub(super) fn stop_tx(&mut self) { - if let Some(pl) = self.pipeline.as_ref() { - pl.disable(); - } + self.pipeline.disable(); } - pub(super) fn start_rx(&mut self, lease: Duration) { - if self.handle_rx.is_none() { + pub(super) fn start_rx(&mut self, transport: TransportUnicastUniversal, lease: Duration) { + let mut guard = zwrite!(self.tasks.handle_rx); + if guard.is_none() { // Spawn the RX task - let c_link = self.link.clone(); - let c_transport = self.transport.clone(); - let c_signal = self.signal_rx.clone(); - let c_rx_buffer_size = self.transport.manager.config.link_rx_buffer_size; + let mut rx = self.link.rx(); + let c_signal = self.tasks.signal_rx.clone(); let handle = task::spawn(async move { // Start the consume task let res = rx_task( - c_link.rx(), - c_transport.clone(), + &mut rx, + transport.clone(), lease, c_signal.clone(), - c_rx_buffer_size, + transport.manager.config.link_rx_buffer_size, ) .await; c_signal.trigger(); @@ -137,31 +152,30 @@ impl TransportLinkUnicastUniversal { log::debug!("{}", e); // Spawn a task to avoid a deadlock waiting for this same task // to finish in the close() joining its handle - task::spawn(async move { c_transport.del_link(&c_link).await }); + task::spawn(async move { transport.del_link((&rx.link).into()).await }); } }); - self.handle_rx = Some(Arc::new(handle)); + *guard = Some(handle); } } pub(super) fn stop_rx(&mut self) { - self.signal_rx.trigger(); + self.tasks.signal_rx.trigger(); } pub(super) async fn close(mut self) -> ZResult<()> { log::trace!("{}: closing", self.link); + self.stop_tx(); self.stop_rx(); - if let Some(handle) = self.handle_rx.take() { - // SAFETY: it is safe to unwrap the Arc since we have the ownership of the whole link - let handle_rx = Arc::try_unwrap(handle).unwrap(); - handle_rx.await; + + let handle_tx = zwrite!(self.tasks.handle_tx).take(); + if let Some(handle) = handle_tx { + handle.await; } - self.stop_tx(); - if let Some(handle) = self.handle_tx.take() { - // SAFETY: it is safe to unwrap the Arc since we have the ownership of the whole link - let handle_tx = Arc::try_unwrap(handle).unwrap(); - handle_tx.await; + let handle_rx = zwrite!(self.tasks.handle_rx).take(); + if let Some(handle) = handle_rx { + handle.await; } self.link.close(None).await @@ -173,7 +187,7 @@ impl TransportLinkUnicastUniversal { /*************************************/ async fn tx_task( mut pipeline: TransmissionPipelineConsumer, - mut link: TransportLinkUnicastTx, + link: &mut TransportLinkUnicastTx, keep_alive: Duration, #[cfg(feature = "stats")] stats: Arc, ) -> ZResult<()> { @@ -227,7 +241,7 @@ async fn tx_task( } async fn rx_task( - mut link: TransportLinkUnicastRx, + link: &mut TransportLinkUnicastRx, transport: TransportUnicastUniversal, lease: Duration, signal: Signal, @@ -259,16 +273,17 @@ async fn rx_task( } // The pool of buffers - let mtu = link.inner.config.batch.max_buffer_size(); + let mtu = link.batch.max_buffer_size(); let mut n = rx_buffer_size / mtu; if rx_buffer_size % mtu != 0 { n += 1; } let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice()); + let l = (&link.link).into(); while !signal.is_triggered() { // Async read from the underlying link - let action = read(&mut link, &pool) + let action = read(link, &pool) .race(stop(signal.clone())) .timeout(lease) .await @@ -279,7 +294,7 @@ async fn rx_task( { transport.stats.inc_rx_bytes(2 + n); // Account for the batch len encoding (16 bits) } - transport.read_messages(batch, &link.inner)?; + transport.read_messages(batch, &l)?; } Action::Stop => break, } diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index 459998ddcf..935a1814b0 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -17,11 +17,13 @@ use crate::{ batch::{Decode, RBatch}, priority::TransportChannelRx, }, - unicast::{link::TransportLinkUnicast, transport_unicast_inner::TransportUnicastTrait}, + unicast::transport_unicast_inner::TransportUnicastTrait, + TransportPeerEventHandler, }; use async_std::task; use std::sync::MutexGuard; use zenoh_core::{zlock, zread}; +use zenoh_link::Link; use zenoh_protocol::{ core::{Priority, Reliability}, network::NetworkMessage, @@ -35,35 +37,22 @@ use zenoh_result::{bail, zerror, ZResult}; impl TransportUnicastUniversal { fn trigger_callback( &self, + callback: &dyn TransportPeerEventHandler, #[allow(unused_mut)] // shared-memory feature requires mut mut msg: NetworkMessage, ) -> ZResult<()> { - let callback = zread!(self.callback).clone(); - if let Some(callback) = callback.as_ref() { - #[cfg(feature = "shared-memory")] - { - if self.config.is_shm { - crate::shm::map_zmsg_to_shmbuf( - &mut msg, - &self.manager.state.unicast.shm.reader, - )?; - } + #[cfg(feature = "shared-memory")] + { + if self.config.is_shm { + crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.state.unicast.shm.reader)?; } - callback.handle_message(msg) - } else { - log::debug!( - "Transport: {}. No callback available, dropping message: {}", - self.config.zid, - msg - ); - Ok(()) } + callback.handle_message(msg) } - fn handle_close(&self, link: &TransportLinkUnicast, _reason: u8, session: bool) -> ZResult<()> { + fn handle_close(&self, link: &Link, _reason: u8, session: bool) -> ZResult<()> { // Stop now rx and tx tasks before doing the proper cleanup - let _ = self.stop_rx(link); - let _ = self.stop_tx(link); + let _ = self.stop_rx_tx(link); // Delete and clean up let c_transport = self.clone(); @@ -74,7 +63,7 @@ impl TransportUnicastUniversal { if session { let _ = c_transport.delete().await; } else { - let _ = c_transport.del_link(&c_link).await; + let _ = c_transport.del_link(c_link).await; } }); @@ -109,8 +98,17 @@ impl TransportUnicastUniversal { self.verify_sn(sn, &mut guard)?; - for msg in payload.drain(..) { - self.trigger_callback(msg)?; + let callback = zread!(self.callback).clone(); + if let Some(callback) = callback.as_ref() { + for msg in payload.drain(..) { + self.trigger_callback(callback.as_ref(), msg)?; + } + } else { + log::debug!( + "Transport: {}. No callback available, dropping messages: {:?}", + self.config.zid, + payload + ); } Ok(()) } @@ -153,7 +151,17 @@ impl TransportUnicastUniversal { .defrag .defragment() .ok_or_else(|| zerror!("Transport: {}. Defragmentation error.", self.config.zid))?; - return self.trigger_callback(msg); + + let callback = zread!(self.callback).clone(); + if let Some(callback) = callback.as_ref() { + return self.trigger_callback(callback.as_ref(), msg); + } else { + log::debug!( + "Transport: {}. No callback available, dropping messages: {:?}", + self.config.zid, + msg + ); + } } Ok(()) @@ -164,7 +172,7 @@ impl TransportUnicastUniversal { sn: TransportSn, guard: &mut MutexGuard<'_, TransportChannelRx>, ) -> ZResult<()> { - let precedes = guard.sn.precedes(sn)?; + let precedes = guard.sn.roll(sn)?; if !precedes { log::debug!( "Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.", @@ -180,18 +188,10 @@ impl TransportUnicastUniversal { return Ok(()); } - // Set will always return OK because we have already checked - // with precedes() that the sn has the right resolution - let _ = guard.sn.set(sn); - Ok(()) } - pub(super) fn read_messages( - &self, - mut batch: RBatch, - link: &TransportLinkUnicast, - ) -> ZResult<()> { + pub(super) fn read_messages(&self, mut batch: RBatch, link: &Link) -> ZResult<()> { while !batch.is_empty() { let msg: TransportMessage = batch .decode() diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index a920ac90b9..942b723365 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -16,12 +16,12 @@ use crate::stats::TransportStats; use crate::{ common::priority::{TransportPriorityRx, TransportPriorityTx}, unicast::{ - link::{TransportLinkUnicast, TransportLinkUnicastDirection}, - transport_unicast_inner::TransportUnicastTrait, + link::{LinkUnicastWithOpenAck, TransportLinkUnicastDirection}, + transport_unicast_inner::{AddLinkResult, TransportUnicastTrait}, universal::link::TransportLinkUnicastUniversal, TransportConfigUnicast, }, - TransportExecutor, TransportManager, TransportPeerEventHandler, + TransportManager, TransportPeerEventHandler, }; use async_std::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}; use async_trait::async_trait; @@ -33,28 +33,28 @@ use zenoh_link::Link; use zenoh_protocol::{ core::{Priority, WhatAmI, ZenohId}, network::NetworkMessage, - transport::{Close, PrioritySn, TransportMessage, TransportSn}, + transport::{close, Close, PrioritySn, TransportMessage, TransportSn}, }; use zenoh_result::{bail, zerror, ZResult}; macro_rules! zlinkget { ($guard:expr, $link:expr) => { // Compare LinkUnicast link to not compare TransportLinkUnicast direction - $guard.iter().find(|tl| &tl.link.link == &$link.link) + $guard.iter().find(|tl| tl.link == $link) }; } macro_rules! zlinkgetmut { ($guard:expr, $link:expr) => { // Compare LinkUnicast link to not compare TransportLinkUnicast direction - $guard.iter_mut().find(|tl| &tl.link.link == &$link.link) + $guard.iter_mut().find(|tl| tl.link == $link) }; } macro_rules! zlinkindex { ($guard:expr, $link:expr) => { // Compare LinkUnicast link to not compare TransportLinkUnicast direction - $guard.iter().position(|tl| &tl.link.link == &$link.link) + $guard.iter().position(|tl| tl.link == $link) }; } @@ -75,6 +75,8 @@ pub(crate) struct TransportUnicastUniversal { pub(super) links: Arc>>, // The callback pub(super) callback: Arc>>>, + // Lock used to ensure no race in add_link method + add_link_lock: Arc>, // Mutex for notification pub(super) alive: Arc>, // Transport statistics @@ -86,7 +88,7 @@ impl TransportUnicastUniversal { pub fn make( manager: TransportManager, config: TransportConfigUnicast, - ) -> ZResult { + ) -> ZResult> { let mut priority_tx = vec![]; let mut priority_rx = vec![]; @@ -113,17 +115,18 @@ impl TransportUnicastUniversal { #[cfg(feature = "stats")] let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone()))); - let t = TransportUnicastUniversal { + let t = Arc::new(TransportUnicastUniversal { manager, config, priority_tx: priority_tx.into_boxed_slice().into(), priority_rx: priority_rx.into_boxed_slice().into(), links: Arc::new(RwLock::new(vec![].into_boxed_slice())), + add_link_lock: Arc::new(AsyncMutex::new(())), callback: Arc::new(RwLock::new(None)), alive: Arc::new(AsyncMutex::new(false)), #[cfg(feature = "stats")] stats, - }; + }); Ok(t) } @@ -170,7 +173,7 @@ impl TransportUnicastUniversal { Ok(()) } - pub(crate) async fn del_link(&self, link: &TransportLinkUnicast) -> ZResult<()> { + pub(crate) async fn del_link(&self, link: Link) -> ZResult<()> { enum Target { Transport, Link(Box), @@ -205,7 +208,7 @@ impl TransportUnicastUniversal { // Notify the callback if let Some(callback) = zread!(self.callback).as_ref() { - callback.del_link(Link::from(link)); + callback.del_link(link); } match target { @@ -214,16 +217,17 @@ impl TransportUnicastUniversal { } } - pub(crate) fn stop_tx(&self, link: &TransportLinkUnicast) -> ZResult<()> { + pub(crate) fn stop_rx_tx(&self, link: &Link) -> ZResult<()> { let mut guard = zwrite!(self.links); - match zlinkgetmut!(guard, link) { + match zlinkgetmut!(guard, *link) { Some(l) => { + l.stop_rx(); l.stop_tx(); Ok(()) } None => { bail!( - "Can not stop Link TX {} with peer: {}", + "Can not stop Link RX {} with peer: {}", link, self.config.zid ) @@ -231,21 +235,27 @@ impl TransportUnicastUniversal { } } - pub(crate) fn stop_rx(&self, link: &TransportLinkUnicast) -> ZResult<()> { - let mut guard = zwrite!(self.links); - match zlinkgetmut!(guard, link) { - Some(l) => { - l.stop_rx(); - Ok(()) - } - None => { - bail!( - "Can not stop Link RX {} with peer: {}", - link, - self.config.zid - ) - } + async fn sync(&self, initial_sn_rx: TransportSn) -> ZResult<()> { + // Mark the transport as alive and keep the lock + // to avoid concurrent new_transport and closing/closed notifications + let mut a_guard = zasynclock!(self.alive); + if *a_guard { + let e = zerror!("Transport already synched with peer: {}", self.config.zid); + log::trace!("{}", e); + return Err(e.into()); } + + *a_guard = true; + + let csn = PrioritySn { + reliable: initial_sn_rx, + best_effort: initial_sn_rx, + }; + for c in self.priority_rx.iter() { + c.sync(csn)?; + } + + Ok(()) } } @@ -254,54 +264,88 @@ impl TransportUnicastTrait for TransportUnicastUniversal { /*************************************/ /* LINK */ /*************************************/ - async fn add_link(&self, link: TransportLinkUnicast) -> ZResult<()> { - // Add the link to the channel - let mut guard = zwrite!(self.links); + async fn add_link( + &self, + link: LinkUnicastWithOpenAck, + other_initial_sn: TransportSn, + other_lease: Duration, + ) -> AddLinkResult { + let add_link_guard = zasynclock!(self.add_link_lock); // Check if we can add more inbound links - if let TransportLinkUnicastDirection::Inbound = link.config.direction { - let count = guard - .iter() - .filter(|l| l.link.config.direction == link.config.direction) - .count(); - - let limit = zcondfeat!( - "transport_multilink", - match self.config.multilink { - Some(_) => self.manager.config.unicast.max_links, - None => 1, - }, - 1 - ); - - if count >= limit { - let e = zerror!( - "Can not add Link {} with peer {}: max num of links reached {}/{}", - link, - self.config.zid, - count, - limit + { + let guard = zread!(self.links); + if let TransportLinkUnicastDirection::Inbound = link.inner_config().direction { + let count = guard + .iter() + .filter(|l| l.link.config.direction == link.inner_config().direction) + .count(); + + let limit = zcondfeat!( + "transport_multilink", + match self.config.multilink { + Some(_) => self.manager.config.unicast.max_links, + None => 1, + }, + 1 ); - return Err(e.into()); + + if count >= limit { + let e = zerror!( + "Can not add Link {} with peer {}: max num of links reached {}/{}", + link, + self.config.zid, + count, + limit + ); + return Err((e.into(), link.fail(), close::reason::MAX_LINKS)); + } } } - let link = TransportLinkUnicastUniversal::new(self.clone(), link); + // sync the RX sequence number + let _ = self.sync(other_initial_sn).await; + + // Wrap the link + let (link, ack) = link.unpack(); + let (mut link, consumer) = + TransportLinkUnicastUniversal::new(self, link, &self.priority_tx); + // Add the link to the channel + let mut guard = zwrite!(self.links); let mut links = Vec::with_capacity(guard.len() + 1); links.extend_from_slice(&guard); - links.push(link); + links.push(link.clone()); *guard = links.into_boxed_slice(); - Ok(()) + drop(guard); + drop(add_link_guard); + + // create a callback to start the link + let transport = self.clone(); + let start_link = Box::new(move || { + // Start the TX loop + let keep_alive = + self.manager.config.unicast.lease / self.manager.config.unicast.keep_alive as u32; + link.start_tx( + transport.clone(), + consumer, + &self.manager.tx_executor, + keep_alive, + ); + + // Start the RX loop + link.start_rx(transport, other_lease); + }); + + Ok((start_link, ack)) } /*************************************/ /* ACCESSORS */ /*************************************/ fn set_callback(&self, callback: Arc) { - let mut guard = zwrite!(self.callback); - *guard = Some(callback); + *zwrite!(self.callback) = Some(callback); } async fn get_alive(&self) -> AsyncMutexGuard<'_, bool> { @@ -338,52 +382,25 @@ impl TransportUnicastTrait for TransportUnicastUniversal { self.stats.clone() } - /*************************************/ - /* INITIATION */ - /*************************************/ - async fn sync(&self, initial_sn_rx: TransportSn) -> ZResult<()> { - // Mark the transport as alive and keep the lock - // to avoid concurrent new_transport and closing/closed notifications - let mut a_guard = zasynclock!(self.alive); - if *a_guard { - let e = zerror!("Transport already synched with peer: {}", self.config.zid); - log::trace!("{}", e); - return Err(e.into()); - } - - *a_guard = true; - - let csn = PrioritySn { - reliable: initial_sn_rx, - best_effort: initial_sn_rx, - }; - for c in self.priority_rx.iter() { - c.sync(csn)?; - } - - Ok(()) - } - /*************************************/ /* TERMINATION */ /*************************************/ - async fn close_link(&self, link: &TransportLinkUnicast, reason: u8) -> ZResult<()> { + async fn close_link(&self, link: Link, reason: u8) -> ZResult<()> { log::trace!("Closing link {} with peer: {}", link, self.config.zid); - let mut pipeline = zlinkget!(zread!(self.links), link) - .map(|l| l.pipeline.clone()) - .ok_or_else(|| zerror!("Cannot close Link {:?}: not found", link))?; + let transport_link_pipeline = zlinkget!(zread!(self.links), link) + .ok_or_else(|| zerror!("Cannot close Link {:?}: not found", link))? + .pipeline + .clone(); - if let Some(p) = pipeline.take() { - // Close message to be sent on the target link - let msg: TransportMessage = Close { - reason, - session: false, - } - .into(); - - p.push_transport_message(msg, Priority::Background); + // Close message to be sent on the target link + let msg: TransportMessage = Close { + reason, + session: false, } + .into(); + + transport_link_pipeline.push_transport_message(msg, Priority::Background); // Remove the link from the channel self.del_link(link).await @@ -394,7 +411,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal { let mut pipelines = zread!(self.links) .iter() - .filter_map(|sl| sl.pipeline.clone()) + .map(|sl| sl.pipeline.clone()) .collect::>(); for p in pipelines.drain(..) { // Close message to be sent on all the links @@ -413,8 +430,8 @@ impl TransportUnicastTrait for TransportUnicastUniversal { self.delete().await } - fn get_links(&self) -> Vec { - zread!(self.links).iter().map(|l| l.link.clone()).collect() + fn get_links(&self) -> Vec { + zread!(self.links).iter().map(|l| l.link.link()).collect() } /*************************************/ @@ -427,46 +444,6 @@ impl TransportUnicastTrait for TransportUnicastUniversal { } } - fn start_tx( - &self, - link: &TransportLinkUnicast, - executor: &TransportExecutor, - keep_alive: Duration, - ) -> ZResult<()> { - let mut guard = zwrite!(self.links); - match zlinkgetmut!(guard, link) { - Some(l) => { - assert!(!self.priority_tx.is_empty()); - l.start_tx(executor, keep_alive, &self.priority_tx); - Ok(()) - } - None => { - bail!( - "Can not start Link TX {} with ZID: {}", - link, - self.config.zid, - ) - } - } - } - - fn start_rx(&self, link: &TransportLinkUnicast, lease: Duration) -> ZResult<()> { - let mut guard = zwrite!(self.links); - match zlinkgetmut!(guard, link) { - Some(l) => { - l.start_rx(lease); - Ok(()) - } - None => { - bail!( - "Can not start Link RX {} with peer: {}", - link, - self.config.zid - ) - } - } - } - fn add_debug_fields<'a, 'b: 'a, 'c>( &self, s: &'c mut DebugStruct<'a, 'b>, diff --git a/io/zenoh-transport/src/unicast/universal/tx.rs b/io/zenoh-transport/src/unicast/universal/tx.rs index bf5be7e702..eb41e2611c 100644 --- a/io/zenoh-transport/src/unicast/universal/tx.rs +++ b/io/zenoh-transport/src/unicast/universal/tx.rs @@ -35,7 +35,7 @@ impl TransportUnicastUniversal { .iter() .filter_map(|tl| { if msg.is_reliable() == tl.link.link.is_reliable() { - tl.pipeline.as_ref() + Some(&tl.pipeline) } else { None } @@ -46,7 +46,7 @@ impl TransportUnicastUniversal { } // No best match found, take the first available link - if let Some(pl) = guard.iter().filter_map(|tl| tl.pipeline.as_ref()).next() { + if let Some(pl) = guard.iter().map(|tl| &tl.pipeline).next() { zpush!(guard, pl, msg); } diff --git a/io/zenoh-transport/tests/unicast_shm.rs b/io/zenoh-transport/tests/unicast_shm.rs index 500a174daf..e27acfe3c3 100644 --- a/io/zenoh-transport/tests/unicast_shm.rs +++ b/io/zenoh-transport/tests/unicast_shm.rs @@ -14,6 +14,7 @@ #[cfg(feature = "shared-memory")] mod tests { use async_std::{prelude::FutureExt, task}; + use rand::{Rng, SeedableRng}; use std::{ any::Any, convert::TryFrom, @@ -25,6 +26,7 @@ mod tests { }; use zenoh_buffers::buffer::SplitBuffer; use zenoh_core::zasync_executor_init; + use zenoh_crypto::PseudoRng; use zenoh_link::Link; use zenoh_protocol::{ core::{CongestionControl, Encoding, EndPoint, Priority, WhatAmI, ZenohId}, @@ -34,7 +36,7 @@ mod tests { }, zenoh::{PushBody, Put}, }; - use zenoh_result::ZResult; + use zenoh_result::{zerror, ZResult}; use zenoh_shm::{SharedMemoryBuf, SharedMemoryManager}; use zenoh_transport::{ multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, @@ -157,10 +159,22 @@ mod tests { let peer_shm02 = ZenohId::try_from([2]).unwrap(); let peer_net01 = ZenohId::try_from([3]).unwrap(); - // Create the SharedMemoryManager - let mut shm01 = - SharedMemoryManager::make(format!("peer_shm01_{}", endpoint.protocol()), 2 * MSG_SIZE) - .unwrap(); + let mut tries = 100; + let mut prng = PseudoRng::from_entropy(); + let mut shm01 = loop { + // Create the SharedMemoryManager + if let Ok(shm01) = SharedMemoryManager::make( + format!("peer_shm01_{}_{}", endpoint.protocol(), prng.gen::()), + 2 * MSG_SIZE, + ) { + break Ok(shm01); + } + tries -= 1; + if tries == 0 { + break Err(zerror!("Unable to create SharedMemoryManager!")); + } + } + .unwrap(); // Create a peer manager with shared-memory authenticator enabled let peer_shm01_handler = Arc::new(SHPeer::new(true)); diff --git a/io/zenoh-transport/tests/unicast_transport.rs b/io/zenoh-transport/tests/unicast_transport.rs index ac35090cdb..9b25bb26c8 100644 --- a/io/zenoh-transport/tests/unicast_transport.rs +++ b/io/zenoh-transport/tests/unicast_transport.rs @@ -481,6 +481,7 @@ async fn test_transport( .into(), } .into(); + for _ in 0..MSG_COUNT { let _ = client_transport.schedule(message.clone()); } diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index 7219bf5ff2..3b10f12f03 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -319,7 +319,7 @@ fn gossip() -> Result<()> { async_std::task::block_on(async { zasync_executor_init!(); - let locator = String::from("tcp/127.0.0.1:17449"); + let locator = String::from("tcp/127.0.0.1:17446"); let ke = String::from("testKeyExprGossip"); let msg_size = 8;