From 40c816d9ebb8cb062388f5450829510defe48dfe Mon Sep 17 00:00:00 2001 From: Sijie Yang Date: Thu, 16 May 2024 20:48:14 +0800 Subject: [PATCH] Buffer disordered zero rtt packets on the server endpoint (#270) --- Cargo.toml | 1 + include/tquic.h | 6 ++ src/endpoint.rs | 191 ++++++++++++++++++++++++++++++++-- src/ffi.rs | 7 ++ src/lib.rs | 10 ++ tools/src/bin/tquic_server.rs | 10 ++ 6 files changed, 215 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b918650e..bb45b4b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ strum = "0.24" strum_macros = "0.24" rand = "0.8.5" smallvec = { version = "1.10", features = ["serde", "union"] } +hashlru = "0.11" serde = { version = "1.0.139", features = ["derive"] } serde_json = { version = "1.0", features = ["preserve_order"] } serde_derive = "1.0" diff --git a/include/tquic.h b/include/tquic.h index efd56a97..0bb2932e 100644 --- a/include/tquic.h +++ b/include/tquic.h @@ -497,6 +497,12 @@ void quic_config_set_cid_len(struct quic_config_t *config, uint8_t v); */ void quic_config_set_send_batch_size(struct quic_config_t *config, uint16_t v); +/** + * Set the buffer size for disordered zerortt packets on the server. + * Applicable to Server only. + */ +void quic_config_set_zerortt_buffer_size(struct quic_config_t *config, uint16_t v); + /** * Create a new TlsConfig. * The caller is responsible for the memory of the TlsConfig and should properly diff --git a/src/endpoint.rs b/src/endpoint.rs index 77278e18..94f7361a 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -88,6 +88,10 @@ pub struct Endpoint { /// Used to send packet out. sender: Rc, + /// Buffer for ZeroRTT packets that arrive before Initial packets due to + /// potential misordering or loss of Initial packets. + buffer: PacketBuffer, + /// Packets generated by the endpoint. packets: PacketQueue, @@ -111,6 +115,7 @@ impl Endpoint { cid_lifetime: None, }); let trace_id = if is_server { "SERVER" } else { "CLIENT" }; + let buffer = PacketBuffer::new(config.zerortt_buffer_size); let packets = PacketQueue::new(config.send_batch_size); Self { @@ -123,6 +128,7 @@ impl Endpoint { cid_gen, handler, sender, + buffer, packets, closed: false, trace_id: trace_id.to_string(), @@ -285,17 +291,35 @@ impl Endpoint { conn.set_queues(self.queues.clone()); trace!( "{} create a server connection {:?}", + &self.trace_id, conn.trace_id(), - &self.trace_id ); self.handler.on_conn_created(conn); conn.mark_tickable(true); conn.recv(buf, info).map(|_| ())?; + + // Check and delivery buffered ZeroRTT Packets to the conn. + if let Some(mut v) = self.buffer.del(&hdr.dcid) { + trace!( + "{} try to delivery {} buffered zerortt packets to connection {:?}", + &self.trace_id, + v.len(), + conn.trace_id(), + ); + for (ref mut buf, info) in v.iter_mut() { + conn.recv(buf, info).map(|_| ())?; + } + } } return Ok(()); } + // Try to buffer ZeroRTT packets for the unknown connection on the server + if hdr.pkt_type == PacketType::ZeroRTT && !self.closed { + self.buffer.add(hdr.dcid, buf.to_vec(), *info); + } + // Send the Stateless Reset packet for the unknown connection if hdr.pkt_type == PacketType::OneRTT && !hdr.dcid.is_empty() && self.config.stateless_reset { @@ -964,6 +988,41 @@ impl ConnectionRoutes { } } +const MAX_ZERORTT_PACKETS_PER_CONN: usize = 10; + +/// PacketBuffer is used for buffering early incoming ZeroRTT packets on the server. +/// Buffered packets are indexed by odcid. +struct PacketBuffer { + packets: hashlru::Cache, PacketInfo)>>, +} + +impl PacketBuffer { + fn new(cache_size: usize) -> Self { + Self { + packets: hashlru::Cache::new(cache_size / MAX_ZERORTT_PACKETS_PER_CONN), + } + } + + /// Buffer a ZeroRTT packet for the given connection + fn add(&mut self, dcid: ConnectionId, buffer: Vec, info: PacketInfo) { + if let Some(v) = self.packets.get_mut(&dcid) { + if v.len() < MAX_ZERORTT_PACKETS_PER_CONN { + v.push((buffer, info)); + } + return; + } + + let mut v = Vec::with_capacity(MAX_ZERORTT_PACKETS_PER_CONN); + v.push((buffer, info)); + self.packets.insert(dcid, v); + } + + /// Remove all packets for the specified connection + fn del(&mut self, dcid: &ConnectionId) -> Option, PacketInfo)>> { + self.packets.remove(dcid) + } +} + const MAX_BUFFER_SIZE: usize = 2048; /// PacketQueue is used for sending out packets in batches. @@ -1077,6 +1136,22 @@ mod tests { /// Run client/server endpoint in two threads. fn run(&mut self, cli_conf: Config, srv_conf: Config, case_conf: CaseConf) -> Result<()> { + self.run_with_packet_filter( + cli_conf, + srv_conf, + case_conf, + Box::new(NoopPacketFilter {}), + ) + } + + /// Run client/server endpoint in two threads. + fn run_with_packet_filter( + &mut self, + cli_conf: Config, + srv_conf: Config, + case_conf: CaseConf, + cli_filter: Box, + ) -> Result<()> { // Exit if client/server thread panic std::panic::set_hook(Box::new(|panic_info| { println!( @@ -1089,8 +1164,9 @@ mod tests { // Prepare client/server sockets and config let mut cli_poll = mio::Poll::new().unwrap(); - let cli_sock = + let mut cli_sock = TestSocket::new(cli_poll.registry(), &case_conf, "CLIENT".into()).unwrap(); + cli_sock.set_filter(cli_filter); let cli_addr = cli_sock.local_addr()?; let cli_case_conf = case_conf.clone(); let cli_stop = Arc::clone(&self.stop); @@ -1311,13 +1387,19 @@ mod tests { } } + struct TestSocketState { + /// Rng for testing purposes. + rng: StepRng, + + /// Packet filter for outgoing packets + filter: Option>, + } + /// UdpSocket with fault injection. struct TestSocket { socket: mio::net::UdpSocket, - /// Rng for testing purposes. - /// TODO: use custom deterministic rng - rng: RefCell, + state: RefCell, /// Used for simulating packet loss (0~100) packet_loss: u32, @@ -1348,11 +1430,14 @@ mod tests { reg.register(&mut socket, TOKEN, mio::Interest::READABLE) .unwrap(); - let rng = RefCell::new(StepRng::new(0, 1)); + let state = RefCell::new(TestSocketState { + rng: StepRng::new(0, 1), + filter: None, + }); Ok(Self { socket, - rng, + state, packet_loss: conf.packet_loss, packet_delay: conf.packet_delay, packet_reorder: conf.packet_reorder, @@ -1362,6 +1447,11 @@ mod tests { }) } + /// Set the customized packter filter + fn set_filter(&mut self, filter: Box) { + self.state.borrow_mut().filter = Some(filter); + } + /// Return the local socket address. fn local_addr(&self) -> std::io::Result { self.socket.local_addr() @@ -1369,7 +1459,7 @@ mod tests { /// Whether an abnormal event should be injected. fn sampling(&self, rate: u32) -> bool { - self.rng.borrow_mut().next_u32() % 100 < rate + self.state.borrow_mut().rng.next_u32() % 100 < rate } /// Filter packets which are delayed long enough. @@ -1418,7 +1508,7 @@ mod tests { while start < pkts.len() { let end = cmp::min(start + window, pkts.len()); let range = &mut pkts[start..end]; - range.shuffle(&mut *self.rng.borrow_mut()); + range.shuffle(&mut self.state.borrow_mut().rng); start = end; } trace!( @@ -1481,8 +1571,12 @@ mod tests { fn on_packets_send(&self, pkts: &[(Vec, PacketInfo)]) -> crate::Result { let mut count = 0; - // Simulate event of packet delay let mut pkts = pkts.to_vec(); + if let Some(ref mut f) = &mut self.state.borrow_mut().filter { + f.filter(&mut pkts); + } + + // Simulate event of packet delay pkts = self.try_delay_packets(pkts); if pkts.is_empty() { return Ok(0); @@ -1521,6 +1615,43 @@ mod tests { } } + trait PacketFilter { + fn filter(&mut self, pkts: &mut Vec<(Vec, PacketInfo)>); + } + + struct NoopPacketFilter {} + + impl PacketFilter for NoopPacketFilter { + fn filter(&mut self, _pkts: &mut Vec<(Vec, PacketInfo)>) {} + } + + struct FirstPacketFilter { + count: u64, + drop_or_disorder: bool, + } + + impl FirstPacketFilter { + fn new(drop_or_disorder: bool) -> Self { + Self { + count: 0, + drop_or_disorder, + } + } + } + + impl PacketFilter for FirstPacketFilter { + fn filter(&mut self, pkts: &mut Vec<(Vec, PacketInfo)>) { + if self.count == 0 { + if self.drop_or_disorder && pkts.len() >= 1 { + pkts.remove(0); + } else if pkts.len() >= 2 { + pkts.swap(0, 1); + } + } + self.count += pkts.len() as u64; + } + } + // A mocked socket which implements PacketSendHandler. struct MockSocket { packets: RefCell, PacketInfo)>>, @@ -2392,6 +2523,46 @@ mod tests { Ok(()) } + #[test] + fn transfer_single_stream_0rtt_with_initial_lost() -> Result<()> { + let mut t = TestPair::new(); + + let cli_conf = TestPair::new_test_config(false)?; + let srv_conf = TestPair::new_test_config(true)?; + + let mut case_conf = CaseConf::default(); + case_conf.session = Some(TestPair::new_test_session_state()); + case_conf.client_0rtt_expected = true; + case_conf.resumption_expected = true; + case_conf.request_num = 1; + case_conf.request_size = 1024 * 16; + + // Drop the first packet(Initial) sent by the client + let filter = Box::new(FirstPacketFilter::new(true)); + t.run_with_packet_filter(cli_conf, srv_conf, case_conf, filter)?; + Ok(()) + } + + #[test] + fn transfer_single_stream_0rtt_with_initial_disordered() -> Result<()> { + let mut t = TestPair::new(); + + let cli_conf = TestPair::new_test_config(false)?; + let srv_conf = TestPair::new_test_config(true)?; + + let mut case_conf = CaseConf::default(); + case_conf.session = Some(TestPair::new_test_session_state()); + case_conf.client_0rtt_expected = true; + case_conf.resumption_expected = true; + case_conf.request_num = 1; + case_conf.request_size = 1024 * 16; + + // Disorder the first packet(Initial) sent by the client + let filter = Box::new(FirstPacketFilter::new(false)); + t.run_with_packet_filter(cli_conf, srv_conf, case_conf, filter)?; + Ok(()) + } + #[test] fn transfer_single_stream_0rtt_reject() -> Result<()> { let mut t = TestPair::new(); diff --git a/src/ffi.rs b/src/ffi.rs index e943bfba..88ceeeb6 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -387,6 +387,13 @@ pub extern "C" fn quic_config_set_send_batch_size(config: &mut Config, v: u16) { config.set_send_batch_size(v as usize); } +/// Set the buffer size for disordered zerortt packets on the server. +/// Applicable to Server only. +#[no_mangle] +pub extern "C" fn quic_config_set_zerortt_buffer_size(config: &mut Config, v: u16) { + config.set_zerortt_buffer_size(v as usize); +} + /// Create a new TlsConfig. /// The caller is responsible for the memory of the TlsConfig and should properly /// destroy it by calling `quic_tls_config_free`. diff --git a/src/lib.rs b/src/lib.rs index 2f5deb2f..967c73f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -350,6 +350,9 @@ pub struct Config { /// Maximum numbers of packets sent in a batch. send_batch_size: usize, + /// Buffer size for early incoming zero rtt packets, in packets. + zerortt_buffer_size: usize, + /// Configurations about loss recovery, congestion control, and pmtu discovery. recovery: RecoveryConfig, @@ -403,6 +406,7 @@ impl Config { cid_len: 8, anti_amplification_factor: ANTI_AMPLIFICATION_FACTOR, send_batch_size: 64, + zerortt_buffer_size: 1000, recovery: RecoveryConfig::default(), multipath: MultipathConfig::default(), tls_config_selector: None, @@ -636,6 +640,12 @@ impl Config { self.send_batch_size = cmp::max(v, 1); } + /// Set the buffer size for disordered zerortt packets on the server. + /// Applicable to Server only. + pub fn set_zerortt_buffer_size(&mut self, v: usize) { + self.zerortt_buffer_size = v; + } + /// Set TLS config. pub fn set_tls_config(&mut self, tls_config: tls::TlsConfig) { self.set_tls_config_selector(Arc::new(tls::DefaultTlsConfigSelector { diff --git a/tools/src/bin/tquic_server.rs b/tools/src/bin/tquic_server.rs index c1740595..f5f7beff 100644 --- a/tools/src/bin/tquic_server.rs +++ b/tools/src/bin/tquic_server.rs @@ -234,6 +234,15 @@ pub struct ServerOpt { /// Batch size for sending packets. #[clap(long, default_value = "16", value_name = "NUM", help_heading = "Misc")] pub send_batch_size: usize, + + /// buffer size for disordered zerortt packets on the server. + #[clap( + long, + default_value = "1000", + value_name = "NUM", + help_heading = "Misc" + )] + pub zerortt_buffer_size: usize, } const MAX_BUF_SIZE: usize = 65536; @@ -269,6 +278,7 @@ impl Server { config.set_cid_len(option.cid_len); config.set_anti_amplification_factor(option.anti_amplification_factor); config.set_send_batch_size(option.send_batch_size); + config.set_zerortt_buffer_size(option.zerortt_buffer_size); config.set_congestion_control_algorithm(option.congestion_control_algor); config.set_initial_congestion_window(option.initial_congestion_window); config.set_min_congestion_window(option.min_congestion_window);