From 42aba09c18de145d364777179fcfd76bfb8c1ba6 Mon Sep 17 00:00:00 2001 From: Davide Bertola Date: Fri, 2 Feb 2024 11:24:31 +0100 Subject: [PATCH] Fix T3RTX timer starvation When recovering from temporary network loss, regular traffic can result in repeated acks, which keep re-scheduling T3RTX without it ever firing. This leads old-retransmissions never being fulfilled. The fix is to avoid re-scheduling an already valid timer. --- src/association/mod.rs | 9 +++-- src/association/timer.rs | 11 ++++++ src/endpoint/endpoint_test.rs | 70 +++++++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 3 deletions(-) diff --git a/src/association/mod.rs b/src/association/mod.rs index 4c54c5e..9119f46 100644 --- a/src/association/mod.rs +++ b/src/association/mod.rs @@ -1685,7 +1685,8 @@ impl Association { self.timers.stop(Timer::T3RTX); } else { trace!("[{}] T3-rtx timer start (pt2)", self.side); - self.timers.start(Timer::T3RTX, now, self.rto_mgr.get_rto()); + self.timers + .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto()); } // Update congestion control parameters @@ -1825,7 +1826,8 @@ impl Association { if !self.inflight_queue.is_empty() { // Start timer. (noop if already started) trace!("[{}] T3-rtx timer start (pt3)", self.side); - self.timers.start(Timer::T3RTX, now, self.rto_mgr.get_rto()); + self.timers + .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto()); } else if state == AssociationState::ShutdownPending { // No more outstanding, send shutdown. should_awake_write_loop = true; @@ -2053,7 +2055,8 @@ impl Association { if !chunks.is_empty() { // Start timer. (noop if already started) trace!("[{}] T3-rtx timer start (pt1)", self.side); - self.timers.start(Timer::T3RTX, now, self.rto_mgr.get_rto()); + self.timers + .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto()); for p in &self.bundle_data_chunks_into_packets(chunks) { if let Ok(raw) = p.marshal() { diff --git a/src/association/timer.rs b/src/association/timer.rs index 8c504e5..6ee9fea 100644 --- a/src/association/timer.rs +++ b/src/association/timer.rs @@ -73,6 +73,17 @@ impl TimerTable { self.data[timer as usize] = Some(time); } + /// Restarts the timer if the current instant is none or elapsed. + pub fn restart_if_stale(&mut self, timer: Timer, now: Instant, interval: u64) { + if let Some(current) = self.data[timer as usize] { + if current >= now { + return; + } + } + + self.start(timer, now, interval); + } + pub fn stop(&mut self, timer: Timer) { self.data[timer as usize] = None; self.retrans[timer as usize] = 0; diff --git a/src/endpoint/endpoint_test.rs b/src/endpoint/endpoint_test.rs index 8ade959..2fdb610 100644 --- a/src/endpoint/endpoint_test.rs +++ b/src/endpoint/endpoint_test.rs @@ -2212,6 +2212,76 @@ fn test_association_handle_packet_before_init() -> Result<()> { Ok(()) } +// This test reproduces an issue related to having regular messages (regular acks) which keep +// rescheduling the T3RTX timer before it can ever fire. +#[test] +fn test_old_rtx_on_regular_acks() -> Result<()> { + let si: u16 = 6; + let mut sbuf = vec![0u8; 1000]; + for i in 0..sbuf.len() { + sbuf[i] = (i & 0xff) as u8; + } + + let (mut pair, client_ch, server_ch) = create_association_pair(AckMode::Normal, 0)?; + pair.latency = Duration::from_millis(500); + establish_session_pair(&mut pair, client_ch, server_ch, si)?; + + // Send 20 packet at a regular interval that is < RTO + for i in 0..20u32 { + println!("sending packet {}", i); + sbuf[0..4].copy_from_slice(&i.to_be_bytes()); + let n = pair.client_stream(client_ch, si)?.write_sctp( + &Bytes::from(sbuf.clone()), + PayloadProtocolIdentifier::Binary, + )?; + assert_eq!(sbuf.len(), n, "unexpected length of received data"); + pair.client.drive(pair.time, pair.server.addr); + + // drop a few transmits + if i >= 5 && i < 10 { + pair.client.outbound.clear(); + } + + pair.drive_client(); + pair.drive_server(); + pair.time += Duration::from_millis(500); + } + + pair.drive_client(); + pair.drive_server(); + + let mut buf = vec![0u8; 3000]; + + // All packets must readable correctly + for i in 0..20 { + { + let q = &pair + .server_conn_mut(server_ch) + .streams + .get(&si) + .unwrap() + .reassembly_queue; + println!("q.is_readable()={}", q.is_readable()); + assert!(q.is_readable(), "should be readable at {}", i); + } + + let chunks = pair.server_stream(server_ch, si)?.read_sctp()?.unwrap(); + let (n, ppi) = (chunks.len(), chunks.ppi); + chunks.read(&mut buf)?; + assert_eq!(n, sbuf.len(), "unexpected length of received data"); + assert_eq!(ppi, PayloadProtocolIdentifier::Binary, "unexpected ppi"); + assert_eq!( + u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]), + i, + "unexpected received data" + ); + } + + close_association_pair(&mut pair, client_ch, server_ch, si); + + Ok(()) +} + /* TODO: The following tests will be moved to sctp-async tests: struct FakeEchoConn {