Skip to content

Commit

Permalink
Buffer disordered zero rtt packets on the server endpoint (#270)
Browse files Browse the repository at this point in the history
  • Loading branch information
iyangsj authored May 16, 2024
1 parent 16ccb9f commit 40c816d
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ strum = "0.24"
strum_macros = "0.24"
rand = "0.8.5"
smallvec = { version = "1.10", features = ["serde", "union"] }
hashlru = "0.11"
serde = { version = "1.0.139", features = ["derive"] }
serde_json = { version = "1.0", features = ["preserve_order"] }
serde_derive = "1.0"
Expand Down
6 changes: 6 additions & 0 deletions include/tquic.h
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,12 @@ void quic_config_set_cid_len(struct quic_config_t *config, uint8_t v);
*/
void quic_config_set_send_batch_size(struct quic_config_t *config, uint16_t v);

/**
* Set the buffer size for disordered zerortt packets on the server.
* Applicable to Server only.
*/
void quic_config_set_zerortt_buffer_size(struct quic_config_t *config, uint16_t v);

/**
* Create a new TlsConfig.
* The caller is responsible for the memory of the TlsConfig and should properly
Expand Down
191 changes: 181 additions & 10 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ pub struct Endpoint {
/// Used to send packet out.
sender: Rc<dyn PacketSendHandler>,

/// Buffer for ZeroRTT packets that arrive before Initial packets due to
/// potential misordering or loss of Initial packets.
buffer: PacketBuffer,

/// Packets generated by the endpoint.
packets: PacketQueue,

Expand All @@ -111,6 +115,7 @@ impl Endpoint {
cid_lifetime: None,
});
let trace_id = if is_server { "SERVER" } else { "CLIENT" };
let buffer = PacketBuffer::new(config.zerortt_buffer_size);
let packets = PacketQueue::new(config.send_batch_size);

Self {
Expand All @@ -123,6 +128,7 @@ impl Endpoint {
cid_gen,
handler,
sender,
buffer,
packets,
closed: false,
trace_id: trace_id.to_string(),
Expand Down Expand Up @@ -285,17 +291,35 @@ impl Endpoint {
conn.set_queues(self.queues.clone());
trace!(
"{} create a server connection {:?}",
&self.trace_id,
conn.trace_id(),
&self.trace_id
);

self.handler.on_conn_created(conn);
conn.mark_tickable(true);
conn.recv(buf, info).map(|_| ())?;

// Check and delivery buffered ZeroRTT Packets to the conn.
if let Some(mut v) = self.buffer.del(&hdr.dcid) {
trace!(
"{} try to delivery {} buffered zerortt packets to connection {:?}",
&self.trace_id,
v.len(),
conn.trace_id(),
);
for (ref mut buf, info) in v.iter_mut() {
conn.recv(buf, info).map(|_| ())?;
}
}
}
return Ok(());
}

// Try to buffer ZeroRTT packets for the unknown connection on the server
if hdr.pkt_type == PacketType::ZeroRTT && !self.closed {
self.buffer.add(hdr.dcid, buf.to_vec(), *info);
}

// Send the Stateless Reset packet for the unknown connection
if hdr.pkt_type == PacketType::OneRTT && !hdr.dcid.is_empty() && self.config.stateless_reset
{
Expand Down Expand Up @@ -964,6 +988,41 @@ impl ConnectionRoutes {
}
}

const MAX_ZERORTT_PACKETS_PER_CONN: usize = 10;

/// PacketBuffer is used for buffering early incoming ZeroRTT packets on the server.
/// Buffered packets are indexed by odcid.
struct PacketBuffer {
packets: hashlru::Cache<ConnectionId, Vec<(Vec<u8>, PacketInfo)>>,
}

impl PacketBuffer {
fn new(cache_size: usize) -> Self {
Self {
packets: hashlru::Cache::new(cache_size / MAX_ZERORTT_PACKETS_PER_CONN),
}
}

/// Buffer a ZeroRTT packet for the given connection
fn add(&mut self, dcid: ConnectionId, buffer: Vec<u8>, info: PacketInfo) {
if let Some(v) = self.packets.get_mut(&dcid) {
if v.len() < MAX_ZERORTT_PACKETS_PER_CONN {
v.push((buffer, info));
}
return;
}

let mut v = Vec::with_capacity(MAX_ZERORTT_PACKETS_PER_CONN);
v.push((buffer, info));
self.packets.insert(dcid, v);
}

/// Remove all packets for the specified connection
fn del(&mut self, dcid: &ConnectionId) -> Option<Vec<(Vec<u8>, PacketInfo)>> {
self.packets.remove(dcid)
}
}

const MAX_BUFFER_SIZE: usize = 2048;

/// PacketQueue is used for sending out packets in batches.
Expand Down Expand Up @@ -1077,6 +1136,22 @@ mod tests {

/// Run client/server endpoint in two threads.
fn run(&mut self, cli_conf: Config, srv_conf: Config, case_conf: CaseConf) -> Result<()> {
self.run_with_packet_filter(
cli_conf,
srv_conf,
case_conf,
Box::new(NoopPacketFilter {}),
)
}

/// Run client/server endpoint in two threads.
fn run_with_packet_filter(
&mut self,
cli_conf: Config,
srv_conf: Config,
case_conf: CaseConf,
cli_filter: Box<dyn PacketFilter + Send>,
) -> Result<()> {
// Exit if client/server thread panic
std::panic::set_hook(Box::new(|panic_info| {
println!(
Expand All @@ -1089,8 +1164,9 @@ mod tests {

// Prepare client/server sockets and config
let mut cli_poll = mio::Poll::new().unwrap();
let cli_sock =
let mut cli_sock =
TestSocket::new(cli_poll.registry(), &case_conf, "CLIENT".into()).unwrap();
cli_sock.set_filter(cli_filter);
let cli_addr = cli_sock.local_addr()?;
let cli_case_conf = case_conf.clone();
let cli_stop = Arc::clone(&self.stop);
Expand Down Expand Up @@ -1311,13 +1387,19 @@ mod tests {
}
}

struct TestSocketState {
/// Rng for testing purposes.
rng: StepRng,

/// Packet filter for outgoing packets
filter: Option<Box<dyn PacketFilter + Send>>,
}

/// UdpSocket with fault injection.
struct TestSocket {
socket: mio::net::UdpSocket,

/// Rng for testing purposes.
/// TODO: use custom deterministic rng
rng: RefCell<StepRng>,
state: RefCell<TestSocketState>,

/// Used for simulating packet loss (0~100)
packet_loss: u32,
Expand Down Expand Up @@ -1348,11 +1430,14 @@ mod tests {
reg.register(&mut socket, TOKEN, mio::Interest::READABLE)
.unwrap();

let rng = RefCell::new(StepRng::new(0, 1));
let state = RefCell::new(TestSocketState {
rng: StepRng::new(0, 1),
filter: None,
});

Ok(Self {
socket,
rng,
state,
packet_loss: conf.packet_loss,
packet_delay: conf.packet_delay,
packet_reorder: conf.packet_reorder,
Expand All @@ -1362,14 +1447,19 @@ mod tests {
})
}

/// Set the customized packter filter
fn set_filter(&mut self, filter: Box<dyn PacketFilter + Send>) {
self.state.borrow_mut().filter = Some(filter);
}

/// Return the local socket address.
fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.socket.local_addr()
}

/// Whether an abnormal event should be injected.
fn sampling(&self, rate: u32) -> bool {
self.rng.borrow_mut().next_u32() % 100 < rate
self.state.borrow_mut().rng.next_u32() % 100 < rate
}

/// Filter packets which are delayed long enough.
Expand Down Expand Up @@ -1418,7 +1508,7 @@ mod tests {
while start < pkts.len() {
let end = cmp::min(start + window, pkts.len());
let range = &mut pkts[start..end];
range.shuffle(&mut *self.rng.borrow_mut());
range.shuffle(&mut self.state.borrow_mut().rng);
start = end;
}
trace!(
Expand Down Expand Up @@ -1481,8 +1571,12 @@ mod tests {
fn on_packets_send(&self, pkts: &[(Vec<u8>, PacketInfo)]) -> crate::Result<usize> {
let mut count = 0;

// Simulate event of packet delay
let mut pkts = pkts.to_vec();
if let Some(ref mut f) = &mut self.state.borrow_mut().filter {
f.filter(&mut pkts);
}

// Simulate event of packet delay
pkts = self.try_delay_packets(pkts);
if pkts.is_empty() {
return Ok(0);
Expand Down Expand Up @@ -1521,6 +1615,43 @@ mod tests {
}
}

trait PacketFilter {
fn filter(&mut self, pkts: &mut Vec<(Vec<u8>, PacketInfo)>);
}

struct NoopPacketFilter {}

impl PacketFilter for NoopPacketFilter {
fn filter(&mut self, _pkts: &mut Vec<(Vec<u8>, PacketInfo)>) {}
}

struct FirstPacketFilter {
count: u64,
drop_or_disorder: bool,
}

impl FirstPacketFilter {
fn new(drop_or_disorder: bool) -> Self {
Self {
count: 0,
drop_or_disorder,
}
}
}

impl PacketFilter for FirstPacketFilter {
fn filter(&mut self, pkts: &mut Vec<(Vec<u8>, PacketInfo)>) {
if self.count == 0 {
if self.drop_or_disorder && pkts.len() >= 1 {
pkts.remove(0);
} else if pkts.len() >= 2 {
pkts.swap(0, 1);
}
}
self.count += pkts.len() as u64;
}
}

// A mocked socket which implements PacketSendHandler.
struct MockSocket {
packets: RefCell<Vec<(Vec<u8>, PacketInfo)>>,
Expand Down Expand Up @@ -2392,6 +2523,46 @@ mod tests {
Ok(())
}

#[test]
fn transfer_single_stream_0rtt_with_initial_lost() -> Result<()> {
let mut t = TestPair::new();

let cli_conf = TestPair::new_test_config(false)?;
let srv_conf = TestPair::new_test_config(true)?;

let mut case_conf = CaseConf::default();
case_conf.session = Some(TestPair::new_test_session_state());
case_conf.client_0rtt_expected = true;
case_conf.resumption_expected = true;
case_conf.request_num = 1;
case_conf.request_size = 1024 * 16;

// Drop the first packet(Initial) sent by the client
let filter = Box::new(FirstPacketFilter::new(true));
t.run_with_packet_filter(cli_conf, srv_conf, case_conf, filter)?;
Ok(())
}

#[test]
fn transfer_single_stream_0rtt_with_initial_disordered() -> Result<()> {
let mut t = TestPair::new();

let cli_conf = TestPair::new_test_config(false)?;
let srv_conf = TestPair::new_test_config(true)?;

let mut case_conf = CaseConf::default();
case_conf.session = Some(TestPair::new_test_session_state());
case_conf.client_0rtt_expected = true;
case_conf.resumption_expected = true;
case_conf.request_num = 1;
case_conf.request_size = 1024 * 16;

// Disorder the first packet(Initial) sent by the client
let filter = Box::new(FirstPacketFilter::new(false));
t.run_with_packet_filter(cli_conf, srv_conf, case_conf, filter)?;
Ok(())
}

#[test]
fn transfer_single_stream_0rtt_reject() -> Result<()> {
let mut t = TestPair::new();
Expand Down
7 changes: 7 additions & 0 deletions src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,13 @@ pub extern "C" fn quic_config_set_send_batch_size(config: &mut Config, v: u16) {
config.set_send_batch_size(v as usize);
}

/// Set the buffer size for disordered zerortt packets on the server.
/// Applicable to Server only.
#[no_mangle]
pub extern "C" fn quic_config_set_zerortt_buffer_size(config: &mut Config, v: u16) {
config.set_zerortt_buffer_size(v as usize);
}

/// Create a new TlsConfig.
/// The caller is responsible for the memory of the TlsConfig and should properly
/// destroy it by calling `quic_tls_config_free`.
Expand Down
10 changes: 10 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ pub struct Config {
/// Maximum numbers of packets sent in a batch.
send_batch_size: usize,

/// Buffer size for early incoming zero rtt packets, in packets.
zerortt_buffer_size: usize,

/// Configurations about loss recovery, congestion control, and pmtu discovery.
recovery: RecoveryConfig,

Expand Down Expand Up @@ -403,6 +406,7 @@ impl Config {
cid_len: 8,
anti_amplification_factor: ANTI_AMPLIFICATION_FACTOR,
send_batch_size: 64,
zerortt_buffer_size: 1000,
recovery: RecoveryConfig::default(),
multipath: MultipathConfig::default(),
tls_config_selector: None,
Expand Down Expand Up @@ -636,6 +640,12 @@ impl Config {
self.send_batch_size = cmp::max(v, 1);
}

/// Set the buffer size for disordered zerortt packets on the server.
/// Applicable to Server only.
pub fn set_zerortt_buffer_size(&mut self, v: usize) {
self.zerortt_buffer_size = v;
}

/// Set TLS config.
pub fn set_tls_config(&mut self, tls_config: tls::TlsConfig) {
self.set_tls_config_selector(Arc::new(tls::DefaultTlsConfigSelector {
Expand Down
Loading

0 comments on commit 40c816d

Please sign in to comment.