Skip to content

Commit

Permalink
Fix some congestion control related issues. (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
golf-gl authored Nov 27, 2023
1 parent fd20849 commit bff6aec
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 28 deletions.
9 changes: 5 additions & 4 deletions src/congestion_control/bbr/bbr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl Default for BbrConfig {
Self {
min_cwnd: 4 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_cwnd: 80 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_rtt: Some(Duration::from_millis(1)),
initial_rtt: Some(crate::INITIAL_RTT),
max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
}
}
Expand Down Expand Up @@ -296,22 +296,23 @@ pub struct Bbr {
impl Bbr {
pub fn new(config: BbrConfig) -> Self {
let now = Instant::now();
let initial_cwnd = config.initial_cwnd;

let mut bbr = Self {
config,
stats: Default::default(),
state: BbrStateMachine::Startup,
pacing_rate: 0,
send_quantum: 0,
cwnd: 0,
cwnd: initial_cwnd,
btlbw: 0,
btlbwfilter: MinMax::new(BTLBW_FILTER_LEN),
delivery_rate_estimator: DeliveryRateEstimator::default(),
rtprop: Duration::MAX,
rtprop_stamp: now,
is_rtprop_expired: false,
pacing_gain: 0.0,
cwnd_gain: 0.0,
pacing_gain: HIGH_GAIN,
cwnd_gain: HIGH_GAIN,
round: Default::default(),
full_pipe: Default::default(),
probe_rtt_done_stamp: None,
Expand Down
35 changes: 23 additions & 12 deletions src/congestion_control/bbr3/bbr3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl Default for Bbr3Config {
Self {
min_cwnd: 4 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_cwnd: 80 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_rtt: Some(Duration::from_millis(1)),
initial_rtt: Some(crate::INITIAL_RTT),
max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
full_bw_count_threshold: FULL_BW_COUNT_THRESHOLD,
full_bw_growth_rate: FULL_BW_GROWTH_RATE,
Expand Down Expand Up @@ -546,8 +546,9 @@ pub struct Bbr3 {
impl Bbr3 {
pub fn new(config: Bbr3Config) -> Self {
let now = Instant::now();
let initial_cwnd = config.initial_cwnd;

let mut bbr2 = Self {
let mut bbr3 = Self {
config,

stats: Default::default(),
Expand All @@ -556,11 +557,11 @@ impl Bbr3 {

send_quantum: 0,

cwnd: 0,
cwnd: initial_cwnd,

pacing_gain: 0.0,
pacing_gain: 2.77,

cwnd_gain: 0.0,
cwnd_gain: 2.0,

packet_conservation: false,

Expand Down Expand Up @@ -654,9 +655,9 @@ impl Bbr3 {

recovery_epoch_start: Some(now),
};
bbr2.init();
bbr3.init();

bbr2
bbr3
}

// Initialization Steps.
Expand All @@ -665,7 +666,10 @@ impl Bbr3 {
let now = Instant::now();

// init windowed max filter - max bw filter
self.min_rtt = self.config.initial_rtt.unwrap_or(Duration::MAX);
self.min_rtt = std::cmp::max(
self.config.initial_rtt.unwrap_or(crate::INITIAL_RTT),
Duration::from_micros(1),
);
self.min_rtt_stamp = now;
self.probe_rtt_done_stamp = None;
self.probe_rtt_round_done = false;
Expand Down Expand Up @@ -1033,7 +1037,6 @@ impl Bbr3 {
self.ack_phase = AckProbePhase::Starting;
self.start_round();

self.init_full_pipe();
self.full_pipe.full_bw = self.delivery_rate_estimator.delivery_rate();

// Start wall clock.
Expand Down Expand Up @@ -1160,7 +1163,9 @@ impl Bbr3 {
self.bw_probe_up_acks = self
.bw_probe_up_acks
.saturating_sub(delta * self.bw_probe_up_cnt);
self.inflight_hi = self.inflight_hi.saturating_add(delta);
self.inflight_hi = self
.inflight_hi
.saturating_add(delta * self.config.max_datagram_size);
}

if self.is_round_start() {
Expand All @@ -1187,7 +1192,7 @@ impl Bbr3 {

if !self.check_inflight_too_high(now) {
// Loss rate is safe. Adjust upper bounds upward.
if self.inflight_hi == u64::MAX || self.bw_hi == u64::MAX {
if self.inflight_hi == u64::MAX {
// no upper bounds to raise.
return;
}
Expand All @@ -1201,7 +1206,7 @@ impl Bbr3 {
}

if self.state == State::ProbeBwUp {
self.probe_inflight_hi_upward(false);
self.probe_inflight_hi_upward(true);
}
}
}
Expand Down Expand Up @@ -1855,6 +1860,8 @@ impl CongestionController for Bbr3 {
self.ack_state.last_ack_packet_sent_time = now;
self.ack_state.prior_bytes_in_flight = self.stats.bytes_in_flight;
self.ack_state.now = now;
self.ack_state.tx_in_flight = 0;
self.ack_state.lost = 0;
}

fn on_ack(
Expand Down Expand Up @@ -1942,6 +1949,10 @@ impl CongestionController for Bbr3 {
}
}
}

fn pacing_rate(&self) -> Option<u64> {
Some(self.pacing_rate)
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/congestion_control/congestion_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ pub fn build_congestion_controller(conf: &RecoveryConfig) -> Box<dyn CongestionC
CongestionControlAlgorithm::Cubic => Box::new(Cubic::new(CubicConfig::new(
min_cwnd,
initial_cwnd,
Some(conf.initial_rtt),
max_datagram_size,
))),
CongestionControlAlgorithm::Bbr => Box::new(Bbr::new(BbrConfig::new(
Expand Down
25 changes: 18 additions & 7 deletions src/congestion_control/copa/copa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl Default for CopaConfig {
Self {
min_cwnd: 4 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_cwnd: 80 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_rtt: Some(Duration::from_millis(1)),
initial_rtt: Some(crate::INITIAL_RTT),
max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
slow_start_delta: COPA_DELTA,
steady_delta: COPA_DELTA,
Expand Down Expand Up @@ -282,14 +282,15 @@ pub struct Copa {
impl Copa {
pub fn new(config: CopaConfig) -> Self {
let slow_start_delta = config.slow_start_delta;
let initial_cwnd = config.initial_cwnd;

Self {
config,
stats: Default::default(),
init_time: Instant::now(),
mode: CompetingMode::Default,
slow_start: true,
cwnd: 0,
cwnd: initial_cwnd,
velocity: Velocity::default(),
delta: slow_start_delta,
standing_rtt_filter: MinMax::new(STANDING_RTT_FILTER_WINDOW.as_micros() as u64),
Expand Down Expand Up @@ -519,7 +520,7 @@ impl Copa {
self.update_mode();

let min_rtt = Duration::from_micros(self.min_rtt_filter.get());
let standing_rtt = Duration::from_micros(self.standing_rtt_filter.get());
let standing_rtt = self.get_standing_rtt();

trace!(
"{}. round_min_rtt = {}us, elapsed = {}us, min_rtt = {}us, standing_rtt = {}us",
Expand All @@ -543,9 +544,7 @@ impl Copa {
min_rtt.as_micros()
);

if !standing_rtt.is_zero() {
self.target_rate = (self.cwnd as f64 / standing_rtt.as_secs_f64()) as u64;
}
self.target_rate = (self.cwnd as f64 / standing_rtt.as_secs_f64()) as u64;
} else {
// Limit queueing_delay in case it's too small and get a huge target rate.
self.target_rate = (self.config.max_datagram_size as f64
Expand Down Expand Up @@ -582,11 +581,23 @@ impl Copa {
self.cwnd
);
}

/// Get standing rtt.
fn get_standing_rtt(&self) -> Duration {
let standing_rtt = Duration::from_micros(self.standing_rtt_filter.get());
if standing_rtt.is_zero() {
return std::cmp::max(
self.config.initial_rtt.unwrap_or(crate::INITIAL_RTT),
Duration::from_micros(1),
);
}
standing_rtt
}
}

impl CongestionController for Copa {
fn pacing_rate(&self) -> Option<u64> {
let standing_rtt = Duration::from_micros(self.standing_rtt_filter.get());
let standing_rtt = self.get_standing_rtt();
let current_rate = (self.cwnd as f64 / standing_rtt.as_secs_f64()) as u64;

Some(PACING_GAIN * current_rate)
Expand Down
29 changes: 29 additions & 0 deletions src/congestion_control/cubic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,16 @@ pub struct CubicConfig {

/// Enable fast convergence, default to true.
fast_convergence_enabled: bool,

/// Initial rtt.
initial_rtt: Option<Duration>,
}

impl CubicConfig {
pub fn new(
min_congestion_window: u64,
initial_congestion_window: u64,
initial_rtt: Option<Duration>,
max_datagram_size: u64,
) -> Self {
let c = C;
Expand All @@ -88,6 +92,7 @@ impl CubicConfig {
beta,
min_congestion_window,
initial_congestion_window,
initial_rtt,
max_datagram_size,
hystart_enabled: true,
fast_convergence_enabled: true,
Expand Down Expand Up @@ -144,6 +149,7 @@ impl Default for CubicConfig {
beta: BETA,
min_congestion_window: 2 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_congestion_window: 10 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_rtt: Some(crate::INITIAL_RTT),
max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
hystart_enabled: true,
fast_convergence_enabled: true,
Expand Down Expand Up @@ -192,11 +198,22 @@ pub struct Cubic {

/// Congestion statistics.
stats: CongestionStats,

/// Pacing rate
pacing_rate: u64,

/// Initial rtt.
initial_rtt: Duration,
}

impl Cubic {
pub fn new(config: CubicConfig) -> Self {
let initial_cwnd = config.initial_congestion_window;
let initial_rtt = std::cmp::max(
config.initial_rtt.unwrap_or(crate::INITIAL_RTT),
Duration::from_micros(1),
);
let pacing_rate = (initial_cwnd as f64 / initial_rtt.as_secs_f64()) as u64;
let hystart_enabled = config.hystart_enabled;
let alpha = 3.0 * (1.0 - config.beta) / (1.0 + config.beta);
Self {
Expand All @@ -212,6 +229,8 @@ impl Cubic {
recovery_epoch_start: None,
last_sent_time: None,
stats: Default::default(),
pacing_rate,
initial_rtt,
}
}

Expand Down Expand Up @@ -381,6 +400,12 @@ impl CongestionController for Cubic {
self.cwnd_inc / self.config.max_datagram_size * self.config.max_datagram_size;
self.cwnd_inc %= self.config.max_datagram_size;
}

self.pacing_rate = if rtt.smoothed_rtt().is_zero() {
(self.cwnd as u128 * 1_000_000 / self.initial_rtt.as_micros()) as u64
} else {
(self.cwnd as u128 * 1_000_000 / rtt.smoothed_rtt().as_micros()) as u64
};
}

fn end_ack(&mut self) {
Expand Down Expand Up @@ -487,6 +512,10 @@ impl CongestionController for Cubic {
fn stats(&self) -> &CongestionStats {
&self.stats
}

fn pacing_rate(&self) -> Option<u64> {
Some(self.pacing_rate)
}
}

#[cfg(test)]
Expand Down
15 changes: 10 additions & 5 deletions src/connection/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ impl Recovery {
self.congestion
.on_sent(now, &mut pkt, self.bytes_in_flight as u64);
trace!(
"{} {} ON_SENT {:?} inflight={} cwnd={}",
"now={:?} {} {} ON_SENT {:?} inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
pkt,
Expand Down Expand Up @@ -189,7 +190,8 @@ impl Recovery {

self.congestion.begin_ack(now, self.bytes_in_flight as u64);
trace!(
"{} {} BEGIN_ACK inflight={} cwnd={}",
"now={:?} {} {} BEGIN_ACK inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
self.bytes_in_flight,
Expand Down Expand Up @@ -292,7 +294,8 @@ impl Recovery {
);

trace!(
"{} {} ON_ACK {:?} inflight={} cwnd={}",
"now={:?} {} {} ON_ACK {:?} inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
sent_pkt,
Expand Down Expand Up @@ -381,7 +384,8 @@ impl Recovery {
}
latest_lost_packet = Some(unacked.clone());
trace!(
"{} {} ON_LOST {:?} inflight={} cwnd={}",
"now={:?} {} {} ON_LOST {:?} inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
unacked,
Expand All @@ -407,7 +411,8 @@ impl Recovery {
self.bytes_in_flight as u64,
);
trace!(
"{} {} ON_CONGESTION_EVENT lost_size={} inflight={} cwnd={}",
"now={:?} {} {} ON_CONGESTION_EVENT lost_size={} inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
lost_bytes,
Expand Down

0 comments on commit bff6aec

Please sign in to comment.