diff --git a/Cargo.lock b/Cargo.lock
index 1d5fab2365..9ab35d69e7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4847,6 +4847,7 @@ dependencies = [
  "async-trait",
  "env_logger",
  "flume",
+ "futures",
  "log",
  "lz4_flex",
  "paste",
diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5
index 66352fe141..c5e83df15b 100644
--- a/DEFAULT_CONFIG.json5
+++ b/DEFAULT_CONFIG.json5
@@ -142,7 +142,7 @@
       /// Maximum number of sessions that can be simultaneously alive
       max_sessions: 1000,
       /// Maximum number of incoming links that are admitted per session
-      max_links: 1,
+      max_links: 100,
       /// Enables the LowLatency transport
       /// This option does not make LowLatency transport mandatory, the actual implementation of transport
       /// used will depend on Establish procedure and other party's settings
diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs
index 8d1a5dbc0f..b3b05d2ced 100644
--- a/commons/zenoh-config/src/defaults.rs
+++ b/commons/zenoh-config/src/defaults.rs
@@ -105,7 +105,7 @@ impl Default for TransportUnicastConf {
             accept_timeout: 10_000,
             accept_pending: 100,
             max_sessions: 1_000,
-            max_links: 1,
+            max_links: 100,
             lowlatency: false,
             qos: QoSUnicastConf::default(),
             compression: CompressionUnicastConf::default(),
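A note on the two hunks above: `max_links` caps how many incoming links a single unicast session may aggregate, and the default moves from 1 to 100 in both the sample configuration and `TransportUnicastConf::default()`. A minimal sketch of the bound this governs at link admission; `Session` and `admit_link` are hypothetical stand-ins for illustration, not the zenoh API:

```rust
// Hypothetical types sketching what `max_links` bounds; not the zenoh API.
struct TransportUnicastConf {
    max_links: usize,
}

impl Default for TransportUnicastConf {
    fn default() -> Self {
        // Mirrors the new default above.
        Self { max_links: 100 }
    }
}

struct Session {
    conf: TransportUnicastConf,
    links: Vec<u32>,
}

impl Session {
    // Admit a new incoming link only while the per-session cap is not reached.
    fn admit_link(&mut self, id: u32) -> Result<(), String> {
        if self.links.len() >= self.conf.max_links {
            return Err(format!("max_links={} reached", self.conf.max_links));
        }
        self.links.push(id);
        Ok(())
    }
}

fn main() {
    let mut s = Session {
        conf: TransportUnicastConf::default(),
        links: Vec::new(),
    };
    assert!(s.admit_link(0).is_ok());
}
```

Raising the default matters for the rest of this patch: with `max_links: 1`, the multi-link TX fan-out introduced below would never see more than one link per session.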
diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml
index 0d9d494606..4f7edc3867 100644
--- a/io/zenoh-transport/Cargo.toml
+++ b/io/zenoh-transport/Cargo.toml
@@ -54,6 +54,7 @@ async-global-executor = { workspace = true }
 async-std = { workspace = true }
 async-trait = { workspace = true }
 flume = { workspace = true }
+futures = { workspace = true }
 log = { workspace = true }
 lz4_flex = { workspace = true }
 paste = { workspace = true }
diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs
index bd756d6396..fc654e3937 100644
--- a/io/zenoh-transport/src/unicast/link.rs
+++ b/io/zenoh-transport/src/unicast/link.rs
@@ -162,7 +162,7 @@ impl TransportLinkUnicastTx {
             .as_slice(),
         };
 
-        // log::trace!("WBytes: {:02x?}", bytes);
+        log::trace!("Link: {:?} - WBytes: {}", self, bytes.len());
 
         // Send the message on the link
         self.inner.link.write_all(bytes).await?;
@@ -190,7 +190,7 @@ impl fmt::Display for TransportLinkUnicastTx {
 
 impl fmt::Debug for TransportLinkUnicastTx {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("TransportLinkUnicastRx")
+        f.debug_struct("TransportLinkUnicastTx")
            .field("link", &self.inner.link)
            .field("config", &self.inner.config)
            .field("buffer", &self.buffer.as_ref().map(|b| b.capacity()))
@@ -230,7 +230,7 @@ impl TransportLinkUnicastRx {
             self.link.read(into.as_mut_slice()).await?
         };
 
-        // log::trace!("RBytes: {:02x?}", &into.as_slice()[0..end]);
+        log::trace!("Link: {:?} - RBytes: {}", self, end);
 
         let buffer = ZSlice::make(Arc::new(into), 0, end)
             .map_err(|_| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?;
diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs
index 513cefc0a6..88b133e0f1 100644
--- a/io/zenoh-transport/src/unicast/universal/link.rs
+++ b/io/zenoh-transport/src/unicast/universal/link.rs
@@ -16,25 +16,23 @@
 use super::transport::TransportUnicastUniversal;
 use crate::common::stats::TransportStats;
 use crate::{
     common::{
-        batch::{BatchConfig, RBatch},
-        pipeline::{
-            TransmissionPipeline, TransmissionPipelineConf, TransmissionPipelineConsumer,
-            TransmissionPipelineProducer,
-        },
+        batch::{Finalize, RBatch},
+        pipeline::TransmissionPipelineConsumer,
         priority::TransportPriorityTx,
     },
     unicast::link::{TransportLinkUnicast, TransportLinkUnicastRx, TransportLinkUnicastTx},
-    TransportExecutor,
 };
 use async_std::prelude::FutureExt;
-use async_std::task;
-use async_std::task::JoinHandle;
+use async_std::{
+    sync::RwLock as AsyncRwLock,
+    task::{self, JoinHandle},
+};
 use std::{
     sync::{Arc, RwLock},
     time::Duration,
 };
 use zenoh_buffers::ZSliceBuffer;
-use zenoh_core::zwrite;
+use zenoh_core::{zasyncread, zasyncwrite, zwrite};
 use zenoh_protocol::transport::{KeepAlive, TransportMessage};
 use zenoh_result::{zerror, ZResult};
 use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal};
@@ -50,86 +48,25 @@ pub(super) struct Tasks {
 pub(super) struct TransportLinkUnicastUniversal {
     // The underlying link
     pub(super) link: TransportLinkUnicast,
-    // The transmission pipeline
-    pub(super) pipeline: TransmissionPipelineProducer,
     // The task handling substruct
     tasks: Arc<Tasks>,
 }
 
 impl TransportLinkUnicastUniversal {
-    pub(super) fn new(
-        transport: &TransportUnicastUniversal,
-        link: TransportLinkUnicast,
-        priority_tx: &[TransportPriorityTx],
-    ) -> (Self, TransmissionPipelineConsumer) {
+    pub(super) fn new(link: TransportLinkUnicast, priority_tx: &[TransportPriorityTx]) -> Self {
         assert!(!priority_tx.is_empty());
 
-        let config = TransmissionPipelineConf {
-            batch: BatchConfig {
-                mtu: link.config.batch.mtu,
-                is_streamed: link.link.is_streamed(),
-                #[cfg(feature = "transport_compression")]
-                is_compression: link.config.batch.is_compression,
-            },
-            queue_size: transport.manager.config.queue_size,
-            backoff: transport.manager.config.queue_backoff,
-        };
-
-        // The pipeline
-        let (producer, consumer) = TransmissionPipeline::make(config, priority_tx);
-
         let tasks = Arc::new(Tasks {
             handle_tx: RwLock::new(None),
             signal_rx: Signal::new(),
             handle_rx: RwLock::new(None),
         });
 
-        let result = Self {
-            link,
-            pipeline: producer,
-            tasks,
-        };
-
-        (result, consumer)
+        Self { link, tasks }
     }
 }
 
 impl TransportLinkUnicastUniversal {
-    pub(super) fn start_tx(
-        &mut self,
-        transport: TransportUnicastUniversal,
-        consumer: TransmissionPipelineConsumer,
-        executor: &TransportExecutor,
-        keep_alive: Duration,
-    ) {
-        let mut guard = zwrite!(self.tasks.handle_tx);
-        if guard.is_none() {
-            // Spawn the TX task
-            let mut tx = self.link.tx();
-            let handle = executor.spawn(async move {
-                let res = tx_task(
-                    consumer,
-                    &mut tx,
-                    keep_alive,
-                    #[cfg(feature = "stats")]
-                    transport.stats.clone(),
-                )
-                .await;
-                if let Err(e) = res {
-                    log::debug!("{}", e);
-                    // Spawn a task to avoid a deadlock waiting for this same task
-                    // to finish in the close() joining its handle
-                    task::spawn(async move { transport.del_link(tx.inner.link()).await });
-                }
-            });
-            *guard = Some(handle);
-        }
-    }
-
-    pub(super) fn stop_tx(&mut self) {
-        self.pipeline.disable();
-    }
-
     pub(super) fn start_rx(&mut self, transport: TransportUnicastUniversal, lease: Duration) {
         let mut guard = zwrite!(self.tasks.handle_rx);
         if guard.is_none() {
@@ -165,7 +102,6 @@ impl TransportLinkUnicastUniversal {
     pub(super) async fn close(mut self) -> ZResult<()> {
         log::trace!("{}: closing", self.link);
 
-        self.stop_tx();
         self.stop_rx();
 
         let handle_tx = zwrite!(self.tasks.handle_tx).take();
@@ -185,22 +121,88 @@ impl TransportLinkUnicastUniversal {
 /*************************************/
 /*               TASKS               */
 /*************************************/
-async fn tx_task(
+
+pub(super) async fn tx_task(
     mut pipeline: TransmissionPipelineConsumer,
-    link: &mut TransportLinkUnicastTx,
+    links: Arc<AsyncRwLock<Vec<TransportLinkUnicastTx>>>,
     keep_alive: Duration,
-    #[cfg(feature = "stats")] stats: Arc<TransportStats>,
 ) -> ZResult<()> {
+    #[derive(Default, Debug)]
+    enum Disc {
+        #[default]
+        Sequential,
+        Parallel,
+        Spawn,
+    }
+
+    let env = std::env::var("ZENOH_TX_DISC");
+    log::debug!("ZENOH_TX_DISC: {env:?}");
+    let disc = match env {
+        Ok(d) => match d.as_str() {
+            "sequential" => Disc::Sequential,
+            "parallel" => Disc::Parallel,
+            "spawn" => Disc::Spawn,
+            _ => Disc::default(),
+        },
+        Err(_) => Disc::default(),
+    };
+    log::debug!("Tx disc: {disc:?}");
+
     loop {
-        match pipeline.pull().timeout(keep_alive).await {
+        let res = pipeline.pull().timeout(keep_alive).await;
+        let mut ls = zasyncread!(links);
+        match res {
             Ok(res) => match res {
                 Some((mut batch, priority)) => {
-                    link.send_batch(&mut batch).await?;
+                    while ls.is_empty() {
+                        log::trace!("No links available for TX");
+                        drop(ls);
+                        task::sleep(Duration::from_millis(100)).await;
+                        ls = zasyncread!(links);
+                    }
 
-                    #[cfg(feature = "stats")]
+                    #[cfg(feature = "transport_compression")]
                     {
-                        stats.inc_tx_t_msgs(batch.stats.t_msgs);
-                        stats.inc_tx_bytes(batch.len() as usize);
+                        batch.config.is_compression = false;
                     }
+
+                    let res = batch
+                        .finalize(None)
+                        .map_err(|_| zerror!("Batch finalization error"))?;
+
+                    if let Finalize::Buffer = res {
+                        panic!("Compression is not supported");
+                    };
+
+                    // Send the message on the links
+                    match disc {
+                        Disc::Sequential => {
+                            // Sequential
+                            for l in ls.as_slice() {
+                                let _ = l.inner.link.write_all(batch.as_slice()).await;
+                            }
+                        }
+                        Disc::Parallel => {
+                            // Parallel but blocking
+                            use futures::stream::StreamExt;
+                            let _ = futures::stream::iter(0..ls.len())
+                                .map(|i| ls[i].inner.link.write_all(batch.as_slice()))
+                                .buffer_unordered(ls.len())
+                                .collect::<Vec<_>>()
+                                .await; // Ignore errors
+                        }
+                        Disc::Spawn => {
+                            // Parallel spawn
+                            for l in ls.iter() {
+                                let c_b = batch.clone();
+                                let c_l = l.inner.link.clone();
+                                task::spawn(async move {
+                                    if let Err(e) = c_l.write_all(c_b.as_slice()).await {
+                                        log::debug!("Write failed on {:?}: {}", c_l, e);
+                                    }
+                                });
+                            }
+                        }
+                    }
 
                     // Reinsert the batch into the queue
@@ -210,13 +212,10 @@
             },
             Err(_) => {
                 let message: TransportMessage = KeepAlive.into();
-
-                #[allow(unused_variables)] // Used when stats feature is enabled
-                let n = link.send(&message).await?;
-                #[cfg(feature = "stats")]
-                {
-                    stats.inc_tx_t_msgs(1);
-                    stats.inc_tx_bytes(n);
+                drop(ls);
+                let mut ls = zasyncwrite!(links);
+                for l in ls.as_mut_slice() {
+                    let _ = l.send(&message).await?;
                 }
             }
         }
@@ -225,15 +224,14 @@
     // Drain the transmission pipeline and write remaining bytes on the wire
     let mut batches = pipeline.drain();
     for (mut b, _) in batches.drain(..) {
-        link.send_batch(&mut b)
-            .timeout(keep_alive)
-            .await
-            .map_err(|_| zerror!("{}: flush failed after {} ms", link, keep_alive.as_millis()))??;
-
-        #[cfg(feature = "stats")]
-        {
-            stats.inc_tx_t_msgs(b.stats.t_msgs);
-            stats.inc_tx_bytes(b.len() as usize);
+        let mut ls = zasyncwrite!(links);
+        for l in ls.as_mut_slice() {
+            l.send_batch(&mut b)
+                .timeout(keep_alive)
+                .await
+                .map_err(|_| {
+                    zerror!("{}: flush failed after {} ms", l, keep_alive.as_millis())
+                })??;
         }
     }
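The rewritten `tx_task` above pulls batches from a single per-transport pipeline and writes each batch to every registered link, with the write discipline chosen once at startup from the `ZENOH_TX_DISC` environment variable (`sequential`, `parallel`, or `spawn`; unknown or unset values fall back to `sequential`). A self-contained sketch of the `parallel` arm; `MockLink` is a stand-in for the real `LinkUnicast`, and only the `buffer_unordered` fan-out pattern is taken from the code above:

```rust
use futures::stream::StreamExt;

// Stand-in for a unicast link; only the async `write_all` shape matters here.
struct MockLink(usize);

impl MockLink {
    async fn write_all(&self, buf: &[u8]) -> std::io::Result<()> {
        println!("link {} wrote {} bytes", self.0, buf.len());
        Ok(())
    }
}

fn main() {
    let links = vec![MockLink(0), MockLink(1), MockLink(2)];
    let batch = [0u8; 128];
    futures::executor::block_on(async {
        // One in-flight write per link; results are yielded as writes finish.
        let _ = futures::stream::iter(0..links.len())
            .map(|i| links[i].write_all(&batch))
            .buffer_unordered(links.len())
            .collect::<Vec<_>>()
            .await;
    });
}
```

`buffer_unordered(n)` drives up to `n` futures concurrently, which is why the arm is commented "parallel but blocking": the task still waits for the slowest link before pulling the next batch, whereas the `spawn` arm detaches one task per link and moves on immediately.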
diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs
index 935a1814b0..6220c68a10 100644
--- a/io/zenoh-transport/src/unicast/universal/rx.rs
+++ b/io/zenoh-transport/src/unicast/universal/rx.rs
@@ -16,6 +16,7 @@ use crate::{
     common::{
         batch::{Decode, RBatch},
         priority::TransportChannelRx,
+        seq_num::SeqNumGenerator,
     },
     unicast::transport_unicast_inner::TransportUnicastTrait,
     TransportPeerEventHandler,
@@ -96,7 +97,10 @@ impl TransportUnicastUniversal {
             Reliability::BestEffort => zlock!(c.best_effort),
         };
 
-        self.verify_sn(sn, &mut guard)?;
+        let cont = self.verify_sn(sn, &mut guard);
+        if !cont {
+            return Ok(());
+        }
 
         let callback = zread!(self.callback).clone();
         if let Some(callback) = callback.as_ref() {
@@ -139,7 +143,10 @@ impl TransportUnicastUniversal {
             Reliability::BestEffort => zlock!(c.best_effort),
         };
 
-        self.verify_sn(sn, &mut guard)?;
+        let cont = self.verify_sn(sn, &mut guard);
+        if !cont {
+            return Ok(());
+        }
 
         if guard.defrag.is_empty() {
             let _ = guard.defrag.sync(sn);
@@ -167,28 +174,20 @@ impl TransportUnicastUniversal {
         Ok(())
     }
 
-    fn verify_sn(
-        &self,
-        sn: TransportSn,
-        guard: &mut MutexGuard<'_, TransportChannelRx>,
-    ) -> ZResult<()> {
-        let precedes = guard.sn.roll(sn)?;
+    fn verify_sn(&self, sn: TransportSn, guard: &mut MutexGuard<'_, TransportChannelRx>) -> bool {
+        let precedes = guard.sn.roll(sn).unwrap_or(false);
         if !precedes {
-            log::debug!(
+            let mut tmp = SeqNumGenerator::make(sn, zenoh_protocol::core::Bits::U32).unwrap();
+            guard.defrag.clear();
+            tmp.get();
+            log::trace!(
                 "Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.",
                 self.config.zid,
                 sn,
-                guard.sn.get()
+                tmp.get()
             );
-            // Drop the fragments if needed
-            if !guard.defrag.is_empty() {
-                guard.defrag.clear();
-            }
-            // Keep reading
-            return Ok(());
         }
-
-        Ok(())
+        precedes
     }
 
     pub(super) fn read_messages(&self, mut batch: RBatch, link: &Link) -> ZResult<()> {
@@ -206,7 +205,11 @@
         match msg.body {
             TransportBody::Frame(msg) => self.handle_frame(msg)?,
-            TransportBody::Fragment(fragment) => self.handle_fragment(fragment)?,
+            TransportBody::Fragment(fragment) => {
+                if let Err(e) = self.handle_fragment(fragment) {
+                    log::debug!("{}", e);
+                }
+            }
             TransportBody::Close(Close { reason, session }) => {
                 self.handle_close(link, reason, session)?
             }
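Because the same batch is now written on every link, a receiver can legitimately observe duplicate or reordered sequence numbers, so `verify_sn` above becomes non-fatal: it returns a `bool`, and an invalid SN drops the frame and clears the defragmentation buffer instead of failing the whole transport. A sketch of the "precedes" test that `guard.sn.roll(sn)` performs, using serial-number arithmetic over a power-of-two resolution; the constants are hypothetical, not zenoh's `SeqNum` implementation:

```rust
// Hypothetical resolution; zenoh negotiates the real one per transport.
const SN_RESOLUTION: u32 = 1 << 28;
const HALF: u32 = SN_RESOLUTION >> 1;

/// True if `next` strictly follows `last` modulo SN_RESOLUTION.
fn precedes(last: u32, next: u32) -> bool {
    let gap = next.wrapping_sub(last) % SN_RESOLUTION;
    gap != 0 && gap < HALF
}

fn main() {
    assert!(precedes(10, 11));
    assert!(!precedes(11, 11)); // duplicate, e.g. the same batch from a second link
    assert!(!precedes(11, 10)); // stale or reordered
    assert!(precedes(SN_RESOLUTION - 1, 0)); // wrap-around
}
```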
diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs
index 942b723365..2eb5cce5fb 100644
--- a/io/zenoh-transport/src/unicast/universal/transport.rs
+++ b/io/zenoh-transport/src/unicast/universal/transport.rs
@@ -14,21 +14,27 @@
 #[cfg(feature = "stats")]
 use crate::stats::TransportStats;
 use crate::{
-    common::priority::{TransportPriorityRx, TransportPriorityTx},
+    common::{
+        pipeline::{TransmissionPipeline, TransmissionPipelineConf, TransmissionPipelineProducer},
+        priority::{TransportPriorityRx, TransportPriorityTx},
+    },
     unicast::{
-        link::{LinkUnicastWithOpenAck, TransportLinkUnicastDirection},
+        link::{LinkUnicastWithOpenAck, TransportLinkUnicastDirection, TransportLinkUnicastTx},
         transport_unicast_inner::{AddLinkResult, TransportUnicastTrait},
         universal::link::TransportLinkUnicastUniversal,
         TransportConfigUnicast,
     },
     TransportManager, TransportPeerEventHandler,
 };
-use async_std::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
+use async_std::{
+    sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock},
+    task,
+};
 use async_trait::async_trait;
 use std::fmt::DebugStruct;
 use std::sync::{Arc, RwLock};
 use std::time::Duration;
-use zenoh_core::{zasynclock, zcondfeat, zread, zwrite};
+use zenoh_core::{zasynclock, zasyncwrite, zcondfeat, zread, zwrite};
 use zenoh_link::Link;
 use zenoh_protocol::{
     core::{Priority, WhatAmI, ZenohId},
@@ -37,13 +43,6 @@ use zenoh_protocol::{
 };
 use zenoh_result::{bail, zerror, ZResult};
 
-macro_rules! zlinkget {
-    ($guard:expr, $link:expr) => {
-        // Compare LinkUnicast link to not compare TransportLinkUnicast direction
-        $guard.iter().find(|tl| tl.link == $link)
-    };
-}
-
 macro_rules! zlinkgetmut {
     ($guard:expr, $link:expr) => {
         // Compare LinkUnicast link to not compare TransportLinkUnicast direction
@@ -71,8 +70,11 @@ pub(crate) struct TransportUnicastUniversal {
     pub(super) priority_tx: Arc<[TransportPriorityTx]>,
     // Rx priorities
     pub(super) priority_rx: Arc<[TransportPriorityRx]>,
+    // Pipeline producer
+    pub(super) pipeline: Arc<TransmissionPipelineProducer>,
     // The links associated to the channel
     pub(super) links: Arc<RwLock<Box<[TransportLinkUnicastUniversal]>>>,
+    pub(super) links_tx: Arc<AsyncRwLock<Vec<TransportLinkUnicastTx>>>,
     // The callback
     pub(super) callback: Arc<RwLock<Option<Arc<dyn TransportPeerEventHandler>>>>,
     // Lock used to ensure no race in add_link method
@@ -115,12 +117,30 @@ impl TransportUnicastUniversal {
         #[cfg(feature = "stats")]
         let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone())));
 
+        // The pipeline
+        let mut pc = TransmissionPipelineConf::default();
+
+        let env = std::env::var("ZENOH_TX_STREAMED");
+        log::debug!("ZENOH_TX_STREAMED: {env:?}");
+        pc.batch.is_streamed = match env {
+            Ok(d) => match d.as_str() {
+                "true" => true,
+                "false" => false,
+                _ => true,
+            },
+            Err(_) => true,
+        };
+
+        let (producer, consumer) = TransmissionPipeline::make(pc, priority_tx.as_slice());
+
         let t = Arc::new(TransportUnicastUniversal {
             manager,
             config,
             priority_tx: priority_tx.into_boxed_slice().into(),
             priority_rx: priority_rx.into_boxed_slice().into(),
+            pipeline: Arc::new(producer),
             links: Arc::new(RwLock::new(vec![].into_boxed_slice())),
+            links_tx: Arc::new(AsyncRwLock::new(vec![])),
             add_link_lock: Arc::new(AsyncMutex::new(())),
             callback: Arc::new(RwLock::new(None)),
             alive: Arc::new(AsyncMutex::new(false)),
@@ -128,6 +148,16 @@
             stats,
         });
 
+        let c_t = t.links_tx.clone();
+        task::spawn(async move {
+            let _ = crate::unicast::universal::link::tx_task(
+                consumer,
+                c_t,
+                Duration::from_millis(2_500),
+            )
+            .await;
+        });
+
         Ok(t)
     }
@@ -222,7 +252,6 @@ impl TransportUnicastUniversal {
         match zlinkgetmut!(guard, *link) {
             Some(l) => {
                 l.stop_rx();
-                l.stop_tx();
                 Ok(())
             }
             None => {
@@ -308,8 +337,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
         // Wrap the link
         let (link, ack) = link.unpack();
 
-        let (mut link, consumer) =
-            TransportLinkUnicastUniversal::new(self, link, &self.priority_tx);
+        let mut link = TransportLinkUnicastUniversal::new(link, &self.priority_tx);
 
         // Add the link to the channel
         let mut guard = zwrite!(self.links);
@@ -324,18 +352,11 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
         // create a callback to start the link
         let transport = self.clone();
         let start_link = Box::new(move || {
-            // Start the TX loop
-            let keep_alive =
-                self.manager.config.unicast.lease / self.manager.config.unicast.keep_alive as u32;
-            link.start_tx(
-                transport.clone(),
-                consumer,
-                &self.manager.tx_executor,
-                keep_alive,
-            );
-
-            // Start the RX loop
-            link.start_rx(transport, other_lease);
+            task::spawn(async move {
+                zasyncwrite!(transport.links_tx).push(link.link.tx());
+                // Start the RX loop
+                link.start_rx(transport, other_lease);
+            });
         });
 
         Ok((start_link, ack))
@@ -388,11 +409,6 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
     async fn close_link(&self, link: Link, reason: u8) -> ZResult<()> {
         log::trace!("Closing link {} with peer: {}", link, self.config.zid);
 
-        let transport_link_pipeline = zlinkget!(zread!(self.links), link)
-            .ok_or_else(|| zerror!("Cannot close Link {:?}: not found", link))?
-            .pipeline
-            .clone();
-
         // Close message to be sent on the target link
         let msg: TransportMessage = Close {
             reason,
@@ -400,7 +416,8 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
         }
         .into();
 
-        transport_link_pipeline.push_transport_message(msg, Priority::Background);
+        self.pipeline
+            .push_transport_message(msg, Priority::Background);
 
         // Remove the link from the channel
         self.del_link(link).await
@@ -409,23 +426,14 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
     async fn close(&self, reason: u8) -> ZResult<()> {
         log::trace!("Closing transport with peer: {}", self.config.zid);
 
-        let mut pipelines = zread!(self.links)
-            .iter()
-            .map(|sl| sl.pipeline.clone())
-            .collect::<Vec<_>>();
-        for p in pipelines.drain(..) {
-            // Close message to be sent on all the links
-            // session should always be true for user-triggered close. However, in case of
-            // multiple links, it is safer to close all the links first. When no links are left,
-            // the transport is then considered closed.
-            let msg: TransportMessage = Close {
-                reason,
-                session: false,
-            }
-            .into();
-
-            p.push_transport_message(msg, Priority::Background);
+        let msg: TransportMessage = Close {
+            reason,
+            session: false,
         }
+        .into();
+        self.pipeline
+            .push_transport_message(msg, Priority::Background);
+
         // Terminate and clean up the transport
         self.delete().await
     }
@@ -438,7 +446,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
     /*************************************/
     /*                TX                 */
     /*************************************/
     fn schedule(&self, msg: NetworkMessage) -> ZResult<()> {
-        match self.internal_schedule(msg) {
+        match self.pipeline.push_network_message(msg) {
             true => Ok(()),
             false => bail!("error scheduling message!"),
         }
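The net effect of the transport.rs changes above: the transmission pipeline moves from one-per-link to one-per-transport. It is created in `make()` (where `ZENOH_TX_STREAMED` can force the streamed-batch framing on or off), a single `tx_task` is spawned there to drain it for the transport's lifetime, `add_link` merely registers a new `TransportLinkUnicastTx` in `links_tx`, and `schedule` pushes straight into the shared pipeline. A runnable sketch of that shape, using flume and async-std (both already dependencies) as stand-ins for the real `TransmissionPipeline`:

```rust
use async_std::{sync::RwLock, task};
use std::sync::Arc;

fn main() {
    task::block_on(async {
        // One pipeline per transport: a single consumer task drains it and
        // fans each batch out to whatever links are currently registered.
        let (producer, consumer) = flume::unbounded::<Vec<u8>>();
        let links: Arc<RwLock<Vec<String>>> = Arc::new(RwLock::new(Vec::new()));

        let c_links = links.clone();
        let tx_task = task::spawn(async move {
            while let Ok(batch) = consumer.recv_async().await {
                for l in c_links.read().await.iter() {
                    println!("{l}: sending {} bytes", batch.len());
                }
            }
        });

        // Links come and go independently of the message flow.
        links.write().await.push("link-0".into());
        links.write().await.push("link-1".into());

        // `schedule` reduces to a push into the shared pipeline.
        producer.send(vec![0u8; 64]).unwrap();

        drop(producer); // closing the producer ends the consumer task
        tx_task.await;
    });
}
```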
diff --git a/io/zenoh-transport/src/unicast/universal/tx.rs b/io/zenoh-transport/src/unicast/universal/tx.rs
index eb41e2611c..c7397f6e01 100644
--- a/io/zenoh-transport/src/unicast/universal/tx.rs
+++ b/io/zenoh-transport/src/unicast/universal/tx.rs
@@ -12,79 +12,5 @@
 // ZettaScale Zenoh Team,
 //
 use super::transport::TransportUnicastUniversal;
-use zenoh_core::zread;
-use zenoh_protocol::network::NetworkMessage;
 
-impl TransportUnicastUniversal {
-    fn schedule_on_link(&self, msg: NetworkMessage) -> bool {
-        macro_rules! zpush {
-            ($guard:expr, $pipeline:expr, $msg:expr) => {
-                // Drop the guard before the push_zenoh_message since
-                // the link could be congested and this operation could
-                // block for fairly long time
-                let pl = $pipeline.clone();
-                drop($guard);
-                log::trace!("Scheduled: {:?}", $msg);
-                return pl.push_network_message($msg);
-            };
-        }
-
-        let guard = zread!(self.links);
-        // First try to find the best match between msg and link reliability
-        if let Some(pl) = guard
-            .iter()
-            .filter_map(|tl| {
-                if msg.is_reliable() == tl.link.link.is_reliable() {
-                    Some(&tl.pipeline)
-                } else {
-                    None
-                }
-            })
-            .next()
-        {
-            zpush!(guard, pl, msg);
-        }
-
-        // No best match found, take the first available link
-        if let Some(pl) = guard.iter().map(|tl| &tl.pipeline).next() {
-            zpush!(guard, pl, msg);
-        }
-
-        // No Link found
-        log::trace!(
-            "Message dropped because the transport has no links: {}",
-            msg
-        );
-
-        false
-    }
-
-    #[allow(unused_mut)] // When feature "shared-memory" is not enabled
-    #[allow(clippy::let_and_return)] // When feature "stats" is not enabled
-    #[inline(always)]
-    pub(crate) fn internal_schedule(&self, mut msg: NetworkMessage) -> bool {
-        #[cfg(feature = "shared-memory")]
-        {
-            let res = if self.config.is_shm {
-                crate::shm::map_zmsg_to_shminfo(&mut msg)
-            } else {
-                crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shm().reader)
-            };
-            if let Err(e) = res {
-                log::trace!("Failed SHM conversion: {}", e);
-                return false;
-            }
-        }
-
-        let res = self.schedule_on_link(msg);
-
-        #[cfg(feature = "stats")]
-        if res {
-            self.stats.inc_tx_n_msgs(1);
-        } else {
-            self.stats.inc_tx_n_dropped(1);
-        }
-
-        res
-    }
-}
+impl TransportUnicastUniversal {}