From 869de383f04ee0bf14aa42498940d3e427f60487 Mon Sep 17 00:00:00 2001 From: golf-gl Date: Mon, 27 Nov 2023 16:06:19 +0800 Subject: [PATCH] Fix some congestion control related issues. --- src/congestion_control/bbr/bbr.rs | 9 ++--- src/congestion_control/bbr3/bbr3.rs | 35 +++++++++++++------- src/congestion_control/congestion_control.rs | 1 + src/congestion_control/copa/copa.rs | 25 ++++++++++---- src/congestion_control/cubic.rs | 29 ++++++++++++++++ src/connection/recovery.rs | 15 ++++++--- 6 files changed, 86 insertions(+), 28 deletions(-) diff --git a/src/congestion_control/bbr/bbr.rs b/src/congestion_control/bbr/bbr.rs index 6dedab737..6f8f5e81c 100755 --- a/src/congestion_control/bbr/bbr.rs +++ b/src/congestion_control/bbr/bbr.rs @@ -73,7 +73,7 @@ impl Default for BbrConfig { Self { min_cwnd: 4 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64, initial_cwnd: 80 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64, - initial_rtt: Some(Duration::from_millis(1)), + initial_rtt: Some(crate::INITIAL_RTT), max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64, } } @@ -296,6 +296,7 @@ pub struct Bbr { impl Bbr { pub fn new(config: BbrConfig) -> Self { let now = Instant::now(); + let initial_cwnd = config.initial_cwnd; let mut bbr = Self { config, @@ -303,15 +304,15 @@ impl Bbr { state: BbrStateMachine::Startup, pacing_rate: 0, send_quantum: 0, - cwnd: 0, + cwnd: initial_cwnd, btlbw: 0, btlbwfilter: MinMax::new(BTLBW_FILTER_LEN), delivery_rate_estimator: DeliveryRateEstimator::default(), rtprop: Duration::MAX, rtprop_stamp: now, is_rtprop_expired: false, - pacing_gain: 0.0, - cwnd_gain: 0.0, + pacing_gain: HIGH_GAIN, + cwnd_gain: HIGH_GAIN, round: Default::default(), full_pipe: Default::default(), probe_rtt_done_stamp: None, diff --git a/src/congestion_control/bbr3/bbr3.rs b/src/congestion_control/bbr3/bbr3.rs index c361fc367..e5f738071 100644 --- a/src/congestion_control/bbr3/bbr3.rs +++ b/src/congestion_control/bbr3/bbr3.rs @@ -103,7 +103,7 @@ impl Default for Bbr3Config { Self { min_cwnd: 4 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64, initial_cwnd: 80 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64, - initial_rtt: Some(Duration::from_millis(1)), + initial_rtt: Some(crate::INITIAL_RTT), max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64, full_bw_count_threshold: FULL_BW_COUNT_THRESHOLD, full_bw_growth_rate: FULL_BW_GROWTH_RATE, @@ -546,8 +546,9 @@ pub struct Bbr3 { impl Bbr3 { pub fn new(config: Bbr3Config) -> Self { let now = Instant::now(); + let initial_cwnd = config.initial_cwnd; - let mut bbr2 = Self { + let mut bbr3 = Self { config, stats: Default::default(), @@ -556,11 +557,11 @@ impl Bbr3 { send_quantum: 0, - cwnd: 0, + cwnd: initial_cwnd, - pacing_gain: 0.0, + pacing_gain: 2.77, - cwnd_gain: 0.0, + cwnd_gain: 2.0, packet_conservation: false, @@ -654,9 +655,9 @@ impl Bbr3 { recovery_epoch_start: Some(now), }; - bbr2.init(); + bbr3.init(); - bbr2 + bbr3 } // Initialization Steps. @@ -665,7 +666,10 @@ impl Bbr3 { let now = Instant::now(); // init windowed max filter - max bw filter - self.min_rtt = self.config.initial_rtt.unwrap_or(Duration::MAX); + self.min_rtt = std::cmp::max( + self.config.initial_rtt.unwrap_or(crate::INITIAL_RTT), + Duration::from_micros(1), + ); self.min_rtt_stamp = now; self.probe_rtt_done_stamp = None; self.probe_rtt_round_done = false; @@ -1033,7 +1037,6 @@ impl Bbr3 { self.ack_phase = AckProbePhase::Starting; self.start_round(); - self.init_full_pipe(); self.full_pipe.full_bw = self.delivery_rate_estimator.delivery_rate(); // Start wall clock. @@ -1160,7 +1163,9 @@ impl Bbr3 { self.bw_probe_up_acks = self .bw_probe_up_acks .saturating_sub(delta * self.bw_probe_up_cnt); - self.inflight_hi = self.inflight_hi.saturating_add(delta); + self.inflight_hi = self + .inflight_hi + .saturating_add(delta * self.config.max_datagram_size); } if self.is_round_start() { @@ -1187,7 +1192,7 @@ impl Bbr3 { if !self.check_inflight_too_high(now) { // Loss rate is safe. Adjust upper bounds upward. - if self.inflight_hi == u64::MAX || self.bw_hi == u64::MAX { + if self.inflight_hi == u64::MAX { // no upper bounds to raise. return; } @@ -1201,7 +1206,7 @@ impl Bbr3 { } if self.state == State::ProbeBwUp { - self.probe_inflight_hi_upward(false); + self.probe_inflight_hi_upward(true); } } } @@ -1855,6 +1860,8 @@ impl CongestionController for Bbr3 { self.ack_state.last_ack_packet_sent_time = now; self.ack_state.prior_bytes_in_flight = self.stats.bytes_in_flight; self.ack_state.now = now; + self.ack_state.tx_in_flight = 0; + self.ack_state.lost = 0; } fn on_ack( @@ -1942,6 +1949,10 @@ impl CongestionController for Bbr3 { } } } + + fn pacing_rate(&self) -> Option { + Some(self.pacing_rate) + } } #[cfg(test)] diff --git a/src/congestion_control/congestion_control.rs b/src/congestion_control/congestion_control.rs index 91741b2ab..ceb4d7a36 100644 --- a/src/congestion_control/congestion_control.rs +++ b/src/congestion_control/congestion_control.rs @@ -168,6 +168,7 @@ pub fn build_congestion_controller(conf: &RecoveryConfig) -> Box Box::new(Cubic::new(CubicConfig::new( min_cwnd, initial_cwnd, + Some(conf.initial_rtt), max_datagram_size, ))), CongestionControlAlgorithm::Bbr => Box::new(Bbr::new(BbrConfig::new( diff --git a/src/congestion_control/copa/copa.rs b/src/congestion_control/copa/copa.rs index a3ac89f77..e459d0814 100644 --- a/src/congestion_control/copa/copa.rs +++ b/src/congestion_control/copa/copa.rs @@ -109,7 +109,7 @@ impl Default for CopaConfig { Self { min_cwnd: 4 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64, initial_cwnd: 80 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64, - initial_rtt: Some(Duration::from_millis(1)), + initial_rtt: Some(crate::INITIAL_RTT), max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64, slow_start_delta: COPA_DELTA, steady_delta: COPA_DELTA, @@ -282,6 +282,7 @@ pub struct Copa { impl Copa { pub fn new(config: CopaConfig) -> Self { let slow_start_delta = config.slow_start_delta; + let initial_cwnd = config.initial_cwnd; Self { config, @@ -289,7 +290,7 @@ impl Copa { init_time: Instant::now(), mode: CompetingMode::Default, slow_start: true, - cwnd: 0, + cwnd: initial_cwnd, velocity: Velocity::default(), delta: slow_start_delta, standing_rtt_filter: MinMax::new(STANDING_RTT_FILTER_WINDOW.as_micros() as u64), @@ -519,7 +520,7 @@ impl Copa { self.update_mode(); let min_rtt = Duration::from_micros(self.min_rtt_filter.get()); - let standing_rtt = Duration::from_micros(self.standing_rtt_filter.get()); + let standing_rtt = self.get_standing_rtt(); trace!( "{}. round_min_rtt = {}us, elapsed = {}us, min_rtt = {}us, standing_rtt = {}us", @@ -543,9 +544,7 @@ impl Copa { min_rtt.as_micros() ); - if !standing_rtt.is_zero() { - self.target_rate = (self.cwnd as f64 / standing_rtt.as_secs_f64()) as u64; - } + self.target_rate = (self.cwnd as f64 / standing_rtt.as_secs_f64()) as u64; } else { // Limit queueing_delay in case it's too small and get a huge target rate. self.target_rate = (self.config.max_datagram_size as f64 @@ -582,11 +581,23 @@ impl Copa { self.cwnd ); } + + /// Get standing rtt. + fn get_standing_rtt(&self) -> Duration { + let standing_rtt = Duration::from_micros(self.standing_rtt_filter.get()); + if standing_rtt.is_zero() { + return std::cmp::max( + self.config.initial_rtt.unwrap_or(crate::INITIAL_RTT), + Duration::from_micros(1), + ); + } + standing_rtt + } } impl CongestionController for Copa { fn pacing_rate(&self) -> Option { - let standing_rtt = Duration::from_micros(self.standing_rtt_filter.get()); + let standing_rtt = self.get_standing_rtt(); let current_rate = (self.cwnd as f64 / standing_rtt.as_secs_f64()) as u64; Some(PACING_GAIN * current_rate) diff --git a/src/congestion_control/cubic.rs b/src/congestion_control/cubic.rs index 8aad9956a..ec9bf7623 100644 --- a/src/congestion_control/cubic.rs +++ b/src/congestion_control/cubic.rs @@ -73,12 +73,16 @@ pub struct CubicConfig { /// Enable fast convergence, default to true. fast_convergence_enabled: bool, + + /// Initial rtt. + initial_rtt: Option, } impl CubicConfig { pub fn new( min_congestion_window: u64, initial_congestion_window: u64, + initial_rtt: Option, max_datagram_size: u64, ) -> Self { let c = C; @@ -88,6 +92,7 @@ impl CubicConfig { beta, min_congestion_window, initial_congestion_window, + initial_rtt, max_datagram_size, hystart_enabled: true, fast_convergence_enabled: true, @@ -144,6 +149,7 @@ impl Default for CubicConfig { beta: BETA, min_congestion_window: 2 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64, initial_congestion_window: 10 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64, + initial_rtt: Some(crate::INITIAL_RTT), max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64, hystart_enabled: true, fast_convergence_enabled: true, @@ -192,11 +198,22 @@ pub struct Cubic { /// Congestion statistics. stats: CongestionStats, + + /// Pacing rate + pacing_rate: u64, + + /// Initial rtt. + initial_rtt: Duration, } impl Cubic { pub fn new(config: CubicConfig) -> Self { let initial_cwnd = config.initial_congestion_window; + let initial_rtt = std::cmp::max( + config.initial_rtt.unwrap_or(crate::INITIAL_RTT), + Duration::from_micros(1), + ); + let pacing_rate = (initial_cwnd as f64 / initial_rtt.as_secs_f64()) as u64; let hystart_enabled = config.hystart_enabled; let alpha = 3.0 * (1.0 - config.beta) / (1.0 + config.beta); Self { @@ -212,6 +229,8 @@ impl Cubic { recovery_epoch_start: None, last_sent_time: None, stats: Default::default(), + pacing_rate, + initial_rtt, } } @@ -381,6 +400,12 @@ impl CongestionController for Cubic { self.cwnd_inc / self.config.max_datagram_size * self.config.max_datagram_size; self.cwnd_inc %= self.config.max_datagram_size; } + + self.pacing_rate = if rtt.smoothed_rtt().is_zero() { + (self.cwnd as u128 * 1_000_000 / self.initial_rtt.as_micros()) as u64 + } else { + (self.cwnd as u128 * 1_000_000 / rtt.smoothed_rtt().as_micros()) as u64 + }; } fn end_ack(&mut self) { @@ -487,6 +512,10 @@ impl CongestionController for Cubic { fn stats(&self) -> &CongestionStats { &self.stats } + + fn pacing_rate(&self) -> Option { + Some(self.pacing_rate) + } } #[cfg(test)] diff --git a/src/connection/recovery.rs b/src/connection/recovery.rs index 85e7cdb15..8b25a8445 100644 --- a/src/connection/recovery.rs +++ b/src/connection/recovery.rs @@ -132,7 +132,8 @@ impl Recovery { self.congestion .on_sent(now, &mut pkt, self.bytes_in_flight as u64); trace!( - "{} {} ON_SENT {:?} inflight={} cwnd={}", + "now={:?} {} {} ON_SENT {:?} inflight={} cwnd={}", + now, self.trace_id, self.congestion.name(), pkt, @@ -189,7 +190,8 @@ impl Recovery { self.congestion.begin_ack(now, self.bytes_in_flight as u64); trace!( - "{} {} BEGIN_ACK inflight={} cwnd={}", + "now={:?} {} {} BEGIN_ACK inflight={} cwnd={}", + now, self.trace_id, self.congestion.name(), self.bytes_in_flight, @@ -292,7 +294,8 @@ impl Recovery { ); trace!( - "{} {} ON_ACK {:?} inflight={} cwnd={}", + "now={:?} {} {} ON_ACK {:?} inflight={} cwnd={}", + now, self.trace_id, self.congestion.name(), sent_pkt, @@ -381,7 +384,8 @@ impl Recovery { } latest_lost_packet = Some(unacked.clone()); trace!( - "{} {} ON_LOST {:?} inflight={} cwnd={}", + "now={:?} {} {} ON_LOST {:?} inflight={} cwnd={}", + now, self.trace_id, self.congestion.name(), unacked, @@ -407,7 +411,8 @@ impl Recovery { self.bytes_in_flight as u64, ); trace!( - "{} {} ON_CONGESTION_EVENT lost_size={} inflight={} cwnd={}", + "now={:?} {} {} ON_CONGESTION_EVENT lost_size={} inflight={} cwnd={}", + now, self.trace_id, self.congestion.name(), lost_bytes,