Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement non-linear wait before drop(close) #1603

Merged
merged 8 commits into from
Nov 28, 2024
4 changes: 4 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,16 @@
drop: {
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available.
wait_before_drop: 1000,
/// The maximum deadline limit for multi-fragment messages.
max_wait_before_drop_fragments: 255000,
Mallets marked this conversation as resolved.
Show resolved Hide resolved
},
/// Behavior pushing CongestionControl::Block messages to the queue.
block: {
/// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message
/// if still no batch is available.
wait_before_close: 5000000,
/// The maximum deadline limit for multi-fragment messages.
max_wait_before_close_fragments: 15000000,
Mallets marked this conversation as resolved.
Show resolved Hide resolved
},
},
/// Perform batching of messages if they are smaller of the batch_size
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ impl Default for CongestionControlDropConf {
fn default() -> Self {
Self {
wait_before_drop: 1000,
max_wait_before_drop_fragments: 255000,
}
}
}
Expand All @@ -261,6 +262,7 @@ impl Default for CongestionControlBlockConf {
fn default() -> Self {
Self {
wait_before_close: 5000000,
max_wait_before_close_fragments: 15000000,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,12 +445,16 @@ validated_struct::validator! {
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message
/// if still no batch is available.
wait_before_drop: i64,
/// The maximum deadline limit for multi-fragment messages.
max_wait_before_drop_fragments: i64,
Mallets marked this conversation as resolved.
Show resolved Hide resolved
},
/// Behavior pushing CongestionControl::Block messages to the queue.
pub block: CongestionControlBlockConf {
/// The maximum time in microseconds to wait for an available batch before closing the transport session
/// when sending a blocking message if still no batch is available.
wait_before_close: i64,
/// The maximum deadline limit for multi-fragment messages.
max_wait_before_close_fragments: i64,
},
},
pub batching: BatchingConf {
Expand Down
83 changes: 60 additions & 23 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,34 @@ impl StageInMutex {
}
}

#[derive(Debug)]
struct WaitTime {
wait_time: Duration,
max_wait_time: Duration,
}

impl WaitTime {
fn new(wait_time: Duration, max_wait_time: Duration) -> Self {
Self {
wait_time,
max_wait_time,
}
}

fn advance(&mut self, instant: &mut Instant) {
if let Some(new_max_wait_time) = self.max_wait_time.checked_sub(self.wait_time) {
*instant += self.wait_time;
self.max_wait_time = new_max_wait_time;
self.wait_time *= 2;
}
}

fn wait_time(&self) -> Duration {
self.wait_time
}
}

#[derive(Clone)]
enum DeadlineSetting {
Immediate,
Infinite,
Expand All @@ -136,37 +164,44 @@ enum DeadlineSetting {

struct LazyDeadline {
deadline: Option<DeadlineSetting>,
wait_time: Option<Duration>,
wait_time: Option<WaitTime>,
}

impl LazyDeadline {
fn new(wait_time: Option<Duration>) -> Self {
fn new(wait_time: Option<WaitTime>) -> Self {
Self {
deadline: None,
wait_time,
}
}

fn advance(&mut self) {
let wait_time = self.wait_time;
match &mut self.deadline() {
match self.deadline().to_owned() {
DeadlineSetting::Immediate => {}
DeadlineSetting::Infinite => {}
DeadlineSetting::Finite(instant) => {
*instant = instant.add(unsafe { wait_time.unwrap_unchecked() });
DeadlineSetting::Finite(mut instant) => {
// SAFETY: this is safe because DeadlineSetting::Finite is returned by
// deadline() only if wait_time is Some(_)
let wait_time = unsafe { self.wait_time.as_mut().unwrap_unchecked() };
wait_time.advance(&mut instant);

self.deadline = Some(DeadlineSetting::Finite(instant));
}
}
}

#[inline]
fn deadline(&mut self) -> &mut DeadlineSetting {
self.deadline.get_or_insert_with(|| match self.wait_time {
Some(wait_time) => match wait_time.is_zero() {
true => DeadlineSetting::Immediate,
false => DeadlineSetting::Finite(Instant::now().add(wait_time)),
},
None => DeadlineSetting::Infinite,
})
self.deadline
.get_or_insert_with(|| match self.wait_time.as_mut() {
Some(wait_time) => match wait_time.wait_time() {
Duration::ZERO => DeadlineSetting::Immediate,
nonzero_wait_time => {
DeadlineSetting::Finite(Instant::now().add(nonzero_wait_time))
}
},
None => DeadlineSetting::Infinite,
})
}
}

Expand All @@ -175,9 +210,11 @@ struct Deadline {
}

impl Deadline {
fn new(wait_time: Option<Duration>) -> Self {
fn new(wait_time: Option<(Duration, Duration)>) -> Self {
Self {
lazy_deadline: LazyDeadline::new(wait_time),
lazy_deadline: LazyDeadline::new(
wait_time.map(|(wait_time, max_wait_time)| WaitTime::new(wait_time, max_wait_time)),
),
}
}

Expand Down Expand Up @@ -577,8 +614,8 @@ impl StageOut {
pub(crate) struct TransmissionPipelineConf {
pub(crate) batch: BatchConfig,
pub(crate) queue_size: [usize; Priority::NUM],
pub(crate) wait_before_drop: Duration,
pub(crate) wait_before_close: Duration,
pub(crate) wait_before_drop: (Duration, Duration),
pub(crate) wait_before_close: (Duration, Duration),
pub(crate) batching_enabled: bool,
pub(crate) batching_time_limit: Duration,
}
Expand Down Expand Up @@ -680,8 +717,8 @@ pub(crate) struct TransmissionPipelineProducer {
// Each priority queue has its own Mutex
stage_in: Arc<[Mutex<StageIn>]>,
active: Arc<AtomicBool>,
wait_before_drop: Duration,
wait_before_close: Duration,
wait_before_drop: (Duration, Duration),
wait_before_close: (Duration, Duration),
}

impl TransmissionPipelineProducer {
Expand Down Expand Up @@ -856,8 +893,8 @@ mod tests {
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: Duration::from_millis(1),
wait_before_close: Duration::from_secs(5),
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: (Duration::from_secs(5), Duration::from_millis(1024)),
batching_time_limit: Duration::from_micros(1),
};

Expand All @@ -870,8 +907,8 @@ mod tests {
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: Duration::from_millis(1),
wait_before_close: Duration::from_secs(5),
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: (Duration::from_secs(5), Duration::from_millis(1024)),
batching_time_limit: Duration::from_micros(1),
};

Expand Down
48 changes: 24 additions & 24 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ pub struct TransportManagerConfig {
pub resolution: Resolution,
pub batch_size: BatchSize,
pub batching: bool,
pub wait_before_drop: Duration,
pub wait_before_close: Duration,
pub wait_before_drop: (Duration, Duration),
pub wait_before_close: (Duration, Duration),
pub queue_size: [usize; Priority::NUM],
pub queue_backoff: Duration,
pub defrag_buff_size: usize,
Expand Down Expand Up @@ -141,8 +141,8 @@ pub struct TransportManagerBuilder {
batch_size: BatchSize,
batching_enabled: bool,
batching_time_limit: Duration,
wait_before_drop: Duration,
wait_before_close: Duration,
wait_before_drop: (Duration, Duration),
wait_before_close: (Duration, Duration),
queue_size: QueueSizeConf,
defrag_buff_size: usize,
link_rx_buffer_size: usize,
Expand Down Expand Up @@ -192,12 +192,12 @@ impl TransportManagerBuilder {
self
}

pub fn wait_before_drop(mut self, wait_before_drop: Duration) -> Self {
pub fn wait_before_drop(mut self, wait_before_drop: (Duration, Duration)) -> Self {
self.wait_before_drop = wait_before_drop;
self
}

pub fn wait_before_close(mut self, wait_before_close: Duration) -> Self {
pub fn wait_before_close(mut self, wait_before_close: (Duration, Duration)) -> Self {
self.wait_before_close = wait_before_close;
self
}
Expand Down Expand Up @@ -249,6 +249,8 @@ impl TransportManagerBuilder {
}

let link = config.transport().link();
let cc_drop = link.tx().queue().congestion_control().drop();
let cc_block = link.tx().queue().congestion_control().block();
let mut resolution = Resolution::default();
resolution.set(Field::FrameSN, *link.tx().sequence_number_resolution());
self = self.resolution(resolution);
Expand All @@ -259,21 +261,13 @@ impl TransportManagerBuilder {
));
self = self.defrag_buff_size(*link.rx().max_message_size());
self = self.link_rx_buffer_size(*link.rx().buffer_size());
self = self.wait_before_drop(duration_from_i64us(
*link
.tx()
.queue()
.congestion_control()
.drop()
.wait_before_drop(),
self = self.wait_before_drop((
duration_from_i64us(*cc_drop.wait_before_drop()),
duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()),
));
self = self.wait_before_close(duration_from_i64us(
*link
.tx()
.queue()
.congestion_control()
.block()
.wait_before_close(),
self = self.wait_before_close((
duration_from_i64us(*cc_block.wait_before_close()),
duration_from_i64us(*cc_block.max_wait_before_close_fragments()),
));
self = self.queue_size(link.tx().queue().size().clone());
self = self.tx_threads(*link.tx().threads());
Expand Down Expand Up @@ -372,17 +366,23 @@ impl Default for TransportManagerBuilder {
let link_rx = LinkRxConf::default();
let queue = QueueConf::default();
let backoff = *queue.batching().time_limit();
let wait_before_drop = *queue.congestion_control().drop().wait_before_drop();
let wait_before_close = *queue.congestion_control().block().wait_before_close();
let cc_drop = queue.congestion_control().drop();
let cc_block = queue.congestion_control().block();
Self {
version: VERSION,
zid: ZenohIdProto::rand(),
whatami: zenoh_config::defaults::mode,
resolution: Resolution::default(),
batch_size: BatchSize::MAX,
batching_enabled: true,
wait_before_drop: duration_from_i64us(wait_before_drop),
wait_before_close: duration_from_i64us(wait_before_close),
wait_before_drop: (
duration_from_i64us(*cc_drop.wait_before_drop()),
duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()),
),
wait_before_close: (
duration_from_i64us(*cc_block.wait_before_close()),
duration_from_i64us(*cc_block.max_wait_before_close_fragments()),
),
queue_size: queue.size,
batching_time_limit: Duration::from_millis(backoff),
defrag_buff_size: *link_rx.max_message_size(),
Expand Down