Add configurable wait_before_drop time in case of CongestionControl::Drop #848

Merged · 3 commits · Mar 20, 2024
Changes from 1 commit
7 changes: 7 additions & 0 deletions DEFAULT_CONFIG.json5
@@ -261,6 +261,13 @@
data_low: 4,
background: 4,
},
/// Congestion occurs when the queue is empty (no available batch).
/// With CongestionControl::Block, the caller is blocked until a batch is available and re-inserted into the queue.
/// With CongestionControl::Drop, the message may be dropped, depending on the conditions configured here.
congestion_control: {
/// The maximum time in nanoseconds to wait for an available batch before dropping the message.
wait_before_drop: 1000000
},
/// The initial exponential backoff time in nanoseconds to allow batching to eventually progress.
/// Higher values lead to more aggressive batching but introduce additional latency.
backoff: 100,
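Note on units: wait_before_drop is expressed in nanoseconds, so the default of 1000000 equals 1 ms. As a minimal sketch of overriding it in a user configuration (the transport.link.tx nesting below is assumed from zenoh's DEFAULT_CONFIG.json5 layout, matching the accessor chain used in manager.rs further down; the 5 ms value is purely illustrative):

transport: {
  link: {
    tx: {
      queue: {
        congestion_control: {
          // Wait up to 5 ms (in nanoseconds) for an available batch before dropping.
          wait_before_drop: 5000000,
        },
      },
    },
  },
},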
9 changes: 9 additions & 0 deletions commons/zenoh-config/src/defaults.rs
@@ -170,6 +170,7 @@ impl Default for QueueConf {
fn default() -> Self {
Self {
size: QueueSizeConf::default(),
congestion_control: CongestionControlConf::default(),
backoff: 100,
}
}
@@ -195,6 +196,14 @@
}
}

impl Default for CongestionControlConf {
fn default() -> Self {
Self {
wait_before_drop: 1000000,
}
}
}

impl Default for LinkRxConf {
fn default() -> Self {
Self {
7 changes: 7 additions & 0 deletions commons/zenoh-config/src/lib.rs
@@ -352,6 +352,13 @@ validated_struct::validator! {
data_low: usize,
background: usize,
} where (queue_size_validator),
/// Congestion occurs when the queue is empty (no available batch).
/// With CongestionControl::Block, the caller is blocked until a batch is available and re-inserted into the queue.
/// With CongestionControl::Drop, the message may be dropped, depending on the conditions configured here.
pub congestion_control: CongestionControlConf {
/// The maximum time in nanoseconds to wait for an available batch before dropping the message.
pub wait_before_drop: u64,
},
/// The initial exponential backoff time in nanoseconds to allow batching to eventually progress.
/// Higher values lead to more aggressive batching but introduce additional latency.
backoff: u64,
109 changes: 65 additions & 44 deletions io/zenoh-transport/src/common/pipeline.rs
@@ -19,10 +19,12 @@ use super::{
};
use flume::{bounded, Receiver, Sender};
use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter};
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;
use std::time::Duration;
use std::{
sync::atomic::{AtomicBool, AtomicU16, Ordering},
time::Instant,
};
use zenoh_buffers::{
reader::{HasReader, Reader},
writer::HasWriter,
@@ -63,6 +65,10 @@ impl StageInRefill {
fn wait(&self) -> bool {
self.n_ref_r.recv().is_ok()
}

fn wait_deadline(&self, instant: Instant) -> bool {
self.n_ref_r.recv_deadline(instant).is_ok()
}
}

// Inner structure to link the initial stage with the final stage of the pipeline
@@ -121,13 +127,15 @@ struct StageIn {
}

impl StageIn {
fn push_network_message(&mut self, msg: &mut NetworkMessage, priority: Priority) -> bool {
fn push_network_message(
&mut self,
msg: &mut NetworkMessage,
priority: Priority,
deadline_before_drop: Option<Instant>,
) -> bool {
// Lock the current serialization batch.
let mut c_guard = self.mutex.current();

// Check congestion control
let is_droppable = msg.is_droppable();

macro_rules! zgetbatch_rets {
($fragment:expr, $restore_sn:expr) => {
loop {
@@ -140,19 +148,24 @@
}
None => {
drop(c_guard);
if !$fragment && is_droppable {
// Restore the sequence number
$restore_sn;
// We are in the congestion scenario
// The yield is to avoid the writing task to spin
// indefinitely and monopolize the CPU usage.
thread::yield_now();
return false;
} else {
if !self.s_ref.wait() {
match deadline_before_drop {
Some(deadline) if !$fragment => {
// We are in the congestion scenario and the message is droppable.
// Wait for an available batch until the deadline.
if !self.s_ref.wait_deadline(deadline) {
// Still no available batch: restore the sequence number and drop the message.
$restore_sn;
return false;
}
}
_ => {
// Block waiting for an available batch
if !self.s_ref.wait() {
// Restore the sequence number
$restore_sn;
return false;
}
}
}
c_guard = self.mutex.current();
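The bounded wait added above builds on flume's Receiver::recv_deadline, which returns Ok as soon as a value arrives and an error once the deadline passes with the channel still empty. A minimal standalone sketch of that semantic (independent of the pipeline types; the channel payload and timings are illustrative, and it assumes the flume crate as a dependency):

use std::time::{Duration, Instant};

fn main() {
    let (tx, rx) = flume::bounded::<()>(1);

    // Simulate a batch being refilled by another thread after 10 ms.
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(10));
        let _ = tx.send(());
    });

    // A 1 ms deadline expires first: in pipeline terms, the message is dropped.
    assert!(rx.recv_deadline(Instant::now() + Duration::from_millis(1)).is_err());

    // A longer deadline succeeds once the "batch" becomes available again.
    assert!(rx.recv_deadline(Instant::now() + Duration::from_millis(100)).is_ok());
}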
@@ -487,24 +500,10 @@ impl StageOut {
pub(crate) struct TransmissionPipelineConf {
pub(crate) batch: BatchConfig,
pub(crate) queue_size: [usize; Priority::NUM],
pub(crate) wait_before_drop: Duration,
pub(crate) backoff: Duration,
}

impl Default for TransmissionPipelineConf {
fn default() -> Self {
Self {
batch: BatchConfig {
mtu: BatchSize::MAX,
is_streamed: false,
#[cfg(feature = "transport_compression")]
is_compression: false,
},
queue_size: [1; Priority::NUM],
backoff: Duration::from_micros(1),
}
}
}

// A 2-stage transmission pipeline
pub(crate) struct TransmissionPipeline;
impl TransmissionPipeline {
@@ -579,6 +578,7 @@ impl TransmissionPipeline {
let producer = TransmissionPipelineProducer {
stage_in: stage_in.into_boxed_slice().into(),
active: active.clone(),
wait_before_drop: config.wait_before_drop,
};
let consumer = TransmissionPipelineConsumer {
stage_out: stage_out.into_boxed_slice(),
@@ -595,6 +595,7 @@ pub(crate) struct TransmissionPipelineProducer {
// Each priority queue has its own Mutex
stage_in: Arc<[Mutex<StageIn>]>,
active: Arc<AtomicBool>,
wait_before_drop: Duration,
}

impl TransmissionPipelineProducer {
@@ -607,9 +608,15 @@
} else {
(0, Priority::default())
};
// If the message is droppable, compute the deadline after which it may be dropped
let deadline_before_drop = if msg.is_droppable() {
Some(Instant::now() + self.wait_before_drop)
} else {
None
};
// Lock the channel. We are the only one that will be writing on it.
let mut queue = zlock!(self.stage_in[idx]);
queue.push_network_message(&mut msg, priority)
queue.push_network_message(&mut msg, priority, deadline_before_drop)
}

#[inline]
@@ -732,14 +739,27 @@ mod tests {
const SLEEP: Duration = Duration::from_millis(100);
const TIMEOUT: Duration = Duration::from_secs(60);

const CONFIG: TransmissionPipelineConf = TransmissionPipelineConf {
const CONFIG_STREAMED: TransmissionPipelineConf = TransmissionPipelineConf {
batch: BatchConfig {
mtu: BatchSize::MAX,
is_streamed: true,
#[cfg(feature = "transport_compression")]
is_compression: true,
},
queue_size: [1; Priority::NUM],
wait_before_drop: Duration::from_millis(1),
backoff: Duration::from_micros(1),
};

const CONFIG_NOT_STREAMED: TransmissionPipelineConf = TransmissionPipelineConf {
batch: BatchConfig {
mtu: BatchSize::MAX,
is_streamed: false,
#[cfg(feature = "transport_compression")]
is_compression: false,
},
queue_size: [1; Priority::NUM],
wait_before_drop: Duration::from_millis(1),
backoff: Duration::from_micros(1),
};

@@ -847,10 +867,8 @@
// Compute the number of messages to send
let num_msg = max_msgs.min(bytes / ps);

let (producer, consumer) = TransmissionPipeline::make(
TransmissionPipelineConf::default(),
priorities.as_slice(),
);
let (producer, consumer) =
TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice());

let t_c = task::spawn(async move {
consume(consumer, num_msg).await;
@@ -874,7 +892,7 @@
// Make sure to put only one message per batch: set the payload size
// to half of the batch so that the serialized zenoh message
// will be larger than half of the batch size (header + payload).
let payload_size = (CONFIG.batch.mtu / 2) as usize;
let payload_size = (CONFIG_STREAMED.batch.mtu / 2) as usize;

// Send reliable messages
let key = "test".into();
@@ -900,7 +918,7 @@

// The last push should block since there shouldn't be any more batches
// available for serialization.
let num_msg = 1 + CONFIG.queue_size[0];
let num_msg = 1 + CONFIG_STREAMED.queue_size[0];
for i in 0..num_msg {
println!(
"Pipeline Blocking [>>>]: ({id}) Scheduling message #{i} with payload size of {payload_size} bytes"
@@ -919,7 +937,7 @@
let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?;
let priorities = vec![tct];
let (producer, mut consumer) =
TransmissionPipeline::make(TransmissionPipelineConf::default(), priorities.as_slice());
TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice());

let counter = Arc::new(AtomicUsize::new(0));

@@ -937,10 +955,12 @@
// Wait until enough messages have been sent and the pipeline has blocked
println!(
"Pipeline Blocking [---]: waiting to have {} messages being scheduled",
CONFIG.queue_size[Priority::MAX as usize]
CONFIG_STREAMED.queue_size[Priority::MAX as usize]
);
let check = async {
while counter.load(Ordering::Acquire) < CONFIG.queue_size[Priority::MAX as usize] {
while counter.load(Ordering::Acquire)
< CONFIG_STREAMED.queue_size[Priority::MAX as usize]
{
tokio::time::sleep(SLEEP).await;
}
};
@@ -972,7 +992,8 @@
// Queue
let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX)).unwrap();
let priorities = vec![tct];
let (producer, mut consumer) = TransmissionPipeline::make(CONFIG, priorities.as_slice());
let (producer, mut consumer) =
TransmissionPipeline::make(CONFIG_STREAMED, priorities.as_slice());
let count = Arc::new(AtomicUsize::new(0));
let size = Arc::new(AtomicUsize::new(0));

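Taken together, the producer-side change reduces to a per-message rule: droppable messages get a deadline of now + wait_before_drop, while everything else waits indefinitely. A condensed sketch of that rule (the helper name is hypothetical; the real logic lives in push_network_message above):

use std::time::{Duration, Instant};

// Hypothetical helper mirroring the deadline computation in the diff above.
fn deadline_for(is_droppable: bool, wait_before_drop: Duration) -> Option<Instant> {
    // Droppable messages wait at most `wait_before_drop`; others block until a batch frees up.
    is_droppable.then(|| Instant::now() + wait_before_drop)
}

fn main() {
    let wait = Duration::from_nanos(1_000_000); // the 1 ms default
    assert!(deadline_for(true, wait).is_some()); // bounded wait, may drop
    assert!(deadline_for(false, wait).is_none()); // no deadline, blocking behavior
}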
14 changes: 14 additions & 0 deletions io/zenoh-transport/src/manager.rs
@@ -94,6 +94,7 @@ pub struct TransportManagerConfig {
pub whatami: WhatAmI,
pub resolution: Resolution,
pub batch_size: u16,
pub wait_before_drop: Duration,
pub queue_size: [usize; Priority::NUM],
pub queue_backoff: Duration,
pub defrag_buff_size: usize,
@@ -122,6 +123,7 @@ pub struct TransportManagerBuilder {
whatami: WhatAmI,
resolution: Resolution,
batch_size: u16,
wait_before_drop: Duration,
queue_size: QueueSizeConf,
queue_backoff: Duration,
defrag_buff_size: usize,
@@ -154,6 +156,11 @@ impl TransportManagerBuilder {
self
}

pub fn wait_before_drop(mut self, wait_before_drop: Duration) -> Self {
self.wait_before_drop = wait_before_drop;
self
}

pub fn queue_size(mut self, queue_size: QueueSizeConf) -> Self {
self.queue_size = queue_size;
self
@@ -212,7 +219,11 @@ impl TransportManagerBuilder {
self = self.batch_size(*link.tx().batch_size());
self = self.defrag_buff_size(*link.rx().max_message_size());
self = self.link_rx_buffer_size(*link.rx().buffer_size());
self = self.wait_before_drop(Duration::from_nanos(
*link.tx().queue().congestion_control().wait_before_drop(),
));
self = self.queue_size(link.tx().queue().size().clone());
self = self.queue_backoff(Duration::from_nanos(*link.tx().queue().backoff()));
self = self.tx_threads(*link.tx().threads());
self = self.protocols(link.protocols().clone());

@@ -259,6 +270,7 @@ impl TransportManagerBuilder {
whatami: self.whatami,
resolution: self.resolution,
batch_size: self.batch_size,
wait_before_drop: self.wait_before_drop,
queue_size,
queue_backoff: self.queue_backoff,
defrag_buff_size: self.defrag_buff_size,
@@ -292,12 +304,14 @@ impl Default for TransportManagerBuilder {
let link_rx = LinkRxConf::default();
let queue = QueueConf::default();
let backoff = *queue.backoff();
let wait_before_drop = *queue.congestion_control().wait_before_drop();
Self {
version: VERSION,
zid: ZenohId::rand(),
whatami: zenoh_config::defaults::mode,
resolution: Resolution::default(),
batch_size: BatchSize::MAX,
wait_before_drop: Duration::from_nanos(wait_before_drop),
queue_size: queue.size,
queue_backoff: Duration::from_nanos(backoff),
defrag_buff_size: *link_rx.max_message_size(),
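The builder stores the config value, a raw u64 nanosecond count, as a Duration via Duration::from_nanos, so the default round-trips to exactly 1 ms. A trivial check of that conversion:

use std::time::Duration;

fn main() {
    let wait_before_drop: u64 = 1_000_000; // config default, in nanoseconds
    assert_eq!(Duration::from_nanos(wait_before_drop), Duration::from_millis(1));
}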
1 change: 1 addition & 0 deletions io/zenoh-transport/src/multicast/link.rs
@@ -320,6 +320,7 @@ impl TransportLinkMulticastUniversal {
let tpc = TransmissionPipelineConf {
batch: self.link.config.batch,
queue_size: self.transport.manager.config.queue_size,
wait_before_drop: self.transport.manager.config.wait_before_drop,
backoff: self.transport.manager.config.queue_backoff,
};
// The pipeline
1 change: 1 addition & 0 deletions io/zenoh-transport/src/unicast/universal/link.rs
@@ -59,6 +59,7 @@ impl TransportLinkUnicastUniversal {
is_compression: link.config.batch.is_compression,
},
queue_size: transport.manager.config.queue_size,
wait_before_drop: transport.manager.config.wait_before_drop,
backoff: transport.manager.config.queue_backoff,
};
