From 3e881d660ef5ae57d6c2aa736d65fcb683ec3db4 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Tue, 5 Dec 2023 12:30:55 +0300 Subject: [PATCH] WIP on transport refactoring --- io/zenoh-link-commons/src/unicast.rs | 1 - .../src/unicast/establishment/accept.rs | 18 +- .../src/unicast/establishment/open.rs | 20 +- io/zenoh-transport/src/unicast/link.rs | 2 +- .../src/unicast/lowlatency/link.rs | 27 +- .../src/unicast/lowlatency/transport.rs | 4 +- io/zenoh-transport/src/unicast/manager.rs | 297 ++++++++++++------ .../src/unicast/transport_unicast_inner.rs | 4 +- .../src/unicast/universal/link.rs | 22 +- .../src/unicast/universal/transport.rs | 18 +- 10 files changed, 263 insertions(+), 150 deletions(-) diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index d44686ff50..6a02767973 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -35,7 +35,6 @@ pub trait ConstructibleLinkManagerUnicast: Sized { fn new(new_link_sender: NewLinkChannelSender, config: T) -> ZResult; } -#[derive(Clone)] pub struct LinkUnicast(pub Arc); #[async_trait] diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index 112b471b9e..bee6e1941f 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -584,7 +584,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { } } -pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) -> ZResult<()> { +pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -> ZResult<()> { let mtu = link.get_mtu(); let config = TransportLinkUnicastConfig { mtu, @@ -592,7 +592,7 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) #[cfg(feature = "transport_compression")] is_compression: false, }; - let mut link = TransportLinkUnicast::new(link.clone(), config); + let mut link = TransportLinkUnicast::new(link, config); let mut fsm = AcceptLink { link: &mut link, prng: &manager.prng, @@ -710,16 +710,10 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), }; - let a_link = TransportLinkUnicast::new(link.link.clone(), a_config); + let a_link = TransportLinkUnicast::new(link.link, a_config); let s_link = format!("{:?}", a_link); let transport = step!(manager.init_transport_unicast(config, a_link).await); - // Send the open_ack on the link - step!(link - .send(&oack_out.open_ack.into()) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))); - // Sync the RX sequence number let _ = step!(transport .get_inner() @@ -736,6 +730,12 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) .await .map_err(|e| (e, Some(close::reason::INVALID)))); + // Send the open_ack on the link + step!(link + .send(&oack_out.open_ack.into()) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))); + log::debug!( "New transport link accepted from {} to {}: {}.", osyn_out.other_zid, diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index 4c1314dd29..2a4c1aef40 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -621,16 +621,18 @@ pub(crate) async fn open_link( #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), }; - let o_link = TransportLinkUnicast::new(link.link.clone(), o_config); + let o_link = TransportLinkUnicast::new(link.link, o_config); let s_link = format!("{:?}", o_link); - let transport = step!(manager.init_transport_unicast(config, o_link).await); - - // Sync the RX sequence number - let _ = step!(transport - .get_inner() - .map_err(|e| (e, Some(close::reason::INVALID)))) - .sync(oack_out.other_initial_sn) - .await; + let transport = step!( + manager + .init_transport_unicast( + config, + o_link, + oack_out.other_initial_sn, + oack_out.other_lease + ) + .await + ); let output = InputFinalize { transport, diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index afc12bc87d..9e43550ce2 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -38,7 +38,7 @@ pub(crate) struct TransportLinkUnicastConfig { pub(crate) is_compression: bool, } -#[derive(Clone, PartialEq, Eq)] +#[derive(PartialEq, Eq)] pub(crate) struct TransportLinkUnicast { pub(crate) link: LinkUnicast, pub(crate) config: TransportLinkUnicastConfig, diff --git a/io/zenoh-transport/src/unicast/lowlatency/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs index 437e9c4fa4..ae6987ebb1 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/link.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs @@ -14,6 +14,7 @@ use super::transport::TransportUnicastLowlatency; #[cfg(feature = "stats")] use crate::stats::TransportStats; +use crate::unicast::link::TransportLinkUnicastRx; use crate::{unicast::link::TransportLinkUnicast, TransportExecutor}; use async_std::task; use async_std::{prelude::FutureExt, sync::RwLock}; @@ -132,7 +133,7 @@ impl TransportUnicastLowlatency { let c_transport = self.clone(); let handle = task::spawn(async move { let guard = zasyncread!(c_transport.link); - let link = guard.clone(); + let link = guard.rx(); drop(guard); let rx_buffer_size = c_transport.manager.config.link_rx_buffer_size; @@ -197,26 +198,26 @@ async fn keepalive_task( } async fn rx_task_stream( - link: TransportLinkUnicast, + link: TransportLinkUnicastRx, transport: TransportUnicastLowlatency, lease: Duration, rx_buffer_size: usize, ) -> ZResult<()> { - async fn read(link: &TransportLinkUnicast, buffer: &mut [u8]) -> ZResult { + async fn read(link: &TransportLinkUnicastRx, buffer: &mut [u8]) -> ZResult { // 16 bits for reading the batch length let mut length = [0_u8, 0_u8, 0_u8, 0_u8]; - link.link.read_exact(&mut length).await?; + link.inner.link.read_exact(&mut length).await?; let n = u32::from_le_bytes(length) as usize; let len = buffer.len(); let b = buffer.get_mut(0..n).ok_or_else(|| { zerror!("Batch len is invalid. Received {n} but negotiated max len is {len}.") })?; - link.link.read_exact(b).await?; + link.inner.link.read_exact(b).await?; Ok(n) } // The pool of buffers - let mtu = link.config.mtu as usize; + let mtu = link.inner.config.mtu as usize; let mut n = rx_buffer_size / mtu; if rx_buffer_size % mtu != 0 { n += 1; @@ -237,18 +238,18 @@ async fn rx_task_stream( // Deserialize all the messages from the current ZBuf let zslice = ZSlice::make(Arc::new(buffer), 0, bytes).unwrap(); - transport.read_messages(zslice, &link.link).await?; + transport.read_messages(zslice, &link.inner.link).await?; } } async fn rx_task_dgram( - link: TransportLinkUnicast, + link: TransportLinkUnicastRx, transport: TransportUnicastLowlatency, lease: Duration, rx_buffer_size: usize, ) -> ZResult<()> { // The pool of buffers - let mtu = link.config.mtu as usize; + let mtu = link.inner.config.mtu as usize; let mut n = rx_buffer_size / mtu; if rx_buffer_size % mtu != 0 { n += 1; @@ -260,7 +261,7 @@ async fn rx_task_dgram( let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc()); // Async read from the underlying link - let bytes = link + let bytes = link.inner .link .read(&mut buffer) .timeout(lease) @@ -272,17 +273,17 @@ async fn rx_task_dgram( // Deserialize all the messages from the current ZBuf let zslice = ZSlice::make(Arc::new(buffer), 0, bytes).unwrap(); - transport.read_messages(zslice, &link.link).await?; + transport.read_messages(zslice, &link.inner.link).await?; } } async fn rx_task( - link: TransportLinkUnicast, + link: TransportLinkUnicastRx, transport: TransportUnicastLowlatency, lease: Duration, rx_buffer_size: usize, ) -> ZResult<()> { - if link.link.is_streamed() { + if link.inner.link.is_streamed() { rx_task_stream(link, transport, lease, rx_buffer_size).await } else { rx_task_dgram(link, transport, lease, rx_buffer_size).await diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index 2c83ff3f5e..4b76a1d634 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -214,7 +214,7 @@ impl TransportUnicastTrait for TransportUnicastLowlatency { fn start_tx( &self, - _link: &TransportLinkUnicast, + _link: &Link, executor: &TransportExecutor, keep_alive: Duration, ) -> ZResult<()> { @@ -222,7 +222,7 @@ impl TransportUnicastTrait for TransportUnicastLowlatency { Ok(()) } - fn start_rx(&self, _link: &TransportLinkUnicast, lease: Duration) -> ZResult<()> { + fn start_rx(&self, _link: &Link, lease: Duration) -> ZResult<()> { self.internal_start_rx(lease); Ok(()) } diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index da064e8f5b..1e721917d8 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -23,10 +23,10 @@ use crate::{ transport_unicast_inner::TransportUnicastTrait, universal::transport::TransportUnicastUniversal, TransportConfigUnicast, TransportUnicast, }, - TransportManager, + TransportManager, TransportPeer, }; use async_std::{prelude::FutureExt, sync::Mutex, task}; -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{borrow::BorrowMut, collections::HashMap, sync::Arc, time::Duration}; #[cfg(feature = "transport_compression")] use zenoh_config::CompressionUnicastConf; #[cfg(feature = "shared-memory")] @@ -37,7 +37,7 @@ use zenoh_crypto::PseudoRng; use zenoh_link::*; use zenoh_protocol::{ core::{endpoint, ZenohId}, - transport::close, + transport::{close, TransportSn}, }; use zenoh_result::{bail, zerror, Error, ZResult}; @@ -408,111 +408,214 @@ impl TransportManager { /*************************************/ /* TRANSPORT */ /*************************************/ - pub(super) async fn init_transport_unicast( + async fn init_existing_transport_unicast( &self, config: TransportConfigUnicast, link: TransportLinkUnicast, + other_initial_sn: TransportSn, + other_lease: Duration, + transport: &Arc, ) -> Result)> { - let mut guard = zasynclock!(self.state.unicast.transports); + let existing_config = transport.get_config(); + // Verify that fundamental parameters are correct. + // Ignore the non fundamental parameters like initial SN. + if *existing_config != config { + let e = zerror!( + "Transport with peer {} already exist. Invalid config: {:?}. Expected: {:?}.", + config.zid, + config, + existing_config + ); + log::trace!("{}", e); + return Err((e.into(), Some(close::reason::INVALID))); + } - // First verify if the transport already exists - match guard.get(&config.zid) { - Some(transport) => { - let existing_config = transport.get_config(); - // If it exists, verify that fundamental parameters like are correct. - // Ignore the non fundamental parameters like initial SN. - if *existing_config != config { - let e = zerror!( - "Transport with peer {} already exist. Invalid config: {:?}. Expected: {:?}.", - config.zid, - config, - existing_config - ); - log::trace!("{}", e); - return Err((e.into(), Some(close::reason::INVALID))); - } + let c_link = (&link.link).into(); + + // Add the link to the transport + transport + .add_link(link) + .await + .map_err(|e| (e, Some(close::reason::MAX_LINKS)))?; + + // Sync the RX sequence number + transport + .sync(other_initial_sn) + .await + .map_err(|e| (e, Some(close::reason::MAX_LINKS)))?; + + // Start the TX loop + let keep_alive = self.config.unicast.lease / self.config.unicast.keep_alive as u32; + transport + .start_tx(&c_link, &self.tx_executor, keep_alive) + .map_err(|e| (e, Some(close::reason::INVALID)))?; + + // Notify the transport handler there is a new link on this transport + if let Some(callback) = transport.get_callback() { + callback.new_link(c_link.clone()); + } + + // Start the RX loop + transport + .start_rx(&c_link, other_lease) + .map_err(|e| (e, Some(close::reason::INVALID)))?; + + Ok(TransportUnicast(Arc::downgrade(transport))) + } + pub(super) async fn init_new_transport_unicast( + &self, + config: TransportConfigUnicast, + link: TransportLinkUnicast, + other_initial_sn: TransportSn, + other_lease: Duration, + transports: &mut HashMap>, + ) -> Result)> { + // Verify that we haven't reached the transport number limit + if transports.len() >= self.config.unicast.max_sessions { + let e = zerror!( + "Max transports reached ({}). Denying new transport with peer: {}", + self.config.unicast.max_sessions, + config.zid + ); + log::trace!("{}", e); + return Err((e.into(), Some(close::reason::INVALID))); + } + + let c_link: Link = (&link.link).into(); + + // Create the transport + let is_multilink = zcondfeat!("transport_multilink", config.multilink.is_some(), false); + + // select and create transport implementation depending on the cfg and enabled features + let a_t = { + if config.is_lowlatency { + log::debug!("Will use LowLatency transport!"); + TransportUnicastLowlatency::make(self.clone(), config.clone(), link) + .map_err(|e| (e, Some(close::reason::INVALID))) + .map(|v| Arc::new(v) as Arc)? + } else { + log::debug!("Will use Universal transport!"); + let t: Arc = + TransportUnicastUniversal::make(self.clone(), config.clone()) + .map_err(|e| (e, Some(close::reason::INVALID))) + .map(|v| Arc::new(v) as Arc)?; // Add the link to the transport - transport - .add_link(link) + t.add_link(link) .await .map_err(|e| (e, Some(close::reason::MAX_LINKS)))?; + t + } + }; + + let transport = TransportUnicast(Arc::downgrade(&a_t)); + + // Sync the RX sequence number + a_t.sync(other_initial_sn) + .await + .map_err(|e| (e, Some(close::reason::MAX_LINKS)))?; - Ok(TransportUnicast(Arc::downgrade(transport))) + // Start the TX loop + let keep_alive = self.config.unicast.lease / self.config.unicast.keep_alive as u32; + a_t.start_tx(&c_link, &self.tx_executor, keep_alive) + .map_err(|e| (e, Some(close::reason::INVALID)))?; + + // Assign a callback to the new transport + let peer = TransportPeer { + zid: a_t.get_zid(), + whatami: a_t.get_whatami(), + links: vec![c_link.clone()], + is_qos: a_t.is_qos(), + #[cfg(feature = "shared-memory")] + is_shm: transport.is_shm(), + }; + // Notify the transport handler that there is a new transport and get back a callback + // NOTE: the read loop of the link the open message was sent on remains blocked + // until new_unicast() returns. The read_loop in the various links + // waits for any eventual transport to associate to. + let callback = self + .config + .handler + .new_unicast(peer, transport.clone()) + .map_err(|e| (e, Some(close::reason::INVALID)))?; + + // Set the callback on the transport + a_t.set_callback(callback.clone()); + + // Notify the transport handler there is a new link on this transport + callback.new_link(c_link.clone()); + + // Start the RX loop + a_t.start_rx(&c_link, other_lease) + .map_err(|e| (e, Some(close::reason::INVALID)))?; + + // Add the transport transport to the list of active transports + transports.insert(config.zid, a_t); + + zcondfeat!( + "shared-memory", + { + log::debug!( + "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, shm: {}, multilink: {}, lowlatency: {}", + self.config.zid, + config.zid, + config.whatami, + config.sn_resolution, + config.tx_initial_sn, + config.is_qos, + config.is_shm, + is_multilink, + config.is_lowlatency + ); + }, + { + log::debug!( + "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, multilink: {}, lowlatency: {}", + self.config.zid, + config.zid, + config.whatami, + config.sn_resolution, + config.tx_initial_sn, + config.is_qos, + is_multilink, + config.is_lowlatency + ); + } + ); + + Ok(transport) + } + + pub(super) async fn init_transport_unicast( + &self, + config: TransportConfigUnicast, + link: TransportLinkUnicast, + other_initial_sn: TransportSn, + other_lease: Duration, + ) -> Result)> { + let mut guard = zasynclock!(self.state.unicast.transports); + + // First verify if the transport already exists + match guard.get(&config.zid) { + Some(transport) => { + self.init_existing_transport_unicast( + config, + link, + other_initial_sn, + other_lease, + transport, + ) + .await } None => { - // Then verify that we haven't reached the transport number limit - if guard.len() >= self.config.unicast.max_sessions { - let e = zerror!( - "Max transports reached ({}). Denying new transport with peer: {}", - self.config.unicast.max_sessions, - config.zid - ); - log::trace!("{}", e); - return Err((e.into(), Some(close::reason::INVALID))); - } - - // Create the transport - let is_multilink = - zcondfeat!("transport_multilink", config.multilink.is_some(), false); - - // select and create transport implementation depending on the cfg and enabled features - let a_t = { - if config.is_lowlatency { - log::debug!("Will use LowLatency transport!"); - TransportUnicastLowlatency::make(self.clone(), config.clone(), link) - .map_err(|e| (e, Some(close::reason::INVALID))) - .map(|v| Arc::new(v) as Arc)? - } else { - log::debug!("Will use Universal transport!"); - let t: Arc = - TransportUnicastUniversal::make(self.clone(), config.clone()) - .map_err(|e| (e, Some(close::reason::INVALID))) - .map(|v| Arc::new(v) as Arc)?; - // Add the link to the transport - t.add_link(link) - .await - .map_err(|e| (e, Some(close::reason::MAX_LINKS)))?; - t - } - }; - - // Add the transport transport to the list of active transports - let transport = TransportUnicast(Arc::downgrade(&a_t)); - guard.insert(config.zid, a_t); - - zcondfeat!( - "shared-memory", - { - log::debug!( - "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, shm: {}, multilink: {}, lowlatency: {}", - self.config.zid, - config.zid, - config.whatami, - config.sn_resolution, - config.tx_initial_sn, - config.is_qos, - config.is_shm, - is_multilink, - config.is_lowlatency - ); - }, - { - log::debug!( - "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, multilink: {}, lowlatency: {}", - self.config.zid, - config.zid, - config.whatami, - config.sn_resolution, - config.tx_initial_sn, - config.is_qos, - is_multilink, - config.is_lowlatency - ); - } - ); - - Ok(transport) + self.init_new_transport_unicast( + config, + link, + other_initial_sn, + other_lease, + &mut guard, + ) + .await } } } @@ -602,7 +705,7 @@ impl TransportManager { // Spawn a task to accept the link let c_manager = self.clone(); task::spawn(async move { - if let Err(e) = super::establishment::accept::accept_link(&link, &c_manager) + if let Err(e) = super::establishment::accept::accept_link(link, &c_manager) .timeout(c_manager.config.unicast.accept_timeout) .await { diff --git a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs index 7361dc8b1f..7de0429044 100644 --- a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs +++ b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs @@ -59,7 +59,7 @@ pub(crate) trait TransportUnicastTrait: Send + Sync { fn schedule(&self, msg: NetworkMessage) -> ZResult<()>; fn start_tx( &self, - link: &TransportLinkUnicast, + link: &Link, executor: &TransportExecutor, keep_alive: Duration, ) -> ZResult<()>; @@ -67,7 +67,7 @@ pub(crate) trait TransportUnicastTrait: Send + Sync { /*************************************/ /* RX */ /*************************************/ - fn start_rx(&self, link: &TransportLinkUnicast, lease: Duration) -> ZResult<()>; + fn start_rx(&self, link: &Link, lease: Duration) -> ZResult<()>; /*************************************/ /* INITIATION */ diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index beb31a0883..7a8394d5bf 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -38,7 +38,7 @@ use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal}; #[derive(Clone)] pub(super) struct TransportLinkUnicastUniversal { // The underlying link - pub(super) link: TransportLinkUnicast, + pub(super) link: Arc, // The transmission pipeline pub(super) pipeline: Option, // The transport this link is associated to @@ -52,7 +52,7 @@ pub(super) struct TransportLinkUnicastUniversal { impl TransportLinkUnicastUniversal { pub(super) fn new(transport: TransportUnicastUniversal, link: TransportLinkUnicast) -> Self { Self { - link, + link: Arc::new(link), pipeline: None, transport, handle_tx: None, @@ -84,12 +84,12 @@ impl TransportLinkUnicastUniversal { self.pipeline = Some(producer); // Spawn the TX task - let c_link = self.link.clone(); + let tx = self.link.tx(); let c_transport = self.transport.clone(); let handle = executor.spawn(async move { let res = tx_task( consumer, - c_link.tx(), + &mut tx, keep_alive, #[cfg(feature = "stats")] c_transport.stats.clone(), @@ -99,7 +99,7 @@ impl TransportLinkUnicastUniversal { log::debug!("{}", e); // Spawn a task to avoid a deadlock waiting for this same task // to finish in the close() joining its handle - task::spawn(async move { c_transport.del_link(c_link.into()).await }); + task::spawn(async move { c_transport.del_link((&tx.inner.link).into()).await }); } }); self.handle_tx = Some(Arc::new(handle)); @@ -115,7 +115,7 @@ impl TransportLinkUnicastUniversal { pub(super) fn start_rx(&mut self, lease: Duration) { if self.handle_rx.is_none() { // Spawn the RX task - let c_link = self.link.clone(); + let rx = self.link.rx(); let c_transport = self.transport.clone(); let c_signal = self.signal_rx.clone(); let c_rx_buffer_size = self.transport.manager.config.link_rx_buffer_size; @@ -123,7 +123,7 @@ impl TransportLinkUnicastUniversal { let handle = task::spawn(async move { // Start the consume task let res = rx_task( - c_link.rx(), + &mut rx, c_transport.clone(), lease, c_signal.clone(), @@ -135,7 +135,7 @@ impl TransportLinkUnicastUniversal { log::debug!("{}", e); // Spawn a task to avoid a deadlock waiting for this same task // to finish in the close() joining its handle - task::spawn(async move { c_transport.del_link(c_link.into()).await }); + task::spawn(async move { c_transport.del_link((&rx.inner.link).into()).await }); } }); self.handle_rx = Some(Arc::new(handle)); @@ -171,7 +171,7 @@ impl TransportLinkUnicastUniversal { /*************************************/ async fn tx_task( mut pipeline: TransmissionPipelineConsumer, - mut link: TransportLinkUnicastTx, + link: &mut TransportLinkUnicastTx, keep_alive: Duration, #[cfg(feature = "stats")] stats: Arc, ) -> ZResult<()> { @@ -225,7 +225,7 @@ async fn tx_task( } async fn rx_task( - mut link: TransportLinkUnicastRx, + link: &mut TransportLinkUnicastRx, transport: TransportUnicastUniversal, lease: Duration, signal: Signal, @@ -266,7 +266,7 @@ async fn rx_task( let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice()); while !signal.is_triggered() { // Async read from the underlying link - let action = read(&mut link, &pool) + let action = read(link, &pool) .race(stop(signal.clone())) .timeout(lease) .await diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index 77a48ee09a..f6dcd985a5 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -51,6 +51,14 @@ macro_rules! zlinkgetmut { }; } + +macro_rules! zlinkgetmut2 { + ($guard:expr, $link:expr) => { + // Compare LinkUnicast link to not compare TransportLinkUnicast direction + $guard.iter_mut().find(|tl| $link == &tl.link.link) + }; +} + macro_rules! zlinkindex { ($guard:expr, $link:expr) => { // Compare LinkUnicast link to not compare TransportLinkUnicast direction @@ -416,7 +424,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal { } fn get_links(&self) -> Vec { - zread!(self.links).iter().map(|l| (&l.link).into()).collect() + zread!(self.links).iter().map(|l| (&l.link.link).into()).collect() } /*************************************/ @@ -431,12 +439,12 @@ impl TransportUnicastTrait for TransportUnicastUniversal { fn start_tx( &self, - link: &TransportLinkUnicast, + link: &Link, executor: &TransportExecutor, keep_alive: Duration, ) -> ZResult<()> { let mut guard = zwrite!(self.links); - match zlinkgetmut!(guard, link) { + match zlinkgetmut2!(guard, link) { Some(l) => { assert!(!self.priority_tx.is_empty()); l.start_tx(executor, keep_alive, &self.priority_tx); @@ -452,9 +460,9 @@ impl TransportUnicastTrait for TransportUnicastUniversal { } } - fn start_rx(&self, link: &TransportLinkUnicast, lease: Duration) -> ZResult<()> { + fn start_rx(&self, link: &Link, lease: Duration) -> ZResult<()> { let mut guard = zwrite!(self.links); - match zlinkgetmut!(guard, link) { + match zlinkgetmut2!(guard, link) { Some(l) => { l.start_rx(lease); Ok(())