Skip to content

Commit

Permalink
Merge pull request #1603 from ZettaScaleLabs/non_linear_cc_wait
Browse files Browse the repository at this point in the history
implement non-linear wait before drop(close)
  • Loading branch information
Mallets authored Nov 28, 2024
2 parents 075f2ad + 70ff070 commit 7e044ad
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 48 deletions.
2 changes: 2 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,8 @@
drop: {
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available.
wait_before_drop: 1000,
/// The maximum deadline limit for multi-fragment messages.
max_wait_before_drop_fragments: 50000,
},
/// Behavior pushing CongestionControl::Block messages to the queue.
block: {
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ impl Default for CongestionControlDropConf {
fn default() -> Self {
Self {
wait_before_drop: 1000,
max_wait_before_drop_fragments: 50000,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ validated_struct::validator! {
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message
/// if still no batch is available.
wait_before_drop: i64,
/// The maximum deadline limit for multi-fragment messages.
max_wait_before_drop_fragments: i64,
},
/// Behavior pushing CongestionControl::Block messages to the queue.
pub block: CongestionControlBlockConf {
Expand Down
82 changes: 56 additions & 26 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,45 +128,76 @@ impl StageInMutex {
}
}

#[derive(Debug)]
struct WaitTime {
wait_time: Duration,
max_wait_time: Option<Duration>,
}

impl WaitTime {
fn new(wait_time: Duration, max_wait_time: Option<Duration>) -> Self {
Self {
wait_time,
max_wait_time,
}
}

fn advance(&mut self, instant: &mut Instant) {
match &mut self.max_wait_time {
Some(max_wait_time) => {
if let Some(new_max_wait_time) = max_wait_time.checked_sub(self.wait_time) {
*instant += self.wait_time;
*max_wait_time = new_max_wait_time;
self.wait_time *= 2;
}
}
None => {
*instant += self.wait_time;
}
}
}

fn wait_time(&self) -> Duration {
self.wait_time
}
}

#[derive(Clone)]
enum DeadlineSetting {
Immediate,
Infinite,
Finite(Instant),
}

struct LazyDeadline {
deadline: Option<DeadlineSetting>,
wait_time: Option<Duration>,
wait_time: WaitTime,
}

impl LazyDeadline {
fn new(wait_time: Option<Duration>) -> Self {
fn new(wait_time: WaitTime) -> Self {
Self {
deadline: None,
wait_time,
}
}

fn advance(&mut self) {
let wait_time = self.wait_time;
match &mut self.deadline() {
match self.deadline().to_owned() {
DeadlineSetting::Immediate => {}
DeadlineSetting::Infinite => {}
DeadlineSetting::Finite(instant) => {
*instant = instant.add(unsafe { wait_time.unwrap_unchecked() });
DeadlineSetting::Finite(mut instant) => {
self.wait_time.advance(&mut instant);
self.deadline = Some(DeadlineSetting::Finite(instant));
}
}
}

#[inline]
fn deadline(&mut self) -> &mut DeadlineSetting {
self.deadline.get_or_insert_with(|| match self.wait_time {
Some(wait_time) => match wait_time.is_zero() {
true => DeadlineSetting::Immediate,
false => DeadlineSetting::Finite(Instant::now().add(wait_time)),
},
None => DeadlineSetting::Infinite,
})
self.deadline
.get_or_insert_with(|| match self.wait_time.wait_time() {
Duration::ZERO => DeadlineSetting::Immediate,
nonzero_wait_time => DeadlineSetting::Finite(Instant::now().add(nonzero_wait_time)),
})
}
}

Expand All @@ -175,17 +206,16 @@ struct Deadline {
}

impl Deadline {
fn new(wait_time: Option<Duration>) -> Self {
fn new(wait_time: Duration, max_wait_time: Option<Duration>) -> Self {
Self {
lazy_deadline: LazyDeadline::new(wait_time),
lazy_deadline: LazyDeadline::new(WaitTime::new(wait_time, max_wait_time)),
}
}

#[inline]
fn wait(&mut self, s_ref: &StageInRefill) -> bool {
match self.lazy_deadline.deadline() {
DeadlineSetting::Immediate => false,
DeadlineSetting::Infinite => s_ref.wait(),
DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant),
}
}
Expand Down Expand Up @@ -577,7 +607,7 @@ impl StageOut {
pub(crate) struct TransmissionPipelineConf {
pub(crate) batch: BatchConfig,
pub(crate) queue_size: [usize; Priority::NUM],
pub(crate) wait_before_drop: Duration,
pub(crate) wait_before_drop: (Duration, Duration),
pub(crate) wait_before_close: Duration,
pub(crate) batching_enabled: bool,
pub(crate) batching_time_limit: Duration,
Expand Down Expand Up @@ -680,7 +710,7 @@ pub(crate) struct TransmissionPipelineProducer {
// Each priority queue has its own Mutex
stage_in: Arc<[Mutex<StageIn>]>,
active: Arc<AtomicBool>,
wait_before_drop: Duration,
wait_before_drop: (Duration, Duration),
wait_before_close: Duration,
}

Expand All @@ -695,12 +725,12 @@ impl TransmissionPipelineProducer {
(0, Priority::DEFAULT)
};
// If message is droppable, compute a deadline after which the sample could be dropped
let wait_time = if msg.is_droppable() {
self.wait_before_drop
let (wait_time, max_wait_time) = if msg.is_droppable() {
(self.wait_before_drop.0, Some(self.wait_before_drop.1))
} else {
self.wait_before_close
(self.wait_before_close, None)
};
let mut deadline = Deadline::new(Some(wait_time));
let mut deadline = Deadline::new(wait_time, max_wait_time);
// Lock the channel. We are the only one that will be writing on it.
let mut queue = zlock!(self.stage_in[idx]);
queue.push_network_message(&mut msg, priority, &mut deadline)
Expand Down Expand Up @@ -856,7 +886,7 @@ mod tests {
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: Duration::from_millis(1),
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: Duration::from_secs(5),
batching_time_limit: Duration::from_micros(1),
};
Expand All @@ -870,7 +900,7 @@ mod tests {
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: Duration::from_millis(1),
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: Duration::from_secs(5),
batching_time_limit: Duration::from_micros(1),
};
Expand Down
38 changes: 16 additions & 22 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub struct TransportManagerConfig {
pub resolution: Resolution,
pub batch_size: BatchSize,
pub batching: bool,
pub wait_before_drop: Duration,
pub wait_before_drop: (Duration, Duration),
pub wait_before_close: Duration,
pub queue_size: [usize; Priority::NUM],
pub queue_backoff: Duration,
Expand Down Expand Up @@ -141,7 +141,7 @@ pub struct TransportManagerBuilder {
batch_size: BatchSize,
batching_enabled: bool,
batching_time_limit: Duration,
wait_before_drop: Duration,
wait_before_drop: (Duration, Duration),
wait_before_close: Duration,
queue_size: QueueSizeConf,
defrag_buff_size: usize,
Expand Down Expand Up @@ -192,7 +192,7 @@ impl TransportManagerBuilder {
self
}

pub fn wait_before_drop(mut self, wait_before_drop: Duration) -> Self {
pub fn wait_before_drop(mut self, wait_before_drop: (Duration, Duration)) -> Self {
self.wait_before_drop = wait_before_drop;
self
}
Expand Down Expand Up @@ -249,6 +249,8 @@ impl TransportManagerBuilder {
}

let link = config.transport().link();
let cc_drop = link.tx().queue().congestion_control().drop();
let cc_block = link.tx().queue().congestion_control().block();
let mut resolution = Resolution::default();
resolution.set(Field::FrameSN, *link.tx().sequence_number_resolution());
self = self.resolution(resolution);
Expand All @@ -259,22 +261,11 @@ impl TransportManagerBuilder {
));
self = self.defrag_buff_size(*link.rx().max_message_size());
self = self.link_rx_buffer_size(*link.rx().buffer_size());
self = self.wait_before_drop(duration_from_i64us(
*link
.tx()
.queue()
.congestion_control()
.drop()
.wait_before_drop(),
));
self = self.wait_before_close(duration_from_i64us(
*link
.tx()
.queue()
.congestion_control()
.block()
.wait_before_close(),
self = self.wait_before_drop((
duration_from_i64us(*cc_drop.wait_before_drop()),
duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()),
));
self = self.wait_before_close(duration_from_i64us(*cc_block.wait_before_close()));
self = self.queue_size(link.tx().queue().size().clone());
self = self.tx_threads(*link.tx().threads());
self = self.protocols(link.protocols().clone());
Expand Down Expand Up @@ -372,17 +363,20 @@ impl Default for TransportManagerBuilder {
let link_rx = LinkRxConf::default();
let queue = QueueConf::default();
let backoff = *queue.batching().time_limit();
let wait_before_drop = *queue.congestion_control().drop().wait_before_drop();
let wait_before_close = *queue.congestion_control().block().wait_before_close();
let cc_drop = queue.congestion_control().drop();
let cc_block = queue.congestion_control().block();
Self {
version: VERSION,
zid: ZenohIdProto::rand(),
whatami: zenoh_config::defaults::mode,
resolution: Resolution::default(),
batch_size: BatchSize::MAX,
batching_enabled: true,
wait_before_drop: duration_from_i64us(wait_before_drop),
wait_before_close: duration_from_i64us(wait_before_close),
wait_before_drop: (
duration_from_i64us(*cc_drop.wait_before_drop()),
duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()),
),
wait_before_close: duration_from_i64us(*cc_block.wait_before_close()),
queue_size: queue.size,
batching_time_limit: Duration::from_millis(backoff),
defrag_buff_size: *link_rx.max_message_size(),
Expand Down

0 comments on commit 7e044ad

Please sign in to comment.