diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 1f9094efa6..4fdaa34e88 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -261,6 +261,13 @@ data_low: 4, background: 4, }, + /// Congestion occurs when the queue is empty (no available batch). + /// Using CongestionControl::Block the caller is blocked until a batch is available and re-insterted into the queue. + /// Using CongestionControl::Drop the message might be dropped, depending on conditions configured here. + congestion_control: { + /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available. + wait_before_drop: 1000 + }, /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress. /// Higher values lead to a more aggressive batching but it will introduce additional latency. backoff: 100, diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 7c9184702f..f1973d2c0d 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -170,6 +170,7 @@ impl Default for QueueConf { fn default() -> Self { Self { size: QueueSizeConf::default(), + congestion_control: CongestionControlConf::default(), backoff: 100, } } @@ -195,6 +196,14 @@ impl Default for QueueSizeConf { } } +impl Default for CongestionControlConf { + fn default() -> Self { + Self { + wait_before_drop: 1000, + } + } +} + impl Default for LinkRxConf { fn default() -> Self { Self { diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 3a0a9c3c20..2b5485fa6b 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -352,6 +352,13 @@ validated_struct::validator! { data_low: usize, background: usize, } where (queue_size_validator), + /// Congestion occurs when the queue is empty (no available batch). + /// Using CongestionControl::Block the caller is blocked until a batch is available and re-insterted into the queue. + /// Using CongestionControl::Drop the message might be dropped, depending on conditions configured here. + pub congestion_control: CongestionControlConf { + /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available. + pub wait_before_drop: u64, + }, /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress. /// Higher values lead to a more aggressive batching but it will introduce additional latency. backoff: u64, diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 6ef385276f..9df7632f7a 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -19,10 +19,12 @@ use super::{ }; use flume::{bounded, Receiver, Sender}; use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter}; -use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; -use std::thread; use std::time::Duration; +use std::{ + sync::atomic::{AtomicBool, AtomicU16, Ordering}, + time::Instant, +}; use zenoh_buffers::{ reader::{HasReader, Reader}, writer::HasWriter, @@ -63,6 +65,10 @@ impl StageInRefill { fn wait(&self) -> bool { self.n_ref_r.recv().is_ok() } + + fn wait_deadline(&self, instant: Instant) -> bool { + self.n_ref_r.recv_deadline(instant).is_ok() + } } // Inner structure to link the initial stage with the final stage of the pipeline @@ -121,13 +127,15 @@ struct StageIn { } impl StageIn { - fn push_network_message(&mut self, msg: &mut NetworkMessage, priority: Priority) -> bool { + fn push_network_message( + &mut self, + msg: &mut NetworkMessage, + priority: Priority, + deadline_before_drop: Option, + ) -> bool { // Lock the current serialization batch. let mut c_guard = self.mutex.current(); - // Check congestion control - let is_droppable = msg.is_droppable(); - macro_rules! zgetbatch_rets { ($fragment:expr, $restore_sn:expr) => { loop { @@ -140,19 +148,25 @@ impl StageIn { } None => { drop(c_guard); - if !$fragment && is_droppable { - // Restore the sequence number - $restore_sn; - // We are in the congestion scenario - // The yield is to avoid the writing task to spin - // indefinitely and monopolize the CPU usage. - thread::yield_now(); - return false; - } else { - if !self.s_ref.wait() { - // Restore the sequence number - $restore_sn; - return false; + match deadline_before_drop { + Some(deadline) if !$fragment => { + // We are in the congestion scenario and message is droppable + // Wait for an available batch until deadline + if !self.s_ref.wait_deadline(deadline) { + // Still no available batch. + // Restore the sequence number and drop the message + $restore_sn; + return false + } + } + _ => { + // Block waiting for an available batch + if !self.s_ref.wait() { + // Some error prevented the queue to wait and give back an available batch + // Restore the sequence number and drop the message + $restore_sn; + return false; + } } } c_guard = self.mutex.current(); @@ -487,24 +501,10 @@ impl StageOut { pub(crate) struct TransmissionPipelineConf { pub(crate) batch: BatchConfig, pub(crate) queue_size: [usize; Priority::NUM], + pub(crate) wait_before_drop: Duration, pub(crate) backoff: Duration, } -impl Default for TransmissionPipelineConf { - fn default() -> Self { - Self { - batch: BatchConfig { - mtu: BatchSize::MAX, - is_streamed: false, - #[cfg(feature = "transport_compression")] - is_compression: false, - }, - queue_size: [1; Priority::NUM], - backoff: Duration::from_micros(1), - } - } -} - // A 2-stage transmission pipeline pub(crate) struct TransmissionPipeline; impl TransmissionPipeline { @@ -579,6 +579,7 @@ impl TransmissionPipeline { let producer = TransmissionPipelineProducer { stage_in: stage_in.into_boxed_slice().into(), active: active.clone(), + wait_before_drop: config.wait_before_drop, }; let consumer = TransmissionPipelineConsumer { stage_out: stage_out.into_boxed_slice(), @@ -595,6 +596,7 @@ pub(crate) struct TransmissionPipelineProducer { // Each priority queue has its own Mutex stage_in: Arc<[Mutex]>, active: Arc, + wait_before_drop: Duration, } impl TransmissionPipelineProducer { @@ -607,9 +609,15 @@ impl TransmissionPipelineProducer { } else { (0, Priority::default()) }; + // If message is droppable, compute a deadline after which the sample could be dropped + let deadline_before_drop = if msg.is_droppable() { + Some(Instant::now() + self.wait_before_drop) + } else { + None + }; // Lock the channel. We are the only one that will be writing on it. let mut queue = zlock!(self.stage_in[idx]); - queue.push_network_message(&mut msg, priority) + queue.push_network_message(&mut msg, priority, deadline_before_drop) } #[inline] @@ -732,7 +740,7 @@ mod tests { const SLEEP: Duration = Duration::from_millis(100); const TIMEOUT: Duration = Duration::from_secs(60); - const CONFIG: TransmissionPipelineConf = TransmissionPipelineConf { + const CONFIG_STREAMED: TransmissionPipelineConf = TransmissionPipelineConf { batch: BatchConfig { mtu: BatchSize::MAX, is_streamed: true, @@ -740,6 +748,19 @@ mod tests { is_compression: true, }, queue_size: [1; Priority::NUM], + wait_before_drop: Duration::from_millis(1), + backoff: Duration::from_micros(1), + }; + + const CONFIG_NOT_STREAMED: TransmissionPipelineConf = TransmissionPipelineConf { + batch: BatchConfig { + mtu: BatchSize::MAX, + is_streamed: false, + #[cfg(feature = "transport_compression")] + is_compression: false, + }, + queue_size: [1; Priority::NUM], + wait_before_drop: Duration::from_millis(1), backoff: Duration::from_micros(1), }; @@ -847,10 +868,8 @@ mod tests { // Compute the number of messages to send let num_msg = max_msgs.min(bytes / ps); - let (producer, consumer) = TransmissionPipeline::make( - TransmissionPipelineConf::default(), - priorities.as_slice(), - ); + let (producer, consumer) = + TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice()); let t_c = task::spawn(async move { consume(consumer, num_msg).await; @@ -874,7 +893,7 @@ mod tests { // Make sure to put only one message per batch: set the payload size // to half of the batch in such a way the serialized zenoh message // will be larger then half of the batch size (header + payload). - let payload_size = (CONFIG.batch.mtu / 2) as usize; + let payload_size = (CONFIG_STREAMED.batch.mtu / 2) as usize; // Send reliable messages let key = "test".into(); @@ -900,7 +919,7 @@ mod tests { // The last push should block since there shouldn't any more batches // available for serialization. - let num_msg = 1 + CONFIG.queue_size[0]; + let num_msg = 1 + CONFIG_STREAMED.queue_size[0]; for i in 0..num_msg { println!( "Pipeline Blocking [>>>]: ({id}) Scheduling message #{i} with payload size of {payload_size} bytes" @@ -919,7 +938,7 @@ mod tests { let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?; let priorities = vec![tct]; let (producer, mut consumer) = - TransmissionPipeline::make(TransmissionPipelineConf::default(), priorities.as_slice()); + TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice()); let counter = Arc::new(AtomicUsize::new(0)); @@ -937,10 +956,12 @@ mod tests { // Wait to have sent enough messages and to have blocked println!( "Pipeline Blocking [---]: waiting to have {} messages being scheduled", - CONFIG.queue_size[Priority::MAX as usize] + CONFIG_STREAMED.queue_size[Priority::MAX as usize] ); let check = async { - while counter.load(Ordering::Acquire) < CONFIG.queue_size[Priority::MAX as usize] { + while counter.load(Ordering::Acquire) + < CONFIG_STREAMED.queue_size[Priority::MAX as usize] + { tokio::time::sleep(SLEEP).await; } }; @@ -972,7 +993,8 @@ mod tests { // Queue let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX)).unwrap(); let priorities = vec![tct]; - let (producer, mut consumer) = TransmissionPipeline::make(CONFIG, priorities.as_slice()); + let (producer, mut consumer) = + TransmissionPipeline::make(CONFIG_STREAMED, priorities.as_slice()); let count = Arc::new(AtomicUsize::new(0)); let size = Arc::new(AtomicUsize::new(0)); diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 9bf40e5fd3..f16a68cfba 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -94,6 +94,7 @@ pub struct TransportManagerConfig { pub whatami: WhatAmI, pub resolution: Resolution, pub batch_size: u16, + pub wait_before_drop: Duration, pub queue_size: [usize; Priority::NUM], pub queue_backoff: Duration, pub defrag_buff_size: usize, @@ -122,6 +123,7 @@ pub struct TransportManagerBuilder { whatami: WhatAmI, resolution: Resolution, batch_size: u16, + wait_before_drop: Duration, queue_size: QueueSizeConf, queue_backoff: Duration, defrag_buff_size: usize, @@ -154,6 +156,11 @@ impl TransportManagerBuilder { self } + pub fn wait_before_drop(mut self, wait_before_drop: Duration) -> Self { + self.wait_before_drop = wait_before_drop; + self + } + pub fn queue_size(mut self, queue_size: QueueSizeConf) -> Self { self.queue_size = queue_size; self @@ -212,7 +219,11 @@ impl TransportManagerBuilder { self = self.batch_size(*link.tx().batch_size()); self = self.defrag_buff_size(*link.rx().max_message_size()); self = self.link_rx_buffer_size(*link.rx().buffer_size()); + self = self.wait_before_drop(Duration::from_micros( + *link.tx().queue().congestion_control().wait_before_drop(), + )); self = self.queue_size(link.tx().queue().size().clone()); + self = self.queue_backoff(Duration::from_nanos(*link.tx().queue().backoff())); self = self.tx_threads(*link.tx().threads()); self = self.protocols(link.protocols().clone()); @@ -259,6 +270,7 @@ impl TransportManagerBuilder { whatami: self.whatami, resolution: self.resolution, batch_size: self.batch_size, + wait_before_drop: self.wait_before_drop, queue_size, queue_backoff: self.queue_backoff, defrag_buff_size: self.defrag_buff_size, @@ -292,12 +304,14 @@ impl Default for TransportManagerBuilder { let link_rx = LinkRxConf::default(); let queue = QueueConf::default(); let backoff = *queue.backoff(); + let wait_before_drop = *queue.congestion_control().wait_before_drop(); Self { version: VERSION, zid: ZenohId::rand(), whatami: zenoh_config::defaults::mode, resolution: Resolution::default(), batch_size: BatchSize::MAX, + wait_before_drop: Duration::from_micros(wait_before_drop), queue_size: queue.size, queue_backoff: Duration::from_nanos(backoff), defrag_buff_size: *link_rx.max_message_size(), diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index 3565f747a0..bfbdd3af61 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -320,6 +320,7 @@ impl TransportLinkMulticastUniversal { let tpc = TransmissionPipelineConf { batch: self.link.config.batch, queue_size: self.transport.manager.config.queue_size, + wait_before_drop: self.transport.manager.config.wait_before_drop, backoff: self.transport.manager.config.queue_backoff, }; // The pipeline diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 5c07b69738..93a6c717dd 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -59,6 +59,7 @@ impl TransportLinkUnicastUniversal { is_compression: link.config.batch.is_compression, }, queue_size: transport.manager.config.queue_size, + wait_before_drop: transport.manager.config.wait_before_drop, backoff: transport.manager.config.queue_backoff, };