Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix T3RTX timer starvation #7

Merged
merged 1 commit into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/association/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1685,7 +1685,8 @@ impl Association {
self.timers.stop(Timer::T3RTX);
} else {
trace!("[{}] T3-rtx timer start (pt2)", self.side);
self.timers.start(Timer::T3RTX, now, self.rto_mgr.get_rto());
self.timers
.restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
}

// Update congestion control parameters
Expand Down Expand Up @@ -1825,7 +1826,8 @@ impl Association {
if !self.inflight_queue.is_empty() {
// Start timer. (noop if already started)
trace!("[{}] T3-rtx timer start (pt3)", self.side);
self.timers.start(Timer::T3RTX, now, self.rto_mgr.get_rto());
self.timers
.restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
} else if state == AssociationState::ShutdownPending {
// No more outstanding, send shutdown.
should_awake_write_loop = true;
Expand Down Expand Up @@ -2053,7 +2055,8 @@ impl Association {
if !chunks.is_empty() {
// Start timer. (noop if already started)
trace!("[{}] T3-rtx timer start (pt1)", self.side);
self.timers.start(Timer::T3RTX, now, self.rto_mgr.get_rto());
self.timers
.restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());

for p in &self.bundle_data_chunks_into_packets(chunks) {
if let Ok(raw) = p.marshal() {
Expand Down
11 changes: 11 additions & 0 deletions src/association/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ impl TimerTable {
self.data[timer as usize] = Some(time);
}

/// Restarts the timer if the current instant is none or elapsed.
pub fn restart_if_stale(&mut self, timer: Timer, now: Instant, interval: u64) {
if let Some(current) = self.data[timer as usize] {
if current >= now {
return;
}
}

self.start(timer, now, interval);
}

pub fn stop(&mut self, timer: Timer) {
self.data[timer as usize] = None;
self.retrans[timer as usize] = 0;
Expand Down
70 changes: 70 additions & 0 deletions src/endpoint/endpoint_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2212,6 +2212,76 @@ fn test_association_handle_packet_before_init() -> Result<()> {
Ok(())
}

// This test reproduces an issue related to having regular messages (regular acks) which keep
// rescheduling the T3RTX timer before it can ever fire.
#[test]
fn test_old_rtx_on_regular_acks() -> Result<()> {
let si: u16 = 6;
let mut sbuf = vec![0u8; 1000];
for i in 0..sbuf.len() {
sbuf[i] = (i & 0xff) as u8;
}

let (mut pair, client_ch, server_ch) = create_association_pair(AckMode::Normal, 0)?;
pair.latency = Duration::from_millis(500);
establish_session_pair(&mut pair, client_ch, server_ch, si)?;

// Send 20 packet at a regular interval that is < RTO
for i in 0..20u32 {
println!("sending packet {}", i);
sbuf[0..4].copy_from_slice(&i.to_be_bytes());
let n = pair.client_stream(client_ch, si)?.write_sctp(
&Bytes::from(sbuf.clone()),
PayloadProtocolIdentifier::Binary,
)?;
assert_eq!(sbuf.len(), n, "unexpected length of received data");
pair.client.drive(pair.time, pair.server.addr);

// drop a few transmits
if i >= 5 && i < 10 {
pair.client.outbound.clear();
}

pair.drive_client();
pair.drive_server();
pair.time += Duration::from_millis(500);
}

pair.drive_client();
pair.drive_server();

let mut buf = vec![0u8; 3000];

// All packets must readable correctly
for i in 0..20 {
{
let q = &pair
.server_conn_mut(server_ch)
.streams
.get(&si)
.unwrap()
.reassembly_queue;
println!("q.is_readable()={}", q.is_readable());
assert!(q.is_readable(), "should be readable at {}", i);
}

let chunks = pair.server_stream(server_ch, si)?.read_sctp()?.unwrap();
let (n, ppi) = (chunks.len(), chunks.ppi);
chunks.read(&mut buf)?;
assert_eq!(n, sbuf.len(), "unexpected length of received data");
assert_eq!(ppi, PayloadProtocolIdentifier::Binary, "unexpected ppi");
assert_eq!(
u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]),
i,
"unexpected received data"
);
}

close_association_pair(&mut pair, client_ch, server_ch, si);

Ok(())
}

/*
TODO: The following tests will be moved to sctp-async tests:
struct FakeEchoConn {
Expand Down
Loading