From 413ab8d3f49ab7ebd8c06109ba76ba440be636b8 Mon Sep 17 00:00:00 2001
From: OlivierHecart
Date: Fri, 27 Sep 2024 17:31:07 +0200
Subject: [PATCH] Add wait_before_close option

---
 DEFAULT_CONFIG.json5                      |  5 +-
 commons/zenoh-config/src/defaults.rs      |  1 +
 commons/zenoh-config/src/lib.rs           |  7 ++-
 io/zenoh-transport/src/common/pipeline.rs | 53 ++++++++-----------
 io/zenoh-transport/src/manager.rs         | 13 +++++
 io/zenoh-transport/src/multicast/link.rs  |  1 +
 .../src/unicast/universal/link.rs         |  1 +
 .../src/unicast/universal/tx.rs           | 23 +++++++-
 8 files changed, 70 insertions(+), 34 deletions(-)

diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5
index cd05f4ae01..41759bfb86 100644
--- a/DEFAULT_CONFIG.json5
+++ b/DEFAULT_CONFIG.json5
@@ -386,8 +386,11 @@
         /// Using CongestionControl::Block the caller is blocked until a batch is available and re-inserted into the queue.
         /// Using CongestionControl::Drop the message might be dropped, depending on conditions configured here.
         congestion_control: {
-          /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available.
+          /// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available.
           wait_before_drop: 1000,
+          /// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message
+          /// if still no batch is available.
+          wait_before_close: 5000000,
         },
         /// Perform batching of messages if they are smaller of the batch_size
         batching: {
diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs
index b1380873b8..ee3bbdc761 100644
--- a/commons/zenoh-config/src/defaults.rs
+++ b/commons/zenoh-config/src/defaults.rs
@@ -244,6 +244,7 @@ impl Default for CongestionControlConf {
     fn default() -> Self {
         Self {
             wait_before_drop: 1000,
+            wait_before_close: 5000000,
         }
     }
 }
diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs
index 9bc30dfcde..a65aef7008 100644
--- a/commons/zenoh-config/src/lib.rs
+++ b/commons/zenoh-config/src/lib.rs
@@ -422,8 +422,11 @@ validated_struct::validator! {
                 /// Using CongestionControl::Block the caller is blocked until a batch is available and re-inserted into the queue.
                 /// Using CongestionControl::Drop the message might be dropped, depending on conditions configured here.
                 pub congestion_control: CongestionControlConf {
-                    /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available.
-                    pub wait_before_drop: u64,
+                    /// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available.
+                    pub wait_before_drop: u64,
+                    /// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message
+                    /// if still no batch is available.
+                    pub wait_before_close: u64,
                 },
                 pub batching: BatchingConf {
                     /// Perform adaptive batching of messages if they are smaller of the batch_size.
diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs
index 16ed92bca3..d187c3981b 100644
--- a/io/zenoh-transport/src/common/pipeline.rs
+++ b/io/zenoh-transport/src/common/pipeline.rs
@@ -141,7 +141,7 @@ impl StageIn {
         &mut self,
         msg: &mut NetworkMessage,
         priority: Priority,
-        deadline_before_drop: Option<Option<Instant>>,
+        deadline: Option<Instant>,
     ) -> bool {
         // Lock the current serialization batch.
         let mut c_guard = self.mutex.current();
@@ -154,31 +154,22 @@ impl StageIn {
                         None => match self.s_ref.pull() {
                             Some(mut batch) => {
                                 batch.clear();
-                                self.s_out.atomic_backoff.first_write.store(LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds, Ordering::Relaxed);
+                                self.s_out.atomic_backoff.first_write.store(
+                                    LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds,
+                                    Ordering::Relaxed,
+                                );
                                 break batch;
                             }
                             None => {
                                 drop(c_guard);
-                                match deadline_before_drop {
-                                    Some(deadline) if !$fragment => {
-                                        // We are in the congestion scenario and message is droppable
-                                        // Wait for an available batch until deadline
-                                        if !deadline.map_or(false, |deadline| self.s_ref.wait_deadline(deadline)) {
-                                            // Still no available batch.
-                                            // Restore the sequence number and drop the message
-                                            $restore_sn;
-                                            return false
-                                        }
-                                    }
-                                    _ => {
-                                        // Block waiting for an available batch
-                                        if !self.s_ref.wait() {
-                                            // Some error prevented the queue to wait and give back an available batch
-                                            // Restore the sequence number and drop the message
-                                            $restore_sn;
-                                            return false;
-                                        }
-                                    }
-                                }
+                                // Wait for an available batch until deadline
+                                if !deadline
+                                    .map_or(false, |deadline| self.s_ref.wait_deadline(deadline))
+                                {
+                                    // Still no available batch.
+                                    // Restore the sequence number and drop the message
+                                    $restore_sn;
+                                    return false;
+                                }
                                 c_guard = self.mutex.current();
                             }
@@ -513,6 +504,7 @@ pub(crate) struct TransmissionPipelineConf {
     pub(crate) batch: BatchConfig,
     pub(crate) queue_size: [usize; Priority::NUM],
     pub(crate) wait_before_drop: Duration,
+    pub(crate) wait_before_close: Duration,
     pub(crate) batching_enabled: bool,
     pub(crate) batching_time_limit: Duration,
 }
@@ -597,6 +589,7 @@ impl TransmissionPipeline {
             stage_in: stage_in.into_boxed_slice().into(),
             active: active.clone(),
             wait_before_drop: config.wait_before_drop,
+            wait_before_close: config.wait_before_close,
         };
         let consumer = TransmissionPipelineConsumer {
             stage_out: stage_out.into_boxed_slice(),
@@ -614,6 +607,7 @@ pub(crate) struct TransmissionPipelineProducer {
    stage_in: Arc<[Mutex<StageIn>]>,
    active: Arc<AtomicBool>,
    wait_before_drop: Duration,
+    wait_before_close: Duration,
 }
 
 impl TransmissionPipelineProducer {
@@ -627,18 +621,15 @@ impl TransmissionPipelineProducer {
             (0, Priority::DEFAULT)
         };
         // If message is droppable, compute a deadline after which the sample could be dropped
-        let deadline_before_drop = if msg.is_droppable() {
-            if self.wait_before_drop.is_zero() {
-                Some(None)
-            } else {
-                Some(Some(Instant::now() + self.wait_before_drop))
-            }
+        let wait_time = if msg.is_droppable() {
+            self.wait_before_drop
         } else {
-            None
+            self.wait_before_close
         };
+        let deadline = (!wait_time.is_zero()).then_some(Instant::now() + wait_time);
         // Lock the channel. We are the only one that will be writing on it.
         let mut queue = zlock!(self.stage_in[idx]);
-        queue.push_network_message(&mut msg, priority, deadline_before_drop)
+        queue.push_network_message(&mut msg, priority, deadline)
     }
 
     #[inline]
@@ -793,6 +784,7 @@ mod tests {
         queue_size: [1; Priority::NUM],
         batching_enabled: true,
         wait_before_drop: Duration::from_millis(1),
+        wait_before_close: Duration::from_secs(5),
         batching_time_limit: Duration::from_micros(1),
     };
 
@@ -806,6 +798,7 @@ mod tests {
         queue_size: [1; Priority::NUM],
         batching_enabled: true,
         wait_before_drop: Duration::from_millis(1),
+        wait_before_close: Duration::from_secs(5),
        batching_time_limit: Duration::from_micros(1),
     };
 
diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs
index 3746092d4a..dde0a6db2a 100644
--- a/io/zenoh-transport/src/manager.rs
+++ b/io/zenoh-transport/src/manager.rs
@@ -102,6 +102,7 @@ pub struct TransportManagerConfig {
     pub batch_size: BatchSize,
     pub batching: bool,
     pub wait_before_drop: Duration,
+    pub wait_before_close: Duration,
     pub queue_size: [usize; Priority::NUM],
     pub queue_backoff: Duration,
     pub defrag_buff_size: usize,
@@ -133,6 +134,7 @@ pub struct TransportManagerBuilder {
     batching_enabled: bool,
     batching_time_limit: Duration,
     wait_before_drop: Duration,
+    wait_before_close: Duration,
     queue_size: QueueSizeConf,
     defrag_buff_size: usize,
     link_rx_buffer_size: usize,
@@ -187,6 +189,11 @@ impl TransportManagerBuilder {
         self
     }
 
+    pub fn wait_before_close(mut self, wait_before_close: Duration) -> Self {
+        self.wait_before_close = wait_before_close;
+        self
+    }
+
     pub fn queue_size(mut self, queue_size: QueueSizeConf) -> Self {
         self.queue_size = queue_size;
         self
@@ -247,6 +254,9 @@ impl TransportManagerBuilder {
         self = self.wait_before_drop(Duration::from_micros(
             *link.tx().queue().congestion_control().wait_before_drop(),
         ));
+        self = self.wait_before_close(Duration::from_micros(
+            *link.tx().queue().congestion_control().wait_before_close(),
+        ));
         self = self.queue_size(link.tx().queue().size().clone());
         self = self.tx_threads(*link.tx().threads());
         self = self.protocols(link.protocols().clone());
@@ -305,6 +315,7 @@ impl TransportManagerBuilder {
             batch_size: self.batch_size,
             batching: self.batching_enabled,
             wait_before_drop: self.wait_before_drop,
+            wait_before_close: self.wait_before_close,
             queue_size,
             queue_backoff: self.batching_time_limit,
             defrag_buff_size: self.defrag_buff_size,
@@ -344,6 +355,7 @@ impl Default for TransportManagerBuilder {
         let queue = QueueConf::default();
         let backoff = *queue.batching().time_limit();
         let wait_before_drop = *queue.congestion_control().wait_before_drop();
+        let wait_before_close = *queue.congestion_control().wait_before_close();
         Self {
             version: VERSION,
             zid: ZenohIdProto::rand(),
@@ -352,6 +364,7 @@ impl Default for TransportManagerBuilder {
             batch_size: BatchSize::MAX,
             batching_enabled: true,
             wait_before_drop: Duration::from_micros(wait_before_drop),
+            wait_before_close: Duration::from_micros(wait_before_close),
             queue_size: queue.size,
             batching_time_limit: Duration::from_millis(backoff),
             defrag_buff_size: *link_rx.max_message_size(),
diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs
index f2258b6935..c4c23290ee 100644
--- a/io/zenoh-transport/src/multicast/link.rs
+++ b/io/zenoh-transport/src/multicast/link.rs
@@ -309,6 +309,7 @@ impl TransportLinkMulticastUniversal {
                 batch: self.link.config.batch,
                 queue_size: self.transport.manager.config.queue_size,
                 wait_before_drop: self.transport.manager.config.wait_before_drop,
+                wait_before_close: self.transport.manager.config.wait_before_close,
                 batching_enabled: self.transport.manager.config.batching,
                 batching_time_limit: self.transport.manager.config.queue_backoff,
             };
diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs
index bb88a912d4..43e40595ca 100644
--- a/io/zenoh-transport/src/unicast/universal/link.rs
+++ b/io/zenoh-transport/src/unicast/universal/link.rs
@@ -63,6 +63,7 @@ impl TransportLinkUnicastUniversal {
             },
             queue_size: transport.manager.config.queue_size,
             wait_before_drop: transport.manager.config.wait_before_drop,
+            wait_before_close: transport.manager.config.wait_before_close,
             batching_enabled: transport.manager.config.batching,
             batching_time_limit: transport.manager.config.queue_backoff,
         };
diff --git a/io/zenoh-transport/src/unicast/universal/tx.rs b/io/zenoh-transport/src/unicast/universal/tx.rs
index cfad30771d..a241ecae61 100644
--- a/io/zenoh-transport/src/unicast/universal/tx.rs
+++ b/io/zenoh-transport/src/unicast/universal/tx.rs
@@ -14,6 +14,7 @@
 use zenoh_protocol::{
     core::{Priority, PriorityRange, Reliability},
     network::NetworkMessage,
+    transport::close,
 };
 
 use super::transport::TransportUnicastUniversal;
@@ -110,7 +111,27 @@ impl TransportUnicastUniversal {
             // the link could be congested and this operation could
             // block for fairly long time
             drop(transport_links);
-            pipeline.push_network_message(msg)
+            let droppable = msg.is_droppable();
+            let push = pipeline.push_network_message(msg);
+            if !push && !droppable {
+                tracing::error!(
+                    "Unable to push non-droppable network message to {}. Closing transport!",
+                    self.config.zid
+                );
+                zenoh_runtime::ZRuntime::RX.spawn({
+                    let transport = self.clone();
+                    async move {
+                        if let Err(e) = transport.close(close::reason::GENERIC).await {
+                            tracing::error!(
+                                "Error closing transport with {}: {}",
+                                transport.config.zid,
+                                e
+                            );
+                        }
+                    }
+                });
+            }
+            push
         }
 
         #[allow(unused_mut)] // When feature "shared-memory" is not enabled
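
Usage sketch (not part of the patch): the new knob can be set from the config file
(transport/link/tx/queue/congestion_control/wait_before_close, as read by the builder
in manager.rs above) or programmatically through the wait_before_close() builder
method this patch adds. The snippet below is a minimal illustration only; it assumes
the existing TransportManager::builder() entry point, the build() signature taking an
Arc<dyn TransportEventHandler>, and the DummyTransportEventHandler test handler
shipped with zenoh-transport — everything outside this patch is an unverified
assumption, not part of the change.

    use std::{sync::Arc, time::Duration};

    use zenoh_transport::{DummyTransportEventHandler, TransportManager};

    fn main() {
        // Droppable messages (CongestionControl::Drop) wait up to `wait_before_drop`
        // for a free batch before being dropped; blocking messages
        // (CongestionControl::Block) wait up to `wait_before_close` before the
        // whole transport session is closed, per this patch.
        let _manager = TransportManager::builder()
            .wait_before_drop(Duration::from_micros(1_000))
            .wait_before_close(Duration::from_secs(5))
            .build(Arc::new(DummyTransportEventHandler))
            .expect("failed to build transport manager");
    }

Setting wait_before_close to 0 makes a congested blocking push fail (and close the
transport) immediately, mirroring the existing wait_before_drop == 0 behavior for
droppable messages.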