diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 7a3e61463..8d140ee9b 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -412,6 +412,8 @@ drop: { /// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available. wait_before_drop: 1000, + /// The maximum deadline limit for multi-fragment messages. + max_wait_before_drop_fragments: 50000, }, /// Behavior pushing CongestionControl::Block messages to the queue. block: { diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 78fe11291..edf2c46c7 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -253,6 +253,7 @@ impl Default for CongestionControlDropConf { fn default() -> Self { Self { wait_before_drop: 1000, + max_wait_before_drop_fragments: 50000, } } } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index d25ccc63c..51c851b3a 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -445,6 +445,8 @@ validated_struct::validator! { /// The maximum time in microseconds to wait for an available batch before dropping a droppable message /// if still no batch is available. wait_before_drop: i64, + /// The maximum deadline limit for multi-fragment messages. + max_wait_before_drop_fragments: i64, }, /// Behavior pushing CongestionControl::Block messages to the queue. pub block: CongestionControlBlockConf { diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 1ed4ef266..0dbded920 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -128,19 +128,53 @@ impl StageInMutex { } } +#[derive(Debug)] +struct WaitTime { + wait_time: Duration, + max_wait_time: Option, +} + +impl WaitTime { + fn new(wait_time: Duration, max_wait_time: Option) -> Self { + Self { + wait_time, + max_wait_time, + } + } + + fn advance(&mut self, instant: &mut Instant) { + match &mut self.max_wait_time { + Some(max_wait_time) => { + if let Some(new_max_wait_time) = max_wait_time.checked_sub(self.wait_time) { + *instant += self.wait_time; + *max_wait_time = new_max_wait_time; + self.wait_time *= 2; + } + } + None => { + *instant += self.wait_time; + } + } + } + + fn wait_time(&self) -> Duration { + self.wait_time + } +} + +#[derive(Clone)] enum DeadlineSetting { Immediate, - Infinite, Finite(Instant), } struct LazyDeadline { deadline: Option, - wait_time: Option, + wait_time: WaitTime, } impl LazyDeadline { - fn new(wait_time: Option) -> Self { + fn new(wait_time: WaitTime) -> Self { Self { deadline: None, wait_time, @@ -148,25 +182,22 @@ impl LazyDeadline { } fn advance(&mut self) { - let wait_time = self.wait_time; - match &mut self.deadline() { + match self.deadline().to_owned() { DeadlineSetting::Immediate => {} - DeadlineSetting::Infinite => {} - DeadlineSetting::Finite(instant) => { - *instant = instant.add(unsafe { wait_time.unwrap_unchecked() }); + DeadlineSetting::Finite(mut instant) => { + self.wait_time.advance(&mut instant); + self.deadline = Some(DeadlineSetting::Finite(instant)); } } } #[inline] fn deadline(&mut self) -> &mut DeadlineSetting { - self.deadline.get_or_insert_with(|| match self.wait_time { - Some(wait_time) => match wait_time.is_zero() { - true => DeadlineSetting::Immediate, - false => DeadlineSetting::Finite(Instant::now().add(wait_time)), - }, - None => DeadlineSetting::Infinite, - }) + self.deadline + .get_or_insert_with(|| match self.wait_time.wait_time() { + Duration::ZERO => DeadlineSetting::Immediate, + nonzero_wait_time => DeadlineSetting::Finite(Instant::now().add(nonzero_wait_time)), + }) } } @@ -175,9 +206,9 @@ struct Deadline { } impl Deadline { - fn new(wait_time: Option) -> Self { + fn new(wait_time: Duration, max_wait_time: Option) -> Self { Self { - lazy_deadline: LazyDeadline::new(wait_time), + lazy_deadline: LazyDeadline::new(WaitTime::new(wait_time, max_wait_time)), } } @@ -185,7 +216,6 @@ impl Deadline { fn wait(&mut self, s_ref: &StageInRefill) -> bool { match self.lazy_deadline.deadline() { DeadlineSetting::Immediate => false, - DeadlineSetting::Infinite => s_ref.wait(), DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant), } } @@ -577,7 +607,7 @@ impl StageOut { pub(crate) struct TransmissionPipelineConf { pub(crate) batch: BatchConfig, pub(crate) queue_size: [usize; Priority::NUM], - pub(crate) wait_before_drop: Duration, + pub(crate) wait_before_drop: (Duration, Duration), pub(crate) wait_before_close: Duration, pub(crate) batching_enabled: bool, pub(crate) batching_time_limit: Duration, @@ -680,7 +710,7 @@ pub(crate) struct TransmissionPipelineProducer { // Each priority queue has its own Mutex stage_in: Arc<[Mutex]>, active: Arc, - wait_before_drop: Duration, + wait_before_drop: (Duration, Duration), wait_before_close: Duration, } @@ -695,12 +725,12 @@ impl TransmissionPipelineProducer { (0, Priority::DEFAULT) }; // If message is droppable, compute a deadline after which the sample could be dropped - let wait_time = if msg.is_droppable() { - self.wait_before_drop + let (wait_time, max_wait_time) = if msg.is_droppable() { + (self.wait_before_drop.0, Some(self.wait_before_drop.1)) } else { - self.wait_before_close + (self.wait_before_close, None) }; - let mut deadline = Deadline::new(Some(wait_time)); + let mut deadline = Deadline::new(wait_time, max_wait_time); // Lock the channel. We are the only one that will be writing on it. let mut queue = zlock!(self.stage_in[idx]); queue.push_network_message(&mut msg, priority, &mut deadline) @@ -856,7 +886,7 @@ mod tests { }, queue_size: [1; Priority::NUM], batching_enabled: true, - wait_before_drop: Duration::from_millis(1), + wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)), wait_before_close: Duration::from_secs(5), batching_time_limit: Duration::from_micros(1), }; @@ -870,7 +900,7 @@ mod tests { }, queue_size: [1; Priority::NUM], batching_enabled: true, - wait_before_drop: Duration::from_millis(1), + wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)), wait_before_close: Duration::from_secs(5), batching_time_limit: Duration::from_micros(1), }; diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 3045e8745..d0cf2a093 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -109,7 +109,7 @@ pub struct TransportManagerConfig { pub resolution: Resolution, pub batch_size: BatchSize, pub batching: bool, - pub wait_before_drop: Duration, + pub wait_before_drop: (Duration, Duration), pub wait_before_close: Duration, pub queue_size: [usize; Priority::NUM], pub queue_backoff: Duration, @@ -141,7 +141,7 @@ pub struct TransportManagerBuilder { batch_size: BatchSize, batching_enabled: bool, batching_time_limit: Duration, - wait_before_drop: Duration, + wait_before_drop: (Duration, Duration), wait_before_close: Duration, queue_size: QueueSizeConf, defrag_buff_size: usize, @@ -192,7 +192,7 @@ impl TransportManagerBuilder { self } - pub fn wait_before_drop(mut self, wait_before_drop: Duration) -> Self { + pub fn wait_before_drop(mut self, wait_before_drop: (Duration, Duration)) -> Self { self.wait_before_drop = wait_before_drop; self } @@ -249,6 +249,8 @@ impl TransportManagerBuilder { } let link = config.transport().link(); + let cc_drop = link.tx().queue().congestion_control().drop(); + let cc_block = link.tx().queue().congestion_control().block(); let mut resolution = Resolution::default(); resolution.set(Field::FrameSN, *link.tx().sequence_number_resolution()); self = self.resolution(resolution); @@ -259,22 +261,11 @@ impl TransportManagerBuilder { )); self = self.defrag_buff_size(*link.rx().max_message_size()); self = self.link_rx_buffer_size(*link.rx().buffer_size()); - self = self.wait_before_drop(duration_from_i64us( - *link - .tx() - .queue() - .congestion_control() - .drop() - .wait_before_drop(), - )); - self = self.wait_before_close(duration_from_i64us( - *link - .tx() - .queue() - .congestion_control() - .block() - .wait_before_close(), + self = self.wait_before_drop(( + duration_from_i64us(*cc_drop.wait_before_drop()), + duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()), )); + self = self.wait_before_close(duration_from_i64us(*cc_block.wait_before_close())); self = self.queue_size(link.tx().queue().size().clone()); self = self.tx_threads(*link.tx().threads()); self = self.protocols(link.protocols().clone()); @@ -372,8 +363,8 @@ impl Default for TransportManagerBuilder { let link_rx = LinkRxConf::default(); let queue = QueueConf::default(); let backoff = *queue.batching().time_limit(); - let wait_before_drop = *queue.congestion_control().drop().wait_before_drop(); - let wait_before_close = *queue.congestion_control().block().wait_before_close(); + let cc_drop = queue.congestion_control().drop(); + let cc_block = queue.congestion_control().block(); Self { version: VERSION, zid: ZenohIdProto::rand(), @@ -381,8 +372,11 @@ impl Default for TransportManagerBuilder { resolution: Resolution::default(), batch_size: BatchSize::MAX, batching_enabled: true, - wait_before_drop: duration_from_i64us(wait_before_drop), - wait_before_close: duration_from_i64us(wait_before_close), + wait_before_drop: ( + duration_from_i64us(*cc_drop.wait_before_drop()), + duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()), + ), + wait_before_close: duration_from_i64us(*cc_block.wait_before_close()), queue_size: queue.size, batching_time_limit: Duration::from_millis(backoff), defrag_buff_size: *link_rx.max_message_size(),