From 54dbd6ca51b650c411dd3a427f1e348507baff3f Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 24 Jan 2024 15:06:45 +0100 Subject: [PATCH 01/14] Quick and dirty ack on duplicating messages on every link --- .../src/unicast/universal/tx.rs | 43 ++----------------- 1 file changed, 4 insertions(+), 39 deletions(-) diff --git a/io/zenoh-transport/src/unicast/universal/tx.rs b/io/zenoh-transport/src/unicast/universal/tx.rs index eb41e2611c..cb77232994 100644 --- a/io/zenoh-transport/src/unicast/universal/tx.rs +++ b/io/zenoh-transport/src/unicast/universal/tx.rs @@ -17,46 +17,11 @@ use zenoh_protocol::network::NetworkMessage; impl TransportUnicastUniversal { fn schedule_on_link(&self, msg: NetworkMessage) -> bool { - macro_rules! zpush { - ($guard:expr, $pipeline:expr, $msg:expr) => { - // Drop the guard before the push_zenoh_message since - // the link could be congested and this operation could - // block for fairly long time - let pl = $pipeline.clone(); - drop($guard); - log::trace!("Scheduled: {:?}", $msg); - return pl.push_network_message($msg); - }; - } - - let guard = zread!(self.links); - // First try to find the best match between msg and link reliability - if let Some(pl) = guard - .iter() - .filter_map(|tl| { - if msg.is_reliable() == tl.link.link.is_reliable() { - Some(&tl.pipeline) - } else { - None - } - }) - .next() - { - zpush!(guard, pl, msg); + let ls = zread!(self.links); + for l in ls.iter() { + l.pipeline.push_network_message(msg.clone()); } - - // No best match found, take the first available link - if let Some(pl) = guard.iter().map(|tl| &tl.pipeline).next() { - zpush!(guard, pl, msg); - } - - // No Link found - log::trace!( - "Message dropped because the transport has no links: {}", - msg - ); - - false + true } #[allow(unused_mut)] // When feature "shared-memory" is not enabled From 9fe575b85c0da74229f8b1915c4ca44c784f42ba Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 5 Feb 2024 15:24:18 +0100 Subject: [PATCH 02/14] Duplicate 
messages on all links --- io/zenoh-transport/src/unicast/link.rs | 6 +- .../src/unicast/universal/link.rs | 124 ++++-------------- .../src/unicast/universal/rx.rs | 34 +++-- .../src/unicast/universal/transport.rs | 86 ++++++------ .../src/unicast/universal/tx.rs | 41 +----- 5 files changed, 84 insertions(+), 207 deletions(-) diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index bd756d6396..fc654e3937 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -162,7 +162,7 @@ impl TransportLinkUnicastTx { .as_slice(), }; - // log::trace!("WBytes: {:02x?}", bytes); + log::trace!("Link: {:?} - WBytes: {}", self, bytes.len()); // Send the message on the link self.inner.link.write_all(bytes).await?; @@ -190,7 +190,7 @@ impl fmt::Display for TransportLinkUnicastTx { impl fmt::Debug for TransportLinkUnicastTx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TransportLinkUnicastRx") + f.debug_struct("TransportLinkUnicastTx") .field("link", &self.inner.link) .field("config", &self.inner.config) .field("buffer", &self.buffer.as_ref().map(|b| b.capacity())) @@ -230,7 +230,7 @@ impl TransportLinkUnicastRx { self.link.read(into.as_mut_slice()).await? }; - // log::trace!("RBytes: {:02x?}", &into.as_slice()[0..end]); + log::trace!("Link: {:?} - RBytes: {}", self, end); let buffer = ZSlice::make(Arc::new(into), 0, end) .map_err(|_| zerror!("{ERR}{self}. 
ZSlice index(es) out of bounds"))?; diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 513cefc0a6..4255fe3c51 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -16,25 +16,21 @@ use super::transport::TransportUnicastUniversal; use crate::common::stats::TransportStats; use crate::{ common::{ - batch::{BatchConfig, RBatch}, - pipeline::{ - TransmissionPipeline, TransmissionPipelineConf, TransmissionPipelineConsumer, - TransmissionPipelineProducer, - }, - priority::TransportPriorityTx, + batch::RBatch, pipeline::TransmissionPipelineConsumer, priority::TransportPriorityTx, }, unicast::link::{TransportLinkUnicast, TransportLinkUnicastRx, TransportLinkUnicastTx}, - TransportExecutor, }; use async_std::prelude::FutureExt; -use async_std::task; -use async_std::task::JoinHandle; +use async_std::{ + sync::RwLock as AsyncRwLock, + task::{self, JoinHandle}, +}; use std::{ sync::{Arc, RwLock}, time::Duration, }; use zenoh_buffers::ZSliceBuffer; -use zenoh_core::zwrite; +use zenoh_core::{zasyncwrite, zwrite}; use zenoh_protocol::transport::{KeepAlive, TransportMessage}; use zenoh_result::{zerror, ZResult}; use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal}; @@ -50,86 +46,25 @@ pub(super) struct Tasks { pub(super) struct TransportLinkUnicastUniversal { // The underlying link pub(super) link: TransportLinkUnicast, - // The transmission pipeline - pub(super) pipeline: TransmissionPipelineProducer, // The task handling substruct tasks: Arc, } impl TransportLinkUnicastUniversal { - pub(super) fn new( - transport: &TransportUnicastUniversal, - link: TransportLinkUnicast, - priority_tx: &[TransportPriorityTx], - ) -> (Self, TransmissionPipelineConsumer) { + pub(super) fn new(link: TransportLinkUnicast, priority_tx: &[TransportPriorityTx]) -> Self { assert!(!priority_tx.is_empty()); - let config = TransmissionPipelineConf { - batch: 
BatchConfig { - mtu: link.config.batch.mtu, - is_streamed: link.link.is_streamed(), - #[cfg(feature = "transport_compression")] - is_compression: link.config.batch.is_compression, - }, - queue_size: transport.manager.config.queue_size, - backoff: transport.manager.config.queue_backoff, - }; - - // The pipeline - let (producer, consumer) = TransmissionPipeline::make(config, priority_tx); - let tasks = Arc::new(Tasks { handle_tx: RwLock::new(None), signal_rx: Signal::new(), handle_rx: RwLock::new(None), }); - let result = Self { - link, - pipeline: producer, - tasks, - }; - - (result, consumer) + Self { link, tasks } } } impl TransportLinkUnicastUniversal { - pub(super) fn start_tx( - &mut self, - transport: TransportUnicastUniversal, - consumer: TransmissionPipelineConsumer, - executor: &TransportExecutor, - keep_alive: Duration, - ) { - let mut guard = zwrite!(self.tasks.handle_tx); - if guard.is_none() { - // Spawn the TX task - let mut tx = self.link.tx(); - let handle = executor.spawn(async move { - let res = tx_task( - consumer, - &mut tx, - keep_alive, - #[cfg(feature = "stats")] - transport.stats.clone(), - ) - .await; - if let Err(e) = res { - log::debug!("{}", e); - // Spawn a task to avoid a deadlock waiting for this same task - // to finish in the close() joining its handle - task::spawn(async move { transport.del_link(tx.inner.link()).await }); - } - }); - *guard = Some(handle); - } - } - - pub(super) fn stop_tx(&mut self) { - self.pipeline.disable(); - } - pub(super) fn start_rx(&mut self, transport: TransportUnicastUniversal, lease: Duration) { let mut guard = zwrite!(self.tasks.handle_rx); if guard.is_none() { @@ -165,7 +100,6 @@ impl TransportLinkUnicastUniversal { pub(super) async fn close(mut self) -> ZResult<()> { log::trace!("{}: closing", self.link); - self.stop_tx(); self.stop_rx(); let handle_tx = zwrite!(self.tasks.handle_tx).take(); @@ -185,22 +119,19 @@ impl TransportLinkUnicastUniversal { /*************************************/ /* TASKS */ 
/*************************************/ -async fn tx_task( +pub(super) async fn tx_task( mut pipeline: TransmissionPipelineConsumer, - link: &mut TransportLinkUnicastTx, + links: Arc>>, keep_alive: Duration, - #[cfg(feature = "stats")] stats: Arc, ) -> ZResult<()> { loop { - match pipeline.pull().timeout(keep_alive).await { + let res = pipeline.pull().timeout(keep_alive).await; + let mut ls = zasyncwrite!(links); + match res { Ok(res) => match res { Some((mut batch, priority)) => { - link.send_batch(&mut batch).await?; - - #[cfg(feature = "stats")] - { - stats.inc_tx_t_msgs(batch.stats.t_msgs); - stats.inc_tx_bytes(batch.len() as usize); + for l in ls.as_mut_slice() { + l.send_batch(&mut batch).await?; } // Reinsert the batch into the queue @@ -211,12 +142,8 @@ async fn tx_task( Err(_) => { let message: TransportMessage = KeepAlive.into(); - #[allow(unused_variables)] // Used when stats feature is enabled - let n = link.send(&message).await?; - #[cfg(feature = "stats")] - { - stats.inc_tx_t_msgs(1); - stats.inc_tx_bytes(n); + for l in ls.as_mut_slice() { + let _ = l.send(&message).await?; } } } @@ -225,15 +152,14 @@ async fn tx_task( // Drain the transmission pipeline and write remaining bytes on the wire let mut batches = pipeline.drain(); for (mut b, _) in batches.drain(..) 
{ - link.send_batch(&mut b) - .timeout(keep_alive) - .await - .map_err(|_| zerror!("{}: flush failed after {} ms", link, keep_alive.as_millis()))??; - - #[cfg(feature = "stats")] - { - stats.inc_tx_t_msgs(b.stats.t_msgs); - stats.inc_tx_bytes(b.len() as usize); + let mut ls = zasyncwrite!(links); + for l in ls.as_mut_slice() { + l.send_batch(&mut b) + .timeout(keep_alive) + .await + .map_err(|_| { + zerror!("{}: flush failed after {} ms", l, keep_alive.as_millis()) + })??; } } diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index 935a1814b0..a641e9b11e 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -16,6 +16,7 @@ use crate::{ common::{ batch::{Decode, RBatch}, priority::TransportChannelRx, + seq_num::SeqNumGenerator, }, unicast::transport_unicast_inner::TransportUnicastTrait, TransportPeerEventHandler, @@ -96,7 +97,10 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - self.verify_sn(sn, &mut guard)?; + let cont = self.verify_sn(sn, &mut guard); + if !cont { + return Ok(()); + } let callback = zread!(self.callback).clone(); if let Some(callback) = callback.as_ref() { @@ -139,7 +143,10 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - self.verify_sn(sn, &mut guard)?; + let cont = self.verify_sn(sn, &mut guard); + if !cont { + return Ok(()); + } if guard.defrag.is_empty() { let _ = guard.defrag.sync(sn); @@ -167,28 +174,19 @@ impl TransportUnicastUniversal { Ok(()) } - fn verify_sn( - &self, - sn: TransportSn, - guard: &mut MutexGuard<'_, TransportChannelRx>, - ) -> ZResult<()> { - let precedes = guard.sn.roll(sn)?; + fn verify_sn(&self, sn: TransportSn, guard: &mut MutexGuard<'_, TransportChannelRx>) -> bool { + let precedes = guard.sn.roll(sn).unwrap_or(false); if !precedes { - log::debug!( + let mut tmp = SeqNumGenerator::make(sn, 
zenoh_protocol::core::Bits::U32).unwrap(); + tmp.get(); + log::trace!( "Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.", self.config.zid, sn, - guard.sn.get() + tmp.get() ); - // Drop the fragments if needed - if !guard.defrag.is_empty() { - guard.defrag.clear(); - } - // Keep reading - return Ok(()); } - - Ok(()) + precedes } pub(super) fn read_messages(&self, mut batch: RBatch, link: &Link) -> ZResult<()> { diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index 942b723365..e9de9c6555 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -14,21 +14,27 @@ #[cfg(feature = "stats")] use crate::stats::TransportStats; use crate::{ - common::priority::{TransportPriorityRx, TransportPriorityTx}, + common::{ + pipeline::{TransmissionPipeline, TransmissionPipelineConf, TransmissionPipelineProducer}, + priority::{TransportPriorityRx, TransportPriorityTx}, + }, unicast::{ - link::{LinkUnicastWithOpenAck, TransportLinkUnicastDirection}, + link::{LinkUnicastWithOpenAck, TransportLinkUnicastDirection, TransportLinkUnicastTx}, transport_unicast_inner::{AddLinkResult, TransportUnicastTrait}, universal::link::TransportLinkUnicastUniversal, TransportConfigUnicast, }, TransportManager, TransportPeerEventHandler, }; -use async_std::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}; +use async_std::{ + sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock}, + task, +}; use async_trait::async_trait; use std::fmt::DebugStruct; use std::sync::{Arc, RwLock}; use std::time::Duration; -use zenoh_core::{zasynclock, zcondfeat, zread, zwrite}; +use zenoh_core::{zasynclock, zasyncwrite, zcondfeat, zread, zwrite}; use zenoh_link::Link; use zenoh_protocol::{ core::{Priority, WhatAmI, ZenohId}, @@ -37,13 +43,6 @@ use zenoh_protocol::{ }; use zenoh_result::{bail, zerror, ZResult}; -macro_rules! 
zlinkget { - ($guard:expr, $link:expr) => { - // Compare LinkUnicast link to not compare TransportLinkUnicast direction - $guard.iter().find(|tl| tl.link == $link) - }; -} - macro_rules! zlinkgetmut { ($guard:expr, $link:expr) => { // Compare LinkUnicast link to not compare TransportLinkUnicast direction @@ -71,8 +70,11 @@ pub(crate) struct TransportUnicastUniversal { pub(super) priority_tx: Arc<[TransportPriorityTx]>, // Rx priorities pub(super) priority_rx: Arc<[TransportPriorityRx]>, + // Pipeline producer + pub(super) pipeline: Arc, // The links associated to the channel pub(super) links: Arc>>, + pub(super) links_tx: Arc>>, // The callback pub(super) callback: Arc>>>, // Lock used to ensure no race in add_link method @@ -115,12 +117,19 @@ impl TransportUnicastUniversal { #[cfg(feature = "stats")] let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone()))); + // The pipeline + let mut pc = TransmissionPipelineConf::default(); + pc.batch.is_streamed = true; + let (producer, consumer) = TransmissionPipeline::make(pc, priority_tx.as_slice()); + let t = Arc::new(TransportUnicastUniversal { manager, config, priority_tx: priority_tx.into_boxed_slice().into(), priority_rx: priority_rx.into_boxed_slice().into(), + pipeline: Arc::new(producer), links: Arc::new(RwLock::new(vec![].into_boxed_slice())), + links_tx: Arc::new(AsyncRwLock::new(vec![])), add_link_lock: Arc::new(AsyncMutex::new(())), callback: Arc::new(RwLock::new(None)), alive: Arc::new(AsyncMutex::new(false)), @@ -128,6 +137,13 @@ impl TransportUnicastUniversal { stats, }); + let c_t = t.links_tx.clone(); + task::spawn(async move { + let _ = + crate::unicast::universal::link::tx_task(consumer, c_t, Duration::from_secs(10)) + .await; + }); + Ok(t) } @@ -222,7 +238,6 @@ impl TransportUnicastUniversal { match zlinkgetmut!(guard, *link) { Some(l) => { l.stop_rx(); - l.stop_tx(); Ok(()) } None => { @@ -308,8 +323,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal { // Wrap the link let 
(link, ack) = link.unpack(); - let (mut link, consumer) = - TransportLinkUnicastUniversal::new(self, link, &self.priority_tx); + let mut link = TransportLinkUnicastUniversal::new(link, &self.priority_tx); // Add the link to the channel let mut guard = zwrite!(self.links); @@ -324,16 +338,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal { // create a callback to start the link let transport = self.clone(); let start_link = Box::new(move || { - // Start the TX loop - let keep_alive = - self.manager.config.unicast.lease / self.manager.config.unicast.keep_alive as u32; - link.start_tx( - transport.clone(), - consumer, - &self.manager.tx_executor, - keep_alive, - ); - + task::block_on(async { zasyncwrite!(self.links_tx).push(link.link.tx()) }); // Start the RX loop link.start_rx(transport, other_lease); }); @@ -388,11 +393,6 @@ impl TransportUnicastTrait for TransportUnicastUniversal { async fn close_link(&self, link: Link, reason: u8) -> ZResult<()> { log::trace!("Closing link {} with peer: {}", link, self.config.zid); - let transport_link_pipeline = zlinkget!(zread!(self.links), link) - .ok_or_else(|| zerror!("Cannot close Link {:?}: not found", link))? - .pipeline - .clone(); - // Close message to be sent on the target link let msg: TransportMessage = Close { reason, @@ -400,7 +400,8 @@ impl TransportUnicastTrait for TransportUnicastUniversal { } .into(); - transport_link_pipeline.push_transport_message(msg, Priority::Background); + self.pipeline + .push_transport_message(msg, Priority::Background); // Remove the link from the channel self.del_link(link).await @@ -409,23 +410,14 @@ impl TransportUnicastTrait for TransportUnicastUniversal { async fn close(&self, reason: u8) -> ZResult<()> { log::trace!("Closing transport with peer: {}", self.config.zid); - let mut pipelines = zread!(self.links) - .iter() - .map(|sl| sl.pipeline.clone()) - .collect::>(); - for p in pipelines.drain(..) 
{ - // Close message to be sent on all the links - // session should always be true for user-triggered close. However, in case of - // multiple links, it is safer to close all the links first. When no links are left, - // the transport is then considered closed. - let msg: TransportMessage = Close { - reason, - session: false, - } - .into(); - - p.push_transport_message(msg, Priority::Background); + let msg: TransportMessage = Close { + reason, + session: false, } + .into(); + self.pipeline + .push_transport_message(msg, Priority::Background); + // Terminate and clean up the transport self.delete().await } @@ -438,7 +430,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal { /* TX */ /*************************************/ fn schedule(&self, msg: NetworkMessage) -> ZResult<()> { - match self.internal_schedule(msg) { + match self.pipeline.push_network_message(msg) { true => Ok(()), false => bail!("error scheduling message!"), } diff --git a/io/zenoh-transport/src/unicast/universal/tx.rs b/io/zenoh-transport/src/unicast/universal/tx.rs index cb77232994..c7397f6e01 100644 --- a/io/zenoh-transport/src/unicast/universal/tx.rs +++ b/io/zenoh-transport/src/unicast/universal/tx.rs @@ -12,44 +12,5 @@ // ZettaScale Zenoh Team, // use super::transport::TransportUnicastUniversal; -use zenoh_core::zread; -use zenoh_protocol::network::NetworkMessage; -impl TransportUnicastUniversal { - fn schedule_on_link(&self, msg: NetworkMessage) -> bool { - let ls = zread!(self.links); - for l in ls.iter() { - l.pipeline.push_network_message(msg.clone()); - } - true - } - - #[allow(unused_mut)] // When feature "shared-memory" is not enabled - #[allow(clippy::let_and_return)] // When feature "stats" is not enabled - #[inline(always)] - pub(crate) fn internal_schedule(&self, mut msg: NetworkMessage) -> bool { - #[cfg(feature = "shared-memory")] - { - let res = if self.config.is_shm { - crate::shm::map_zmsg_to_shminfo(&mut msg) - } else { - crate::shm::map_zmsg_to_shmbuf(&mut msg, 
&self.manager.shm().reader) - }; - if let Err(e) = res { - log::trace!("Failed SHM conversion: {}", e); - return false; - } - } - - let res = self.schedule_on_link(msg); - - #[cfg(feature = "stats")] - if res { - self.stats.inc_tx_n_msgs(1); - } else { - self.stats.inc_tx_n_dropped(1); - } - - res - } -} +impl TransportUnicastUniversal {} From 2c5de4c8658a31933ac4141b700e81484eae0e47 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 5 Feb 2024 17:38:36 +0100 Subject: [PATCH 03/14] Fix KeepAlive interval --- io/zenoh-transport/src/unicast/universal/transport.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index e9de9c6555..1a8fae7d6e 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -139,9 +139,12 @@ impl TransportUnicastUniversal { let c_t = t.links_tx.clone(); task::spawn(async move { - let _ = - crate::unicast::universal::link::tx_task(consumer, c_t, Duration::from_secs(10)) - .await; + let _ = crate::unicast::universal::link::tx_task( + consumer, + c_t, + Duration::from_millis(2_500), + ) + .await; }); Ok(t) From a9ff32986f3172879821247523855f50ade29d1e Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 5 Feb 2024 17:46:23 +0100 Subject: [PATCH 04/14] Wait for links to be available --- io/zenoh-transport/src/unicast/universal/link.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 4255fe3c51..ea2250fd34 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -127,6 +127,12 @@ pub(super) async fn tx_task( loop { let res = pipeline.pull().timeout(keep_alive).await; let mut ls = zasyncwrite!(links); + while ls.is_empty() { + log::trace!("No links available for 
TX"); + drop(ls); + task::sleep(Duration::from_millis(1)).await; + ls = zasyncwrite!(links); + } match res { Ok(res) => match res { Some((mut batch, priority)) => { From f79e662855132de36e017b33bba816a29275e7c8 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 5 Feb 2024 17:48:59 +0100 Subject: [PATCH 05/14] Wait for links to be available --- io/zenoh-transport/src/unicast/universal/link.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index ea2250fd34..2088b3666a 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -127,15 +127,16 @@ pub(super) async fn tx_task( loop { let res = pipeline.pull().timeout(keep_alive).await; let mut ls = zasyncwrite!(links); - while ls.is_empty() { - log::trace!("No links available for TX"); - drop(ls); - task::sleep(Duration::from_millis(1)).await; - ls = zasyncwrite!(links); - } match res { Ok(res) => match res { Some((mut batch, priority)) => { + while ls.is_empty() { + log::trace!("No links available for TX"); + drop(ls); + task::sleep(Duration::from_millis(1)).await; + ls = zasyncwrite!(links); + } + for l in ls.as_mut_slice() { l.send_batch(&mut batch).await?; } From aa9acb79c1a076238fb0c228e17073f970011a96 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Tue, 6 Feb 2024 16:32:14 +0100 Subject: [PATCH 06/14] Task spawn for write --- Cargo.lock | 1 + io/zenoh-transport/Cargo.toml | 1 + .../src/unicast/universal/link.rs | 53 ++++++++++++++++--- 3 files changed, 48 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a03e85fd79..c5641e607c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5111,6 +5111,7 @@ dependencies = [ "async-trait", "env_logger", "flume", + "futures", "log", "lz4_flex", "panic-message", diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index 0921f4e1ee..5cf554aa44 
100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -54,6 +54,7 @@ async-global-executor = { workspace = true } async-std = { workspace = true } async-trait = { workspace = true } flume = { workspace = true } +futures = { workspace = true } log = { workspace = true } lz4_flex = { workspace = true } paste = { workspace = true } diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 2088b3666a..9212e4e590 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -16,7 +16,9 @@ use super::transport::TransportUnicastUniversal; use crate::common::stats::TransportStats; use crate::{ common::{ - batch::RBatch, pipeline::TransmissionPipelineConsumer, priority::TransportPriorityTx, + batch::{Finalize, RBatch}, + pipeline::TransmissionPipelineConsumer, + priority::TransportPriorityTx, }, unicast::link::{TransportLinkUnicast, TransportLinkUnicastRx, TransportLinkUnicastTx}, }; @@ -30,7 +32,7 @@ use std::{ time::Duration, }; use zenoh_buffers::ZSliceBuffer; -use zenoh_core::{zasyncwrite, zwrite}; +use zenoh_core::{zasyncread, zasyncwrite, zwrite}; use zenoh_protocol::transport::{KeepAlive, TransportMessage}; use zenoh_result::{zerror, ZResult}; use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal}; @@ -126,7 +128,7 @@ pub(super) async fn tx_task( ) -> ZResult<()> { loop { let res = pipeline.pull().timeout(keep_alive).await; - let mut ls = zasyncwrite!(links); + let mut ls = zasyncread!(links); match res { Ok(res) => match res { Some((mut batch, priority)) => { @@ -134,11 +136,47 @@ pub(super) async fn tx_task( log::trace!("No links available for TX"); drop(ls); task::sleep(Duration::from_millis(1)).await; - ls = zasyncwrite!(links); + ls = zasyncread!(links); } - for l in ls.as_mut_slice() { - l.send_batch(&mut batch).await?; + #[cfg(feature = "transport_compression")] + { + batch.config.is_compression = false; + } 
+ + let res = batch + .finalize(None) + .map_err(|_| zerror!("Batch finalization error"))?; + + if let Finalize::Buffer = res { + panic!("Compression is not supported"); + }; + + // Send the message on the links + // - + + // Sequential + // for l in ls.as_mut_slice() { + // l.inner.link.write_all(bytes).await; + // } + + // Parallel but blocking + // use futures::stream::StreamExt; + // let _ = futures::stream::iter(0..ls.len()) + // .map(|i| ls[i].inner.link.write_all(bytes)) + // .buffer_unordered(ls.len()) + // .collect::>() + // .await; // Ignore errors + + // Parallel spawn + for l in ls.iter() { + let c_b = batch.clone(); + let c_l = l.inner.link.clone(); + task::spawn(async move { + if let Err(e) = c_l.write_all(c_b.as_slice()).await { + log::debug!("Write failed on {:?}: {}", c_l, e); + } + }); } // Reinsert the batch into the queue @@ -148,7 +186,8 @@ pub(super) async fn tx_task( }, Err(_) => { let message: TransportMessage = KeepAlive.into(); - + drop(ls); + let mut ls = zasyncwrite!(links); for l in ls.as_mut_slice() { let _ = l.send(&message).await?; } From cbc9512ebcd7f5afc69273e3d0b773a5279ea925 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Tue, 6 Feb 2024 16:33:50 +0100 Subject: [PATCH 07/14] Fix compilation errors --- io/zenoh-transport/src/unicast/universal/link.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 9212e4e590..a179575bce 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -156,14 +156,14 @@ pub(super) async fn tx_task( // - // Sequential - // for l in ls.as_mut_slice() { - // l.inner.link.write_all(bytes).await; + // for l in ls.as_slice() { + // let _ = l.inner.link.write_all(batch.as_slice()).await; // } // Parallel but blocking // use futures::stream::StreamExt; // let _ = futures::stream::iter(0..ls.len()) - // .map(|i| 
ls[i].inner.link.write_all(bytes)) + // .map(|i| ls[i].inner.link.write_all(batch.as_slice())) // .buffer_unordered(ls.len()) // .collect::>() // .await; // Ignore errors From 2b5b486721ee5a89612ed14c8f7add53cd244dc2 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Tue, 6 Feb 2024 16:45:53 +0100 Subject: [PATCH 08/14] Tx discipline configurable by env var --- .../src/unicast/universal/link.rs | 72 +++++++++++++------ 1 file changed, 49 insertions(+), 23 deletions(-) diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index a179575bce..540108d8e5 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -121,11 +121,33 @@ impl TransportLinkUnicastUniversal { /*************************************/ /* TASKS */ /*************************************/ + pub(super) async fn tx_task( mut pipeline: TransmissionPipelineConsumer, links: Arc>>, keep_alive: Duration, ) -> ZResult<()> { + #[derive(Default, Debug)] + enum Disc { + #[default] + Sequential, + Parallel, + Spawn, + } + + let env = std::env::var("ZENOH_TX_DISC"); + log::debug!("Env: {env:?}"); + let disc = match env { + Ok(d) => match d.as_str() { + "sequential" => Disc::Sequential, + "parallel" => Disc::Parallel, + "spawn" => Disc::Spawn, + _ => Disc::default(), + }, + Err(_) => Disc::default(), + }; + log::debug!("Tx disc: {disc:?}"); + loop { let res = pipeline.pull().timeout(keep_alive).await; let mut ls = zasyncread!(links); @@ -153,30 +175,34 @@ pub(super) async fn tx_task( }; // Send the message on the links - // - - - // Sequential - // for l in ls.as_slice() { - // let _ = l.inner.link.write_all(batch.as_slice()).await; - // } - - // Parallel but blocking - // use futures::stream::StreamExt; - // let _ = futures::stream::iter(0..ls.len()) - // .map(|i| ls[i].inner.link.write_all(batch.as_slice())) - // .buffer_unordered(ls.len()) - // .collect::>() - // .await; // Ignore errors - - 
// Parallel spawn - for l in ls.iter() { - let c_b = batch.clone(); - let c_l = l.inner.link.clone(); - task::spawn(async move { - if let Err(e) = c_l.write_all(c_b.as_slice()).await { - log::debug!("Write failed on {:?}: {}", c_l, e); + match disc { + Disc::Sequential => { + // Sequential + for l in ls.as_slice() { + let _ = l.inner.link.write_all(batch.as_slice()).await; + } + } + Disc::Parallel => { + // Parallel but blocking + use futures::stream::StreamExt; + let _ = futures::stream::iter(0..ls.len()) + .map(|i| ls[i].inner.link.write_all(batch.as_slice())) + .buffer_unordered(ls.len()) + .collect::<Vec<_>>() + .await; // Ignore errors + } + Disc::Spawn => { + // Parallel spawn + for l in ls.iter() { + let c_b = batch.clone(); + let c_l = l.inner.link.clone(); + task::spawn(async move { + if let Err(e) = c_l.write_all(c_b.as_slice()).await { + log::debug!("Write failed on {:?}: {}", c_l, e); + } + }); } - }); + } } // Reinsert the batch into the queue From 2bd2957615c2761f90a201ee15a9cd6395c8db25 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Tue, 6 Feb 2024 16:51:09 +0100 Subject: [PATCH 09/14] Tx streamed configurable by env var --- io/zenoh-transport/src/unicast/universal/link.rs | 2 +- .../src/unicast/universal/transport.rs | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 540108d8e5..1c16d1c038 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -136,7 +136,7 @@ pub(super) async fn tx_task( } let env = std::env::var("ZENOH_TX_DISC"); - log::debug!("Env: {env:?}"); + log::debug!("ZENOH_TX_DISC: {env:?}"); let disc = match env { Ok(d) => match d.as_str() { "sequential" => Disc::Sequential, diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index 1a8fae7d6e..502c0ee7e5 100644 --- 
a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -119,7 +119,18 @@ impl TransportUnicastUniversal { // The pipeline let mut pc = TransmissionPipelineConf::default(); - pc.batch.is_streamed = true; + + let env = std::env::var("ZENOH_TX_STREAMED"); + log::debug!("ZENOH_TX_STREAMED: {env:?}"); + pc.batch.is_streamed = match env { + Ok(d) => match d.as_str() { + "true" => true, + "false" => false, + _ => true, + }, + Err(_) => true, + }; + let (producer, consumer) = TransmissionPipeline::make(pc, priority_tx.as_slice()); let t = Arc::new(TransportUnicastUniversal { From 8164129bd79a118f8cbc44eda3748c9ccebbe95d Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Mon, 19 Feb 2024 21:02:45 +0300 Subject: [PATCH 10/14] Change default multilink for easier evaluation testing --- DEFAULT_CONFIG.json5 | 2 +- commons/zenoh-config/src/defaults.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index bde1b8fd03..c8e96ebb7d 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -124,7 +124,7 @@ /// Maximum number of sessions that can be simultaneously alive max_sessions: 1000, /// Maximum number of incoming links that are admitted per session - max_links: 1, + max_links: 100, /// Enables the LowLatency transport /// This option does not make LowLatency transport mandatory, the actual implementation of transport /// used will depend on Establish procedure and other party's settings diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 8d1a5dbc0f..b3b05d2ced 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -105,7 +105,7 @@ impl Default for TransportUnicastConf { accept_timeout: 10_000, accept_pending: 100, max_sessions: 1_000, - max_links: 1, + max_links: 100, lowlatency: false, qos: QoSUnicastConf::default(), compression: CompressionUnicastConf::default(), 
From f0a07c5de8c8bd3e7139142faffcd864907c7067 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 1 Mar 2024 16:53:55 +0100 Subject: [PATCH 11/14] Don't close the transport in case of defragmentation error --- io/zenoh-transport/src/unicast/universal/rx.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index a641e9b11e..340f521969 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -178,6 +178,7 @@ impl TransportUnicastUniversal { let precedes = guard.sn.roll(sn).unwrap_or(false); if !precedes { let mut tmp = SeqNumGenerator::make(sn, zenoh_protocol::core::Bits::U32).unwrap(); + guard.defrag.clear(); tmp.get(); log::trace!( "Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.", @@ -204,7 +205,9 @@ impl TransportUnicastUniversal { match msg.body { TransportBody::Frame(msg) => self.handle_frame(msg)?, - TransportBody::Fragment(fragment) => self.handle_fragment(fragment)?, + TransportBody::Fragment(fragment) => { + let _ = self.handle_fragment(fragment); + } TransportBody::Close(Close { reason, session }) => { self.handle_close(link, reason, session)? 
} From 93a0607677a62dd5b9a8d1a1195321f5b2310bb1 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 1 Mar 2024 16:55:30 +0100 Subject: [PATCH 12/14] Add log in case of defrag error --- io/zenoh-transport/src/unicast/universal/rx.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index 340f521969..6220c68a10 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -206,7 +206,9 @@ impl TransportUnicastUniversal { match msg.body { TransportBody::Frame(msg) => self.handle_frame(msg)?, TransportBody::Fragment(fragment) => { - let _ = self.handle_fragment(fragment); + if let Err(e) = self.handle_fragment(fragment) { + log::debug!("{}", e); + } } TransportBody::Close(Close { reason, session }) => { self.handle_close(link, reason, session)? From fb84b41f4bb2fea0a7bcd52eba1701e4680b6dd1 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 7 Mar 2024 14:51:54 +0300 Subject: [PATCH 13/14] modify link lock capture retry timeout --- io/zenoh-transport/src/unicast/universal/link.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 1c16d1c038..88b133e0f1 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -157,7 +157,7 @@ pub(super) async fn tx_task( while ls.is_empty() { log::trace!("No links available for TX"); drop(ls); - task::sleep(Duration::from_millis(1)).await; + task::sleep(Duration::from_millis(100)).await; ls = zasyncread!(links); } From 9825e9002d1c6648735fd61e2be48da0c2101393 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 7 Mar 2024 19:49:57 +0300 Subject: [PATCH 14/14] Eliminate potential executor blockage --- io/zenoh-transport/src/unicast/universal/transport.rs | 8 +++++--- 1 file changed, 
5 insertions(+), 3 deletions(-) diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index 502c0ee7e5..2eb5cce5fb 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -352,9 +352,11 @@ impl TransportUnicastTrait for TransportUnicastUniversal { // create a callback to start the link let transport = self.clone(); let start_link = Box::new(move || { - task::block_on(async { zasyncwrite!(self.links_tx).push(link.link.tx()) }); - // Start the RX loop - link.start_rx(transport, other_lease); + task::spawn(async move { + zasyncwrite!(transport.links_tx).push(link.link.tx()); + // Start the RX loop + link.start_rx(transport, other_lease); + }); }); Ok((start_link, ack))