From df86f75c321249d2e8b669e26e2c0afb1b56a744 Mon Sep 17 00:00:00 2001
From: Joseph Perez
Date: Wed, 11 Dec 2024 15:36:44 +0100
Subject: [PATCH] fix: don't emit an error log if pipeline is closed (#1658)

* fix: don't emit an error log if pipeline is closed

Before, there was no distinction between the pipeline dropping a message
because its deadline was reached and dropping it because the transport was
closed. As a consequence, a blocking message dropped because of a closed
transport would still emit an error log. This PR introduces a distinction
between the two dropping causes.

* fix: fix dumb merge resolution
* fix: removed half-baked test
* test: add test
* fix: lint
* fix: lint
* fix: fix test
* fix: fix test
* fix: lint
---
 io/zenoh-transport/src/common/pipeline.rs     | 93 +++++++++++++++----
 io/zenoh-transport/src/multicast/mod.rs       |  2 +-
 io/zenoh-transport/src/multicast/tx.rs        | 15 +--
 .../src/unicast/universal/transport.rs        |  5 +-
 .../src/unicast/universal/tx.rs               | 17 ++--
 5 files changed, 92 insertions(+), 40 deletions(-)

diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs
index 256b4e760a..bc9319a7d1 100644
--- a/io/zenoh-transport/src/common/pipeline.rs
+++ b/io/zenoh-transport/src/common/pipeline.rs
@@ -12,6 +12,7 @@
 // ZettaScale Zenoh Team,
 //
 use std::{
+    fmt,
     ops::Add,
     sync::{
         atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
@@ -40,7 +41,7 @@ use zenoh_protocol::{
         AtomicBatchSize, BatchSize, TransportMessage,
     },
 };
-use zenoh_sync::{event, Notifier, Waiter};
+use zenoh_sync::{event, Notifier, WaitDeadlineError, Waiter};
 
 use super::{
     batch::{Encode, WBatch},
@@ -56,6 +57,15 @@ struct StageInRefill {
     s_ref_r: RingBufferReader,
 }
 
+#[derive(Debug)]
+pub(crate) struct TransportClosed;
+impl fmt::Display for TransportClosed {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "transport closed")
+    }
+}
+impl std::error::Error for TransportClosed {}
+
 impl StageInRefill {
     fn pull(&mut self) -> Option {
         self.s_ref_r.pull()
@@ -65,8 +75,12 @@ impl StageInRefill {
         self.n_ref_r.wait().is_ok()
     }
 
-    fn wait_deadline(&self, instant: Instant) -> bool {
-        self.n_ref_r.wait_deadline(instant).is_ok()
+    fn wait_deadline(&self, instant: Instant) -> Result<bool, TransportClosed> {
+        match self.n_ref_r.wait_deadline(instant) {
+            Ok(()) => Ok(true),
+            Err(WaitDeadlineError::Deadline) => Ok(false),
+            Err(WaitDeadlineError::WaitError) => Err(TransportClosed),
+        }
     }
 }
@@ -214,9 +228,9 @@ impl Deadline {
     }
 
     #[inline]
-    fn wait(&mut self, s_ref: &StageInRefill) -> bool {
+    fn wait(&mut self, s_ref: &StageInRefill) -> Result<bool, TransportClosed> {
         match self.lazy_deadline.deadline() {
-            DeadlineSetting::Immediate => false,
+            DeadlineSetting::Immediate => Ok(false),
             DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant),
         }
     }
@@ -243,7 +257,7 @@ impl StageIn {
         msg: &mut NetworkMessage,
         priority: Priority,
         deadline: &mut Deadline,
-    ) -> bool {
+    ) -> Result<bool, TransportClosed> {
         // Lock the current serialization batch.
         let mut c_guard = self.mutex.current();
@@ -264,7 +278,7 @@ impl StageIn {
                     None => {
                         drop(c_guard);
                         // Wait for an available batch until deadline
-                        if !deadline.wait(&self.s_ref) {
+                        if !deadline.wait(&self.s_ref)? {
                             // Still no available batch.
                             // Restore the sequence number and drop the message
                             $($restore_sn)?
@@ -272,7 +286,7 @@ impl StageIn {
                                 "Zenoh message dropped because it's over the deadline {:?}: {:?}",
                                 deadline.lazy_deadline.wait_time, msg
                             );
-                            return false;
+                            return Ok(false);
                         }
                         c_guard = self.mutex.current();
                     }
@@ -287,13 +301,13 @@ impl StageIn {
                 if !self.batching || $msg.is_express() {
                     // Move out existing batch
                     self.s_out.move_batch($batch);
-                    return true;
+                    return Ok(true);
                 } else {
                     let bytes = $batch.len();
                     *c_guard = Some($batch);
                     drop(c_guard);
                     self.s_out.notify(bytes);
-                    return true;
+                    return Ok(true);
                 }
             }};
         }
@@ -404,7 +418,7 @@ impl StageIn {
         // Clean the fragbuf
         self.fragbuf.clear();
 
-        true
+        Ok(true)
     }
 
     #[inline]
@@ -767,7 +781,10 @@ pub(crate) struct TransmissionPipelineProducer {
 
 impl TransmissionPipelineProducer {
     #[inline]
-    pub(crate) fn push_network_message(&self, mut msg: NetworkMessage) -> bool {
+    pub(crate) fn push_network_message(
+        &self,
+        mut msg: NetworkMessage,
+    ) -> Result<bool, TransportClosed> {
         // If the queue is not QoS, it means that we only have one priority with index 0.
         let (idx, priority) = if self.stage_in.len() > 1 {
             let priority = msg.priority();
@@ -780,7 +797,7 @@ impl TransmissionPipelineProducer {
         let (wait_time, max_wait_time) = if msg.is_droppable() {
             // Checked if we are blocked on the priority queue and we drop directly the message
             if self.status.is_congested(priority) {
-                return false;
+                return Ok(false);
             }
             (self.wait_before_drop.0, Some(self.wait_before_drop.1))
         } else {
@@ -789,11 +806,11 @@ impl TransmissionPipelineProducer {
         let mut deadline = Deadline::new(wait_time, max_wait_time);
         // Lock the channel. We are the only one that will be writing on it.
         let mut queue = zlock!(self.stage_in[idx]);
-        let sent = queue.push_network_message(&mut msg, priority, &mut deadline);
+        let sent = queue.push_network_message(&mut msg, priority, &mut deadline)?;
         if !sent {
             self.status.set_congested(priority, true);
         }
-        sent
+        Ok(sent)
     }
 
     #[inline]
@@ -1002,7 +1019,7 @@ mod tests {
                     "Pipeline Flow [>>>]: Pushed {} msgs ({payload_size} bytes)",
                     i + 1
                 );
-                queue.push_network_message(message.clone());
+                queue.push_network_message(message.clone()).unwrap();
             }
         }
@@ -1129,7 +1146,7 @@ mod tests {
                 println!(
                     "Pipeline Blocking [>>>]: ({id}) Scheduling message #{i} with payload size of {payload_size} bytes"
                 );
-                queue.push_network_message(message.clone());
+                queue.push_network_message(message.clone()).unwrap();
                 let c = counter.fetch_add(1, Ordering::AcqRel);
                 println!(
                     "Pipeline Blocking [>>>]: ({}) Scheduled message #{} (tot {}) with payload size of {} bytes",
@@ -1173,12 +1190,13 @@ mod tests {
 
         timeout(TIMEOUT, check).await?;
 
-        // Disable and drain the queue
-        timeout(
+        // Drain the queue (but don't drop it to avoid dropping the messages)
+        let _consumer = timeout(
            TIMEOUT,
            task::spawn_blocking(move || {
                println!("Pipeline Blocking [---]: draining the queue");
                let _ = consumer.drain();
+                consumer
            }),
        )
        .await??;
@@ -1242,7 +1260,7 @@ mod tests {
             let duration = Duration::from_millis(5_500);
             let start = Instant::now();
             while start.elapsed() < duration {
-                producer.push_network_message(message.clone());
+                producer.push_network_message(message.clone()).unwrap();
             }
         }
     }
@@ -1269,4 +1287,39 @@ mod tests {
             tokio::time::sleep(Duration::from_millis(500)).await;
         }
     }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn tx_pipeline_closed() -> ZResult<()> {
+        // Pipeline
+        let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?;
+        let priorities = vec![tct];
+        let (producer, consumer) =
+            TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice());
+        // Drop consumer to close the pipeline
+        drop(consumer);
+
+        let message: NetworkMessage = Push {
+            wire_expr: "test".into(),
+            ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, true),
+            ext_tstamp: None,
+            ext_nodeid: ext::NodeIdType::DEFAULT,
+            payload: PushBody::Put(Put {
+                timestamp: None,
+                encoding: Encoding::empty(),
+                ext_sinfo: None,
+                #[cfg(feature = "shared-memory")]
+                ext_shm: None,
+                ext_attachment: None,
+                ext_unknown: vec![],
+                payload: vec![42u8].into(),
+            }),
+        }
+        .into();
+        // First message should not be rejected as there is one batch available in the queue
+        assert!(producer.push_network_message(message.clone()).is_ok());
+        // Second message should be rejected
+        assert!(producer.push_network_message(message.clone()).is_err());
+
+        Ok(())
+    }
 }
diff --git a/io/zenoh-transport/src/multicast/mod.rs b/io/zenoh-transport/src/multicast/mod.rs
index fbb656264d..cd76176b50 100644
--- a/io/zenoh-transport/src/multicast/mod.rs
+++ b/io/zenoh-transport/src/multicast/mod.rs
@@ -113,7 +113,7 @@ impl TransportMulticast {
     #[inline(always)]
     pub fn schedule(&self, message: NetworkMessage) -> ZResult<()> {
         let transport = self.get_transport()?;
-        transport.schedule(message);
+        transport.schedule(message)?;
         Ok(())
     }
 
diff --git a/io/zenoh-transport/src/multicast/tx.rs b/io/zenoh-transport/src/multicast/tx.rs
index 775131703a..dec3f4a8a9 100644
--- a/io/zenoh-transport/src/multicast/tx.rs
+++ b/io/zenoh-transport/src/multicast/tx.rs
@@ -13,6 +13,7 @@
 //
 use zenoh_core::zread;
 use zenoh_protocol::network::NetworkMessage;
+use zenoh_result::ZResult;
 
 use super::transport::TransportMulticastInner;
 #[cfg(feature = "shared-memory")]
@@ -20,7 +21,7 @@ use crate::shm::map_zmsg_to_partner;
 
 //noinspection ALL
 impl TransportMulticastInner {
-    fn schedule_on_link(&self, msg: NetworkMessage) -> bool {
+    fn schedule_on_link(&self, msg: NetworkMessage) -> ZResult<bool> {
         macro_rules! zpush {
             ($guard:expr, $pipeline:expr, $msg:expr) => {
                 // Drop the guard before the push_zenoh_message since
@@ -28,7 +29,7 @@ impl TransportMulticastInner {
                 // block for fairly long time
                 let pl = $pipeline.clone();
                 drop($guard);
-                return pl.push_network_message($msg);
+                return Ok(pl.push_network_message($msg)?);
             };
         }
@@ -47,22 +48,22 @@ impl TransportMulticastInner {
             }
         }
 
-        false
+        Ok(false)
     }
 
     #[allow(unused_mut)] // When feature "shared-memory" is not enabled
     #[allow(clippy::let_and_return)] // When feature "stats" is not enabled
     #[inline(always)]
-    pub(super) fn schedule(&self, mut msg: NetworkMessage) -> bool {
+    pub(super) fn schedule(&self, mut msg: NetworkMessage) -> ZResult<bool> {
         #[cfg(feature = "shared-memory")]
         {
             if let Err(e) = map_zmsg_to_partner(&mut msg, &self.shm) {
                 tracing::trace!("Failed SHM conversion: {}", e);
-                return false;
+                return Ok(false);
             }
         }
 
-        let res = self.schedule_on_link(msg);
+        let res = self.schedule_on_link(msg)?;
 
         #[cfg(feature = "stats")]
         if res {
@@ -71,6 +72,6 @@ impl TransportMulticastInner {
             self.stats.inc_tx_n_dropped(1);
         }
 
-        res
+        Ok(res)
     }
 }
diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs
index b95b68d606..a42a902164 100644
--- a/io/zenoh-transport/src/unicast/universal/transport.rs
+++ b/io/zenoh-transport/src/unicast/universal/transport.rs
@@ -398,10 +398,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
     /* TX */
     /*************************************/
     fn schedule(&self, msg: NetworkMessage) -> ZResult<()> {
-        match self.internal_schedule(msg) {
-            true => Ok(()),
-            false => bail!("error scheduling message!"),
-        }
+        self.internal_schedule(msg).map(|_| ())
     }
 
     fn add_debug_fields<'a, 'b: 'a, 'c>(
diff --git a/io/zenoh-transport/src/unicast/universal/tx.rs b/io/zenoh-transport/src/unicast/universal/tx.rs
index 6f61fc070d..9442bdd1c5 100644
--- a/io/zenoh-transport/src/unicast/universal/tx.rs
+++ b/io/zenoh-transport/src/unicast/universal/tx.rs
@@ -16,6 +16,7 @@ use zenoh_protocol::{
     network::NetworkMessage,
     transport::close,
 };
+use zenoh_result::ZResult;
 
 use super::transport::TransportUnicastUniversal;
 #[cfg(feature = "shared-memory")]
@@ -68,7 +69,7 @@ impl TransportUnicastUniversal {
         match_.full.or(match_.partial).or(match_.any)
     }
 
-    fn schedule_on_link(&self, msg: NetworkMessage) -> bool {
+    fn schedule_on_link(&self, msg: NetworkMessage) -> ZResult<bool> {
         let transport_links = self
             .links
             .read()
@@ -93,7 +94,7 @@ impl TransportUnicastUniversal {
             );
 
             // No Link found
-            return false;
+            return Ok(false);
         };
 
         let transport_link = transport_links
@@ -112,7 +113,7 @@ impl TransportUnicastUniversal {
         // block for fairly long time
         drop(transport_links);
         let droppable = msg.is_droppable();
-        let push = pipeline.push_network_message(msg);
+        let push = pipeline.push_network_message(msg)?;
         if !push && !droppable {
             tracing::error!(
                 "Unable to push non droppable network message to {}. Closing transport!",
@@ -131,22 +132,22 @@ impl TransportUnicastUniversal {
                 }
             });
         }
-        push
+        Ok(push)
     }
 
     #[allow(unused_mut)] // When feature "shared-memory" is not enabled
     #[allow(clippy::let_and_return)] // When feature "stats" is not enabled
     #[inline(always)]
-    pub(crate) fn internal_schedule(&self, mut msg: NetworkMessage) -> bool {
+    pub(crate) fn internal_schedule(&self, mut msg: NetworkMessage) -> ZResult<bool> {
         #[cfg(feature = "shared-memory")]
         {
             if let Err(e) = map_zmsg_to_partner(&mut msg, &self.config.shm) {
                 tracing::trace!("Failed SHM conversion: {}", e);
-                return false;
+                return Ok(false);
             }
         }
 
-        let res = self.schedule_on_link(msg);
+        let res = self.schedule_on_link(msg)?;
 
         #[cfg(feature = "stats")]
         if res {
@@ -155,7 +156,7 @@ impl TransportUnicastUniversal {
             self.stats.inc_tx_n_dropped(1);
         }
 
-        res
+        Ok(res)
     }
 }
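Note for reviewers: the sketch below illustrates how the new return contract of push_network_message is meant to be consumed on the caller side; the forward helper is hypothetical and not part of this patch. Ok(true) means the message was enqueued, Ok(false) means it was dropped because its deadline expired before a batch became available (still worth an error log for non-droppable messages), and Err(TransportClosed) means the pipeline was closed, which callers can now propagate quietly instead of logging an error.

use zenoh_protocol::network::NetworkMessage;

use crate::common::pipeline::{TransmissionPipelineProducer, TransportClosed};

// Hypothetical caller inside zenoh-transport: only the deadline case is logged
// as an error; a closed pipeline is merely traced and reported to the caller.
fn forward(
    producer: &TransmissionPipelineProducer,
    msg: NetworkMessage,
) -> Result<(), TransportClosed> {
    match producer.push_network_message(msg) {
        // Message was enqueued in a batch.
        Ok(true) => Ok(()),
        // Deadline expired: the message was dropped while the transport is still alive.
        Ok(false) => {
            tracing::error!("message dropped: deadline reached before a batch was available");
            Ok(())
        }
        // Pipeline closed: dropping is expected, no error log.
        Err(e) => {
            tracing::trace!("message dropped: {e}");
            Err(e)
        }
    }
}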