fix: don't emit an error log if pipeline is closed (#1658)
* fix: don't emit an error log if pipeline is closed

Before, there was no distinction between the pipeline dropping a
message because its deadline was reached and dropping it because the
transport was closed.
As a consequence, a blocking message that was dropped because of a
closed transport would still emit an error log. This PR introduces
the distinction between the two dropping causes.
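
A minimal, self-contained sketch of that distinction (the functions below are
illustrative stand-ins, not the actual zenoh-transport API: the real
`push_network_message` takes a `NetworkMessage` and derives these cases from
the pipeline state):

```rust
use std::fmt;

/// Illustrative stand-in for the error type added by this PR.
#[derive(Debug)]
struct TransportClosed;

impl fmt::Display for TransportClosed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "transport closed")
    }
}

impl std::error::Error for TransportClosed {}

// Before: a plain `bool`, so "dropped because the deadline was reached" and
// "dropped because the transport was closed" were indistinguishable.
// After: the two causes are separated in the return type.
fn push_network_message(closed: bool, deadline_expired: bool) -> Result<bool, TransportClosed> {
    if closed {
        return Err(TransportClosed); // pipeline closed: not worth an error log
    }
    Ok(!deadline_expired) // Ok(false): dropped because the deadline was reached
}

fn schedule(closed: bool, deadline_expired: bool) {
    match push_network_message(closed, deadline_expired) {
        Ok(true) => println!("message queued"),
        Ok(false) => eprintln!("error: non-droppable message dropped (deadline reached)"),
        Err(TransportClosed) => println!("transport closed, message dropped without an error log"),
    }
}

fn main() {
    schedule(false, false); // queued
    schedule(false, true);  // deadline drop -> error log
    schedule(true, false);  // transport closed -> no error log
}
```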

* fix: fix dumb merge resolution

* fix: removed half-baked test

* test: add test

* fix: lint

* fix: lint

* fix: fix test

* fix: fix test

* fix: lint
wyfo authored Dec 11, 2024
1 parent 911382d commit df86f75
Showing 5 changed files with 92 additions and 40 deletions.
93 changes: 73 additions & 20 deletions io/zenoh-transport/src/common/pipeline.rs
@@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use std::{
fmt,
ops::Add,
sync::{
atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
@@ -40,7 +41,7 @@ use zenoh_protocol::{
AtomicBatchSize, BatchSize, TransportMessage,
},
};
use zenoh_sync::{event, Notifier, Waiter};
use zenoh_sync::{event, Notifier, WaitDeadlineError, Waiter};

use super::{
batch::{Encode, WBatch},
@@ -56,6 +57,15 @@ struct StageInRefill {
s_ref_r: RingBufferReader<WBatch, RBLEN>,
}

#[derive(Debug)]
pub(crate) struct TransportClosed;
impl fmt::Display for TransportClosed {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "transport closed")
}
}
impl std::error::Error for TransportClosed {}

impl StageInRefill {
fn pull(&mut self) -> Option<WBatch> {
self.s_ref_r.pull()
@@ -65,8 +75,12 @@ impl StageInRefill {
self.n_ref_r.wait().is_ok()
}

fn wait_deadline(&self, instant: Instant) -> bool {
self.n_ref_r.wait_deadline(instant).is_ok()
fn wait_deadline(&self, instant: Instant) -> Result<bool, TransportClosed> {
match self.n_ref_r.wait_deadline(instant) {
Ok(()) => Ok(true),
Err(WaitDeadlineError::Deadline) => Ok(false),
Err(WaitDeadlineError::WaitError) => Err(TransportClosed),
}
}
}

@@ -214,9 +228,9 @@ impl Deadline {
}

#[inline]
fn wait(&mut self, s_ref: &StageInRefill) -> bool {
fn wait(&mut self, s_ref: &StageInRefill) -> Result<bool, TransportClosed> {
match self.lazy_deadline.deadline() {
DeadlineSetting::Immediate => false,
DeadlineSetting::Immediate => Ok(false),
DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant),
}
}
@@ -243,7 +257,7 @@ impl StageIn {
msg: &mut NetworkMessage,
priority: Priority,
deadline: &mut Deadline,
) -> bool {
) -> Result<bool, TransportClosed> {
// Lock the current serialization batch.
let mut c_guard = self.mutex.current();

@@ -264,15 +278,15 @@
None => {
drop(c_guard);
// Wait for an available batch until deadline
if !deadline.wait(&self.s_ref) {
if !deadline.wait(&self.s_ref)? {
// Still no available batch.
// Restore the sequence number and drop the message
$($restore_sn)?
tracing::trace!(
"Zenoh message dropped because it's over the deadline {:?}: {:?}",
deadline.lazy_deadline.wait_time, msg
);
return false;
return Ok(false);
}
c_guard = self.mutex.current();
}
@@ -287,13 +301,13 @@
if !self.batching || $msg.is_express() {
// Move out existing batch
self.s_out.move_batch($batch);
return true;
return Ok(true);
} else {
let bytes = $batch.len();
*c_guard = Some($batch);
drop(c_guard);
self.s_out.notify(bytes);
return true;
return Ok(true);
}
}};
}
@@ -404,7 +418,7 @@ impl StageIn {
// Clean the fragbuf
self.fragbuf.clear();

true
Ok(true)
}

#[inline]
@@ -767,7 +781,10 @@ pub(crate) struct TransmissionPipelineProducer {

impl TransmissionPipelineProducer {
#[inline]
pub(crate) fn push_network_message(&self, mut msg: NetworkMessage) -> bool {
pub(crate) fn push_network_message(
&self,
mut msg: NetworkMessage,
) -> Result<bool, TransportClosed> {
// If the queue is not QoS, it means that we only have one priority with index 0.
let (idx, priority) = if self.stage_in.len() > 1 {
let priority = msg.priority();
@@ -780,7 +797,7 @@
let (wait_time, max_wait_time) = if msg.is_droppable() {
// Check if we are blocked on the priority queue and, if so, drop the message directly
if self.status.is_congested(priority) {
return false;
return Ok(false);
}
(self.wait_before_drop.0, Some(self.wait_before_drop.1))
} else {
@@ -789,11 +806,11 @@
let mut deadline = Deadline::new(wait_time, max_wait_time);
// Lock the channel. We are the only one that will be writing on it.
let mut queue = zlock!(self.stage_in[idx]);
let sent = queue.push_network_message(&mut msg, priority, &mut deadline);
let sent = queue.push_network_message(&mut msg, priority, &mut deadline)?;
if !sent {
self.status.set_congested(priority, true);
}
sent
Ok(sent)
}

#[inline]
@@ -1002,7 +1019,7 @@ mod tests {
"Pipeline Flow [>>>]: Pushed {} msgs ({payload_size} bytes)",
i + 1
);
queue.push_network_message(message.clone());
queue.push_network_message(message.clone()).unwrap();
}
}

@@ -1129,7 +1146,7 @@
println!(
"Pipeline Blocking [>>>]: ({id}) Scheduling message #{i} with payload size of {payload_size} bytes"
);
queue.push_network_message(message.clone());
queue.push_network_message(message.clone()).unwrap();
let c = counter.fetch_add(1, Ordering::AcqRel);
println!(
"Pipeline Blocking [>>>]: ({}) Scheduled message #{} (tot {}) with payload size of {} bytes",
@@ -1173,12 +1190,13 @@

timeout(TIMEOUT, check).await?;

// Disable and drain the queue
timeout(
// Drain the queue (but don't drop it to avoid dropping the messages)
let _consumer = timeout(
TIMEOUT,
task::spawn_blocking(move || {
println!("Pipeline Blocking [---]: draining the queue");
let _ = consumer.drain();
consumer
}),
)
.await??;
@@ -1242,7 +1260,7 @@
let duration = Duration::from_millis(5_500);
let start = Instant::now();
while start.elapsed() < duration {
producer.push_network_message(message.clone());
producer.push_network_message(message.clone()).unwrap();
}
}
}
@@ -1269,4 +1287,39 @@
tokio::time::sleep(Duration::from_millis(500)).await;
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tx_pipeline_closed() -> ZResult<()> {
// Pipeline
let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?;
let priorities = vec![tct];
let (producer, consumer) =
TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice());
// Drop consumer to close the pipeline
drop(consumer);

let message: NetworkMessage = Push {
wire_expr: "test".into(),
ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, true),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: vec![42u8].into(),
}),
}
.into();
// First message should not be rejected as there is one batch available in the queue
assert!(producer.push_network_message(message.clone()).is_ok());
// Second message should be rejected
assert!(producer.push_network_message(message.clone()).is_err());

Ok(())
}
}
2 changes: 1 addition & 1 deletion io/zenoh-transport/src/multicast/mod.rs
@@ -113,7 +113,7 @@ impl TransportMulticast {
#[inline(always)]
pub fn schedule(&self, message: NetworkMessage) -> ZResult<()> {
let transport = self.get_transport()?;
transport.schedule(message);
transport.schedule(message)?;
Ok(())
}

15 changes: 8 additions & 7 deletions io/zenoh-transport/src/multicast/tx.rs
@@ -13,22 +13,23 @@
//
use zenoh_core::zread;
use zenoh_protocol::network::NetworkMessage;
use zenoh_result::ZResult;

use super::transport::TransportMulticastInner;
#[cfg(feature = "shared-memory")]
use crate::shm::map_zmsg_to_partner;

//noinspection ALL
impl TransportMulticastInner {
fn schedule_on_link(&self, msg: NetworkMessage) -> bool {
fn schedule_on_link(&self, msg: NetworkMessage) -> ZResult<bool> {
macro_rules! zpush {
($guard:expr, $pipeline:expr, $msg:expr) => {
// Drop the guard before the push_zenoh_message since
// the link could be congested and this operation could
// block for fairly long time
let pl = $pipeline.clone();
drop($guard);
return pl.push_network_message($msg);
return Ok(pl.push_network_message($msg)?);
};
}

@@ -47,22 +48,22 @@ impl TransportMulticastInner {
}
}

false
Ok(false)
}

#[allow(unused_mut)] // When feature "shared-memory" is not enabled
#[allow(clippy::let_and_return)] // When feature "stats" is not enabled
#[inline(always)]
pub(super) fn schedule(&self, mut msg: NetworkMessage) -> bool {
pub(super) fn schedule(&self, mut msg: NetworkMessage) -> ZResult<bool> {
#[cfg(feature = "shared-memory")]
{
if let Err(e) = map_zmsg_to_partner(&mut msg, &self.shm) {
tracing::trace!("Failed SHM conversion: {}", e);
return false;
return Ok(false);
}
}

let res = self.schedule_on_link(msg);
let res = self.schedule_on_link(msg)?;

#[cfg(feature = "stats")]
if res {
@@ -71,6 +72,6 @@ impl TransportMulticastInner {
self.stats.inc_tx_n_dropped(1);
}

res
Ok(res)
}
}
5 changes: 1 addition & 4 deletions io/zenoh-transport/src/unicast/universal/transport.rs
@@ -398,10 +398,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
/* TX */
/*************************************/
fn schedule(&self, msg: NetworkMessage) -> ZResult<()> {
match self.internal_schedule(msg) {
true => Ok(()),
false => bail!("error scheduling message!"),
}
self.internal_schedule(msg).map(|_| ())
}

fn add_debug_fields<'a, 'b: 'a, 'c>(
17 changes: 9 additions & 8 deletions io/zenoh-transport/src/unicast/universal/tx.rs
@@ -16,6 +16,7 @@ use zenoh_protocol::{
network::NetworkMessage,
transport::close,
};
use zenoh_result::ZResult;

use super::transport::TransportUnicastUniversal;
#[cfg(feature = "shared-memory")]
@@ -68,7 +69,7 @@ impl TransportUnicastUniversal {
match_.full.or(match_.partial).or(match_.any)
}

fn schedule_on_link(&self, msg: NetworkMessage) -> bool {
fn schedule_on_link(&self, msg: NetworkMessage) -> ZResult<bool> {
let transport_links = self
.links
.read()
@@ -93,7 +94,7 @@
);

// No Link found
return false;
return Ok(false);
};

let transport_link = transport_links
@@ -112,7 +113,7 @@
// block for fairly long time
drop(transport_links);
let droppable = msg.is_droppable();
let push = pipeline.push_network_message(msg);
let push = pipeline.push_network_message(msg)?;
if !push && !droppable {
tracing::error!(
"Unable to push non droppable network message to {}. Closing transport!",
Expand All @@ -131,22 +132,22 @@ impl TransportUnicastUniversal {
}
});
}
push
Ok(push)
}

#[allow(unused_mut)] // When feature "shared-memory" is not enabled
#[allow(clippy::let_and_return)] // When feature "stats" is not enabled
#[inline(always)]
pub(crate) fn internal_schedule(&self, mut msg: NetworkMessage) -> bool {
pub(crate) fn internal_schedule(&self, mut msg: NetworkMessage) -> ZResult<bool> {
#[cfg(feature = "shared-memory")]
{
if let Err(e) = map_zmsg_to_partner(&mut msg, &self.config.shm) {
tracing::trace!("Failed SHM conversion: {}", e);
return false;
return Ok(false);
}
}

let res = self.schedule_on_link(msg);
let res = self.schedule_on_link(msg)?;

#[cfg(feature = "stats")]
if res {
Expand All @@ -155,7 +156,7 @@ impl TransportUnicastUniversal {
self.stats.inc_tx_n_dropped(1);
}

res
Ok(res)
}
}

