Skip to content

Commit

Permalink
feat(s2n-quic-transport): Adds stream batching functionality (#2433)
Browse files Browse the repository at this point in the history
  • Loading branch information
maddeleine authored Jan 7, 2025
1 parent fa2e663 commit a45011c
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 46 deletions.
11 changes: 11 additions & 0 deletions quic/s2n-quic-core/src/connection/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const MAX_KEEP_ALIVE_PERIOD_DEFAULT: Duration = Duration::from_secs(30);
//# received.
pub const ANTI_AMPLIFICATION_MULTIPLIER: u8 = 3;

pub const DEFAULT_STREAM_BATCH_SIZE: u8 = 1;

#[non_exhaustive]
#[derive(Debug)]
pub struct ConnectionInfo<'a> {
Expand Down Expand Up @@ -74,6 +76,7 @@ pub struct Limits {
pub(crate) initial_round_trip_time: Duration,
pub(crate) migration_support: MigrationSupport,
pub(crate) anti_amplification_multiplier: u8,
pub(crate) stream_batch_size: u8,
}

impl Default for Limits {
Expand Down Expand Up @@ -120,6 +123,7 @@ impl Limits {
initial_round_trip_time: recovery::DEFAULT_INITIAL_RTT,
migration_support: MigrationSupport::RECOMMENDED,
anti_amplification_multiplier: ANTI_AMPLIFICATION_MULTIPLIER,
stream_batch_size: DEFAULT_STREAM_BATCH_SIZE,
}
}

Expand Down Expand Up @@ -222,6 +226,7 @@ impl Limits {
max_active_connection_ids,
u64
);
setter!(with_stream_batch_size, stream_batch_size, u8);
setter!(with_ack_elicitation_interval, ack_elicitation_interval, u8);
setter!(with_max_ack_ranges, ack_ranges_limit, u8);
setter!(
Expand Down Expand Up @@ -384,6 +389,12 @@ impl Limits {
pub fn anti_amplification_multiplier(&self) -> u8 {
self.anti_amplification_multiplier
}

#[doc(hidden)]
#[inline]
pub fn stream_batch_size(&self) -> u8 {
self.stream_batch_size
}
}

/// Creates limits for a given connection
Expand Down
6 changes: 3 additions & 3 deletions quic/s2n-quic-transport/src/stream/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ impl<S: 'static + StreamTrait> stream::Manager for AbstractStreamManager<S> {
connection_limits.stream_limits(),
min_rtt,
),
streams: StreamContainer::new(),
streams: StreamContainer::new(connection_limits),
next_stream_ids: StreamIdSet::initial(),
local_endpoint_type,
initial_local_limits,
Expand Down Expand Up @@ -849,7 +849,7 @@ impl<S: 'static + StreamTrait> stream::Manager for AbstractStreamManager<S> {
transmission::context::RetransmissionContext::new(context);

// Prioritize retransmitting lost data
self.inner.streams.iterate_retransmission_list(
self.inner.streams.send_on_retransmission_list(
&mut self.inner.stream_controller,
|stream: &mut S| {
transmit_result = stream.on_transmit(&mut retransmission_context);
Expand All @@ -866,7 +866,7 @@ impl<S: 'static + StreamTrait> stream::Manager for AbstractStreamManager<S> {
}

if context.transmission_constraint().can_transmit() {
self.inner.streams.iterate_transmission_list(
self.inner.streams.send_on_transmission_list(
&mut self.inner.stream_controller,
|stream: &mut S| {
transmit_result = stream.on_transmit(context);
Expand Down
86 changes: 86 additions & 0 deletions quic/s2n-quic-transport/src/stream/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2530,6 +2530,11 @@ fn on_transmit_queries_streams_for_data() {
endpoint::Type::Server,
);

assert_eq!(
[stream_1, stream_2, stream_3, stream_4],
*manager.streams_waiting_for_transmission()
);

assert_eq!(
Err(OnTransmitError::CouldNotWriteFrame),
manager.on_transmit(&mut write_context)
Expand Down Expand Up @@ -3249,3 +3254,84 @@ fn stream_transmission_fairness_test() {
}
}
}

#[test]
fn stream_batching_test() {
for batch_size in 1..=10 {
dbg!(batch_size);
let limits = ConnectionLimits::default()
.with_stream_batch_size(batch_size)
.unwrap();

let mut manager = AbstractStreamManager::<stream::StreamImpl>::new(
&limits,
endpoint::Type::Server,
create_default_initial_flow_control_limits(),
create_default_initial_flow_control_limits(),
DEFAULT_INITIAL_RTT,
);

// Create some open Streams
let mut stream_ids: VecDeque<StreamId> = (0..4)
.map(|_| {
let (accept_waker, _accept_wake_counter) = new_count_waker();
let (_wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle();
let mut token = connection::OpenToken::new();

let result = match manager.poll_open_local_stream(
StreamType::Bidirectional,
&mut token,
&mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
&Context::from_waker(&accept_waker),
) {
Poll::Ready(res) => res,
Poll::Pending => Err(connection::Error::unspecified()),
};
result.unwrap()
})
.collect();

// Create a context that can only fit packets of size 50
let mut frame_buffer = OutgoingFrameBuffer::new();
let max_packet_size = 50;
frame_buffer.set_max_packet_size(Some(max_packet_size));
let mut write_context = MockWriteContext::new(
time::now(),
&mut frame_buffer,
transmission::Constraint::None,
transmission::Mode::Normal,
endpoint::Type::Server,
);

const DATA_SIZE: usize = 2000;
let array: [u8; DATA_SIZE] = [1; DATA_SIZE];

// Set up each stream to have much more data to send than can fit in our test packet
for stream_id in &stream_ids {
manager
.with_asserted_stream(*stream_id, |stream: &mut stream::StreamImpl| {
let data_to_send = bytes::Bytes::copy_from_slice(&array);
stream.poll_request(ops::Request::default().send(&mut [data_to_send]), None)
})
.unwrap();
}
// make sure the order matches creation order
assert_eq!(stream_ids, manager.streams_waiting_for_transmission());

// Send 40 packets. Each stream gets to be the first to fill up a packet "batch_size" times.
// Then the stream gets sent to the back of the transmission list.
for idx in 1..=40 {
dbg!(idx);
let _ = manager.on_transmit(&mut write_context);

assert_eq!(stream_ids, manager.streams_waiting_for_transmission());

write_context.frame_buffer.flush();

if idx % batch_size == 0 {
// The first stream gets sent to the back of the transmission list once we have sent "batch_size" packets
stream_ids.rotate_left(1);
}
}
}
}
Loading

0 comments on commit a45011c

Please sign in to comment.