Skip to content

Commit

Permalink
get rid of non-linear wait functionality for CC::Block
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Nov 28, 2024
1 parent 8e24a88 commit 70ff070
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 52 deletions.
2 changes: 0 additions & 2 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,6 @@
/// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message
/// if still no batch is available.
wait_before_close: 5000000,
/// The maximum deadline limit for multi-fragment messages.
max_wait_before_close_fragments: 15000000,
},
},
/// Perform batching of messages if they are smaller of the batch_size
Expand Down
1 change: 0 additions & 1 deletion commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ impl Default for CongestionControlBlockConf {
fn default() -> Self {
Self {
wait_before_close: 5000000,
max_wait_before_close_fragments: 15000000,
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,6 @@ validated_struct::validator! {
/// The maximum time in microseconds to wait for an available batch before closing the transport session
/// when sending a blocking message if still no batch is available.
wait_before_close: i64,
/// The maximum deadline limit for multi-fragment messages.
max_wait_before_close_fragments: i64,
},
},
pub batching: BatchingConf {
Expand Down
65 changes: 29 additions & 36 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,22 +131,29 @@ impl StageInMutex {
#[derive(Debug)]
struct WaitTime {
wait_time: Duration,
max_wait_time: Duration,
max_wait_time: Option<Duration>,
}

impl WaitTime {
fn new(wait_time: Duration, max_wait_time: Duration) -> Self {
fn new(wait_time: Duration, max_wait_time: Option<Duration>) -> Self {
Self {
wait_time,
max_wait_time,
}
}

fn advance(&mut self, instant: &mut Instant) {
if let Some(new_max_wait_time) = self.max_wait_time.checked_sub(self.wait_time) {
*instant += self.wait_time;
self.max_wait_time = new_max_wait_time;
self.wait_time *= 2;
match &mut self.max_wait_time {
Some(max_wait_time) => {
if let Some(new_max_wait_time) = max_wait_time.checked_sub(self.wait_time) {
*instant += self.wait_time;
*max_wait_time = new_max_wait_time;
self.wait_time *= 2;
}
}
None => {
*instant += self.wait_time;
}
}
}

Expand All @@ -158,17 +165,16 @@ impl WaitTime {
#[derive(Clone)]
enum DeadlineSetting {
Immediate,
Infinite,
Finite(Instant),
}

struct LazyDeadline {
deadline: Option<DeadlineSetting>,
wait_time: Option<WaitTime>,
wait_time: WaitTime,
}

impl LazyDeadline {
fn new(wait_time: Option<WaitTime>) -> Self {
fn new(wait_time: WaitTime) -> Self {
Self {
deadline: None,
wait_time,
Expand All @@ -178,13 +184,8 @@ impl LazyDeadline {
fn advance(&mut self) {
match self.deadline().to_owned() {
DeadlineSetting::Immediate => {}
DeadlineSetting::Infinite => {}
DeadlineSetting::Finite(mut instant) => {
// SAFETY: this is safe because DeadlineSetting::Finite is returned by
// deadline() only if wait_time is Some(_)
let wait_time = unsafe { self.wait_time.as_mut().unwrap_unchecked() };
wait_time.advance(&mut instant);

self.wait_time.advance(&mut instant);
self.deadline = Some(DeadlineSetting::Finite(instant));
}
}
Expand All @@ -193,14 +194,9 @@ impl LazyDeadline {
#[inline]
fn deadline(&mut self) -> &mut DeadlineSetting {
self.deadline
.get_or_insert_with(|| match self.wait_time.as_mut() {
Some(wait_time) => match wait_time.wait_time() {
Duration::ZERO => DeadlineSetting::Immediate,
nonzero_wait_time => {
DeadlineSetting::Finite(Instant::now().add(nonzero_wait_time))
}
},
None => DeadlineSetting::Infinite,
.get_or_insert_with(|| match self.wait_time.wait_time() {
Duration::ZERO => DeadlineSetting::Immediate,
nonzero_wait_time => DeadlineSetting::Finite(Instant::now().add(nonzero_wait_time)),
})
}
}
Expand All @@ -210,19 +206,16 @@ struct Deadline {
}

impl Deadline {
fn new(wait_time: Option<(Duration, Duration)>) -> Self {
fn new(wait_time: Duration, max_wait_time: Option<Duration>) -> Self {
Self {
lazy_deadline: LazyDeadline::new(
wait_time.map(|(wait_time, max_wait_time)| WaitTime::new(wait_time, max_wait_time)),
),
lazy_deadline: LazyDeadline::new(WaitTime::new(wait_time, max_wait_time)),
}
}

#[inline]
fn wait(&mut self, s_ref: &StageInRefill) -> bool {
match self.lazy_deadline.deadline() {
DeadlineSetting::Immediate => false,
DeadlineSetting::Infinite => s_ref.wait(),
DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant),
}
}
Expand Down Expand Up @@ -615,7 +608,7 @@ pub(crate) struct TransmissionPipelineConf {
pub(crate) batch: BatchConfig,
pub(crate) queue_size: [usize; Priority::NUM],
pub(crate) wait_before_drop: (Duration, Duration),
pub(crate) wait_before_close: (Duration, Duration),
pub(crate) wait_before_close: Duration,
pub(crate) batching_enabled: bool,
pub(crate) batching_time_limit: Duration,
}
Expand Down Expand Up @@ -718,7 +711,7 @@ pub(crate) struct TransmissionPipelineProducer {
stage_in: Arc<[Mutex<StageIn>]>,
active: Arc<AtomicBool>,
wait_before_drop: (Duration, Duration),
wait_before_close: (Duration, Duration),
wait_before_close: Duration,
}

impl TransmissionPipelineProducer {
Expand All @@ -732,12 +725,12 @@ impl TransmissionPipelineProducer {
(0, Priority::DEFAULT)
};
// If message is droppable, compute a deadline after which the sample could be dropped
let wait_time = if msg.is_droppable() {
self.wait_before_drop
let (wait_time, max_wait_time) = if msg.is_droppable() {
(self.wait_before_drop.0, Some(self.wait_before_drop.1))
} else {
self.wait_before_close
(self.wait_before_close, None)
};
let mut deadline = Deadline::new(Some(wait_time));
let mut deadline = Deadline::new(wait_time, max_wait_time);
// Lock the channel. We are the only one that will be writing on it.
let mut queue = zlock!(self.stage_in[idx]);
queue.push_network_message(&mut msg, priority, &mut deadline)
Expand Down Expand Up @@ -894,7 +887,7 @@ mod tests {
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: (Duration::from_secs(5), Duration::from_millis(1024)),
wait_before_close: Duration::from_secs(5),
batching_time_limit: Duration::from_micros(1),
};

Expand All @@ -908,7 +901,7 @@ mod tests {
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: (Duration::from_secs(5), Duration::from_millis(1024)),
wait_before_close: Duration::from_secs(5),
batching_time_limit: Duration::from_micros(1),
};

Expand Down
16 changes: 5 additions & 11 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub struct TransportManagerConfig {
pub batch_size: BatchSize,
pub batching: bool,
pub wait_before_drop: (Duration, Duration),
pub wait_before_close: (Duration, Duration),
pub wait_before_close: Duration,
pub queue_size: [usize; Priority::NUM],
pub queue_backoff: Duration,
pub defrag_buff_size: usize,
Expand Down Expand Up @@ -142,7 +142,7 @@ pub struct TransportManagerBuilder {
batching_enabled: bool,
batching_time_limit: Duration,
wait_before_drop: (Duration, Duration),
wait_before_close: (Duration, Duration),
wait_before_close: Duration,
queue_size: QueueSizeConf,
defrag_buff_size: usize,
link_rx_buffer_size: usize,
Expand Down Expand Up @@ -197,7 +197,7 @@ impl TransportManagerBuilder {
self
}

pub fn wait_before_close(mut self, wait_before_close: (Duration, Duration)) -> Self {
pub fn wait_before_close(mut self, wait_before_close: Duration) -> Self {
self.wait_before_close = wait_before_close;
self
}
Expand Down Expand Up @@ -265,10 +265,7 @@ impl TransportManagerBuilder {
duration_from_i64us(*cc_drop.wait_before_drop()),
duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()),
));
self = self.wait_before_close((
duration_from_i64us(*cc_block.wait_before_close()),
duration_from_i64us(*cc_block.max_wait_before_close_fragments()),
));
self = self.wait_before_close(duration_from_i64us(*cc_block.wait_before_close()));
self = self.queue_size(link.tx().queue().size().clone());
self = self.tx_threads(*link.tx().threads());
self = self.protocols(link.protocols().clone());
Expand Down Expand Up @@ -379,10 +376,7 @@ impl Default for TransportManagerBuilder {
duration_from_i64us(*cc_drop.wait_before_drop()),
duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()),
),
wait_before_close: (
duration_from_i64us(*cc_block.wait_before_close()),
duration_from_i64us(*cc_block.max_wait_before_close_fragments()),
),
wait_before_close: duration_from_i64us(*cc_block.wait_before_close()),
queue_size: queue.size,
batching_time_limit: Duration::from_millis(backoff),
defrag_buff_size: *link_rx.max_message_size(),
Expand Down

0 comments on commit 70ff070

Please sign in to comment.