implement non-linear wait before drop(close) #1603

Merged
merged 8 commits on Nov 28, 2024
14 changes: 2 additions & 12 deletions DEFAULT_CONFIG.json5
@@ -413,25 +413,15 @@
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available.
wait_before_drop: 1000,
/// The maximum deadline limit for multi-fragment messages.
/// When sending multi-fragment message, for each consecutive fragment the deadline from
/// the start point of message transmission is calculated as
/// wait_before_drop * 2 ^ min(N, ttl) where N is a fragment number.
/// Thus, this parameter allows setting a limitation for maximum time for multi-fragment
/// message transmission as wait_before_drop * 2 ^ ttl
ttl: 8,
max_wait_before_drop_fragments: 255000,
},
/// Behavior pushing CongestionControl::Block messages to the queue.
block: {
/// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message
/// if still no batch is available.
wait_before_close: 5000000,
/// The maximum deadline limit for multi-fragment messages.
/// When sending multi-fragment message, for each consecutive fragment the deadline from
/// the start point of message transmission is calculated as
/// wait_before_close * 2 ^ min(N, ttl) where N is a fragment number.
/// Thus, this parameter allows setting a limitation for maximum time for multi-fragment
/// message transmission as wait_before_close * 2 ^ ttl
ttl: 2,
max_wait_before_close_fragments: 15000000,
},
},
/// Perform batching of messages if they are smaller of the batch_size
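To put the new defaults in perspective: the per-fragment wait doubles for each consecutive fragment, and `max_wait_before_drop_fragments` / `max_wait_before_close_fragments` cap the total extra wait (see the `WaitTime` change in `pipeline.rs` below). With the drop defaults, the waits sum to 1000 + 2000 + ... + 128000 = 255000 µs, i.e. 1000 × (2^8 − 1), which mirrors the former `ttl: 8`; likewise the block default of 15000000 µs equals 5000000 × (2^2 − 1), mirroring the former `ttl: 2`.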
4 changes: 2 additions & 2 deletions commons/zenoh-config/src/defaults.rs
@@ -253,7 +253,7 @@ impl Default for CongestionControlDropConf {
fn default() -> Self {
Self {
wait_before_drop: 1000,
ttl: 8,
max_wait_before_drop_fragments: 255000,
}
}
}
@@ -262,7 +262,7 @@ impl Default for CongestionControlBlockConf {
fn default() -> Self {
Self {
wait_before_close: 5000000,
ttl: 2,
max_wait_before_close_fragments: 15000000,
}
}
}
14 changes: 2 additions & 12 deletions commons/zenoh-config/src/lib.rs
@@ -446,25 +446,15 @@ validated_struct::validator! {
/// if still no batch is available.
wait_before_drop: i64,
/// The maximum deadline limit for multi-fragment messages.
/// When sending multi-fragment message, for each consecutive fragment the deadline from
/// the start point of message transmission is calculated as
/// wait_before_drop * 2 ^ min(N, ttl) where N is a fragment number.
/// Thus, this parameter allows setting a limitation for maximum time for multi-fragment
/// message transmission as wait_before_drop * 2 ^ ttl
ttl: usize,
max_wait_before_drop_fragments: i64,
},
/// Behavior pushing CongestionControl::Block messages to the queue.
pub block: CongestionControlBlockConf {
/// The maximum time in microseconds to wait for an available batch before closing the transport session
/// when sending a blocking message if still no batch is available.
wait_before_close: i64,
/// The maximum deadline limit for multi-fragment messages.
/// When sending multi-fragment message, for each consecutive fragment the deadline from
/// the start point of message transmission is calculated as
/// wait_before_close * 2 ^ min(N, ttl) where N is a fragment number.
/// Thus, this parameter allows setting a limitation for maximum time for multi-fragment
/// message transmission as wait_before_close * 2 ^ ttl
ttl: usize,
max_wait_before_close_fragments: i64,
},
},
pub batching: BatchingConf {
34 changes: 19 additions & 15 deletions io/zenoh-transport/src/common/pipeline.rs
@@ -128,20 +128,24 @@ impl StageInMutex {
}
}

#[derive(Debug)]
struct WaitTime {
wait_time: Duration,
ttl: usize,
max_wait_time: Duration,
}

impl WaitTime {
fn new(wait_time: Duration, ttl: usize) -> Self {
Self { wait_time, ttl }
fn new(wait_time: Duration, max_wait_time: Duration) -> Self {
Self {
wait_time,
max_wait_time,
}
}

fn advance(&mut self, instant: &mut Instant) {
if let Some(new_ttl) = self.ttl.checked_sub(1) {
if let Some(new_max_wait_time) = self.max_wait_time.checked_sub(self.wait_time) {
*instant += self.wait_time;
self.ttl = new_ttl;
self.max_wait_time = new_max_wait_time;
self.wait_time *= 2;
}
}
@@ -206,10 +210,10 @@ struct Deadline {
}

impl Deadline {
fn new(wait_time: Option<(Duration, usize)>) -> Self {
fn new(wait_time: Option<(Duration, Duration)>) -> Self {
Self {
lazy_deadline: LazyDeadline::new(
wait_time.map(|(wait_time, ttl)| WaitTime::new(wait_time, ttl)),
wait_time.map(|(wait_time, max_wait_time)| WaitTime::new(wait_time, max_wait_time)),
),
}
}
@@ -610,8 +614,8 @@ impl StageOut {
pub(crate) struct TransmissionPipelineConf {
pub(crate) batch: BatchConfig,
pub(crate) queue_size: [usize; Priority::NUM],
pub(crate) wait_before_drop: (Duration, usize),
pub(crate) wait_before_close: (Duration, usize),
pub(crate) wait_before_drop: (Duration, Duration),
pub(crate) wait_before_close: (Duration, Duration),
pub(crate) batching_enabled: bool,
pub(crate) batching_time_limit: Duration,
}
@@ -713,8 +717,8 @@ pub(crate) struct TransmissionPipelineProducer {
// Each priority queue has its own Mutex
stage_in: Arc<[Mutex<StageIn>]>,
active: Arc<AtomicBool>,
wait_before_drop: (Duration, usize),
wait_before_close: (Duration, usize),
wait_before_drop: (Duration, Duration),
wait_before_close: (Duration, Duration),
}

impl TransmissionPipelineProducer {
@@ -889,8 +893,8 @@ mod tests {
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: (Duration::from_millis(1), 10),
wait_before_close: (Duration::from_secs(5), 10),
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: (Duration::from_secs(5), Duration::from_millis(1024)),
batching_time_limit: Duration::from_micros(1),
};

@@ -903,8 +907,8 @@
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: (Duration::from_millis(1), 10),
wait_before_close: (Duration::from_secs(5), 10),
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: (Duration::from_secs(5), Duration::from_millis(1024)),
batching_time_limit: Duration::from_micros(1),
};

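To see the new deadline escalation in isolation, here is a self-contained sketch that mirrors the `WaitTime` logic added above; the `main` function and the concrete values (the drop defaults of 1000 µs and 255000 µs) are assumptions for illustration, not code from the crate.

```rust
use std::time::{Duration, Instant};

// Sketch mirroring the WaitTime type introduced in this PR (illustration only).
struct WaitTime {
    wait_time: Duration,     // current per-step wait, doubled on every advance
    max_wait_time: Duration, // remaining budget for all further advances
}

impl WaitTime {
    fn advance(&mut self, instant: &mut Instant) {
        // Push the deadline only while the remaining budget covers the current wait;
        // once the budget is exhausted the deadline stops moving, so a droppable
        // message is eventually dropped (or the session closed for blocking messages).
        if let Some(new_max_wait_time) = self.max_wait_time.checked_sub(self.wait_time) {
            *instant += self.wait_time;
            self.max_wait_time = new_max_wait_time;
            self.wait_time *= 2;
        }
    }
}

fn main() {
    let start = Instant::now();
    let mut deadline = start;
    let mut wt = WaitTime {
        wait_time: Duration::from_micros(1_000),       // wait_before_drop default
        max_wait_time: Duration::from_micros(255_000), // max_wait_before_drop_fragments default
    };
    for _ in 0..12 {
        wt.advance(&mut deadline);
    }
    // Even after 12 calls the deadline has only moved 255 ms past the start:
    // 1000 + 2000 + ... + 128000 µs = 255000 µs, then advancing becomes a no-op.
    assert_eq!(deadline - start, Duration::from_micros(255_000));
}
```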
20 changes: 10 additions & 10 deletions io/zenoh-transport/src/manager.rs
@@ -109,8 +109,8 @@ pub struct TransportManagerConfig {
pub resolution: Resolution,
pub batch_size: BatchSize,
pub batching: bool,
pub wait_before_drop: (Duration, usize),
pub wait_before_close: (Duration, usize),
pub wait_before_drop: (Duration, Duration),
pub wait_before_close: (Duration, Duration),
pub queue_size: [usize; Priority::NUM],
pub queue_backoff: Duration,
pub defrag_buff_size: usize,
@@ -141,8 +141,8 @@ pub struct TransportManagerBuilder {
batch_size: BatchSize,
batching_enabled: bool,
batching_time_limit: Duration,
wait_before_drop: (Duration, usize),
wait_before_close: (Duration, usize),
wait_before_drop: (Duration, Duration),
wait_before_close: (Duration, Duration),
queue_size: QueueSizeConf,
defrag_buff_size: usize,
link_rx_buffer_size: usize,
@@ -192,12 +192,12 @@ impl TransportManagerBuilder {
self
}

pub fn wait_before_drop(mut self, wait_before_drop: (Duration, usize)) -> Self {
pub fn wait_before_drop(mut self, wait_before_drop: (Duration, Duration)) -> Self {
self.wait_before_drop = wait_before_drop;
self
}

pub fn wait_before_close(mut self, wait_before_close: (Duration, usize)) -> Self {
pub fn wait_before_close(mut self, wait_before_close: (Duration, Duration)) -> Self {
self.wait_before_close = wait_before_close;
self
}
@@ -263,11 +263,11 @@ impl TransportManagerBuilder {
self = self.link_rx_buffer_size(*link.rx().buffer_size());
self = self.wait_before_drop((
duration_from_i64us(*cc_drop.wait_before_drop()),
*cc_drop.ttl(),
duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()),
));
self = self.wait_before_close((
duration_from_i64us(*cc_block.wait_before_close()),
*cc_block.ttl(),
duration_from_i64us(*cc_block.max_wait_before_close_fragments()),
));
self = self.queue_size(link.tx().queue().size().clone());
self = self.tx_threads(*link.tx().threads());
@@ -377,11 +377,11 @@ impl Default for TransportManagerBuilder {
batching_enabled: true,
wait_before_drop: (
duration_from_i64us(*cc_drop.wait_before_drop()),
*cc_drop.ttl(),
duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()),
),
wait_before_close: (
duration_from_i64us(*cc_block.wait_before_close()),
*cc_block.ttl(),
duration_from_i64us(*cc_block.max_wait_before_close_fragments()),
),
queue_size: queue.size,
batching_time_limit: Duration::from_millis(backoff),
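The builder converts these i64 microsecond values from the config into the `(Duration, Duration)` pairs via `duration_from_i64us`. As a rough sketch of that mapping (the helper's actual body is not shown in this diff, so the implementation below, including the clamping of negative values, is an assumption):

```rust
use std::time::Duration;

// Hypothetical stand-in for the crate's duration_from_i64us helper (only its name and
// call sites appear in this diff); negative config values are clamped to zero here.
fn duration_from_i64us(us: i64) -> Duration {
    Duration::from_micros(us.max(0) as u64)
}

fn main() {
    // The JSON5 defaults are expressed in microseconds and end up as Duration pairs:
    assert_eq!(duration_from_i64us(1_000), Duration::from_millis(1));      // wait_before_drop
    assert_eq!(duration_from_i64us(255_000), Duration::from_millis(255));  // max_wait_before_drop_fragments
    assert_eq!(duration_from_i64us(5_000_000), Duration::from_secs(5));    // wait_before_close
    assert_eq!(duration_from_i64us(15_000_000), Duration::from_secs(15));  // max_wait_before_close_fragments
}
```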