Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some congestion control related issues. #61

Merged
merged 1 commit into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/congestion_control/bbr/bbr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl Default for BbrConfig {
Self {
min_cwnd: 4 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_cwnd: 80 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_rtt: Some(Duration::from_millis(1)),
initial_rtt: Some(crate::INITIAL_RTT),
max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
}
}
Expand Down Expand Up @@ -296,22 +296,23 @@ pub struct Bbr {
impl Bbr {
pub fn new(config: BbrConfig) -> Self {
let now = Instant::now();
let initial_cwnd = config.initial_cwnd;

let mut bbr = Self {
config,
stats: Default::default(),
state: BbrStateMachine::Startup,
pacing_rate: 0,
send_quantum: 0,
cwnd: 0,
cwnd: initial_cwnd,
btlbw: 0,
btlbwfilter: MinMax::new(BTLBW_FILTER_LEN),
delivery_rate_estimator: DeliveryRateEstimator::default(),
rtprop: Duration::MAX,
rtprop_stamp: now,
is_rtprop_expired: false,
pacing_gain: 0.0,
cwnd_gain: 0.0,
pacing_gain: HIGH_GAIN,
cwnd_gain: HIGH_GAIN,
round: Default::default(),
full_pipe: Default::default(),
probe_rtt_done_stamp: None,
Expand Down
35 changes: 23 additions & 12 deletions src/congestion_control/bbr3/bbr3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl Default for Bbr3Config {
Self {
min_cwnd: 4 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_cwnd: 80 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_rtt: Some(Duration::from_millis(1)),
initial_rtt: Some(crate::INITIAL_RTT),
max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
full_bw_count_threshold: FULL_BW_COUNT_THRESHOLD,
full_bw_growth_rate: FULL_BW_GROWTH_RATE,
Expand Down Expand Up @@ -546,8 +546,9 @@ pub struct Bbr3 {
impl Bbr3 {
pub fn new(config: Bbr3Config) -> Self {
let now = Instant::now();
let initial_cwnd = config.initial_cwnd;

let mut bbr2 = Self {
let mut bbr3 = Self {
config,

stats: Default::default(),
Expand All @@ -556,11 +557,11 @@ impl Bbr3 {

send_quantum: 0,

cwnd: 0,
cwnd: initial_cwnd,

pacing_gain: 0.0,
pacing_gain: 2.77,

cwnd_gain: 0.0,
cwnd_gain: 2.0,

packet_conservation: false,

Expand Down Expand Up @@ -654,9 +655,9 @@ impl Bbr3 {

recovery_epoch_start: Some(now),
};
bbr2.init();
bbr3.init();

bbr2
bbr3
}

// Initialization Steps.
Expand All @@ -665,7 +666,10 @@ impl Bbr3 {
let now = Instant::now();

// init windowed max filter - max bw filter
self.min_rtt = self.config.initial_rtt.unwrap_or(Duration::MAX);
self.min_rtt = std::cmp::max(
self.config.initial_rtt.unwrap_or(crate::INITIAL_RTT),
Duration::from_micros(1),
);
self.min_rtt_stamp = now;
self.probe_rtt_done_stamp = None;
self.probe_rtt_round_done = false;
Expand Down Expand Up @@ -1033,7 +1037,6 @@ impl Bbr3 {
self.ack_phase = AckProbePhase::Starting;
self.start_round();

self.init_full_pipe();
self.full_pipe.full_bw = self.delivery_rate_estimator.delivery_rate();

// Start wall clock.
Expand Down Expand Up @@ -1160,7 +1163,9 @@ impl Bbr3 {
self.bw_probe_up_acks = self
.bw_probe_up_acks
.saturating_sub(delta * self.bw_probe_up_cnt);
self.inflight_hi = self.inflight_hi.saturating_add(delta);
self.inflight_hi = self
.inflight_hi
.saturating_add(delta * self.config.max_datagram_size);
}

if self.is_round_start() {
Expand All @@ -1187,7 +1192,7 @@ impl Bbr3 {

if !self.check_inflight_too_high(now) {
// Loss rate is safe. Adjust upper bounds upward.
if self.inflight_hi == u64::MAX || self.bw_hi == u64::MAX {
if self.inflight_hi == u64::MAX {
// no upper bounds to raise.
return;
}
Expand All @@ -1201,7 +1206,7 @@ impl Bbr3 {
}

if self.state == State::ProbeBwUp {
self.probe_inflight_hi_upward(false);
self.probe_inflight_hi_upward(true);
}
}
}
Expand Down Expand Up @@ -1855,6 +1860,8 @@ impl CongestionController for Bbr3 {
self.ack_state.last_ack_packet_sent_time = now;
self.ack_state.prior_bytes_in_flight = self.stats.bytes_in_flight;
self.ack_state.now = now;
self.ack_state.tx_in_flight = 0;
self.ack_state.lost = 0;
}

fn on_ack(
Expand Down Expand Up @@ -1942,6 +1949,10 @@ impl CongestionController for Bbr3 {
}
}
}

fn pacing_rate(&self) -> Option<u64> {
Some(self.pacing_rate)
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/congestion_control/congestion_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ pub fn build_congestion_controller(conf: &RecoveryConfig) -> Box<dyn CongestionC
CongestionControlAlgorithm::Cubic => Box::new(Cubic::new(CubicConfig::new(
min_cwnd,
initial_cwnd,
Some(conf.initial_rtt),
max_datagram_size,
))),
CongestionControlAlgorithm::Bbr => Box::new(Bbr::new(BbrConfig::new(
Expand Down
25 changes: 18 additions & 7 deletions src/congestion_control/copa/copa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl Default for CopaConfig {
Self {
min_cwnd: 4 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_cwnd: 80 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_rtt: Some(Duration::from_millis(1)),
initial_rtt: Some(crate::INITIAL_RTT),
max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
slow_start_delta: COPA_DELTA,
steady_delta: COPA_DELTA,
Expand Down Expand Up @@ -282,14 +282,15 @@ pub struct Copa {
impl Copa {
pub fn new(config: CopaConfig) -> Self {
let slow_start_delta = config.slow_start_delta;
let initial_cwnd = config.initial_cwnd;

Self {
config,
stats: Default::default(),
init_time: Instant::now(),
mode: CompetingMode::Default,
slow_start: true,
cwnd: 0,
cwnd: initial_cwnd,
velocity: Velocity::default(),
delta: slow_start_delta,
standing_rtt_filter: MinMax::new(STANDING_RTT_FILTER_WINDOW.as_micros() as u64),
Expand Down Expand Up @@ -519,7 +520,7 @@ impl Copa {
self.update_mode();

let min_rtt = Duration::from_micros(self.min_rtt_filter.get());
let standing_rtt = Duration::from_micros(self.standing_rtt_filter.get());
let standing_rtt = self.get_standing_rtt();

trace!(
"{}. round_min_rtt = {}us, elapsed = {}us, min_rtt = {}us, standing_rtt = {}us",
Expand All @@ -543,9 +544,7 @@ impl Copa {
min_rtt.as_micros()
);

if !standing_rtt.is_zero() {
self.target_rate = (self.cwnd as f64 / standing_rtt.as_secs_f64()) as u64;
}
self.target_rate = (self.cwnd as f64 / standing_rtt.as_secs_f64()) as u64;
} else {
// Limit queueing_delay in case it's too small and get a huge target rate.
self.target_rate = (self.config.max_datagram_size as f64
Expand Down Expand Up @@ -582,11 +581,23 @@ impl Copa {
self.cwnd
);
}

/// Get standing rtt.
fn get_standing_rtt(&self) -> Duration {
let standing_rtt = Duration::from_micros(self.standing_rtt_filter.get());
if standing_rtt.is_zero() {
return std::cmp::max(
self.config.initial_rtt.unwrap_or(crate::INITIAL_RTT),
Duration::from_micros(1),
);
}
standing_rtt
}
}

impl CongestionController for Copa {
fn pacing_rate(&self) -> Option<u64> {
let standing_rtt = Duration::from_micros(self.standing_rtt_filter.get());
let standing_rtt = self.get_standing_rtt();
let current_rate = (self.cwnd as f64 / standing_rtt.as_secs_f64()) as u64;

Some(PACING_GAIN * current_rate)
Expand Down
29 changes: 29 additions & 0 deletions src/congestion_control/cubic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,16 @@ pub struct CubicConfig {

/// Enable fast convergence, default to true.
fast_convergence_enabled: bool,

/// Initial rtt.
initial_rtt: Option<Duration>,
}

impl CubicConfig {
pub fn new(
min_congestion_window: u64,
initial_congestion_window: u64,
initial_rtt: Option<Duration>,
max_datagram_size: u64,
) -> Self {
let c = C;
Expand All @@ -88,6 +92,7 @@ impl CubicConfig {
beta,
min_congestion_window,
initial_congestion_window,
initial_rtt,
max_datagram_size,
hystart_enabled: true,
fast_convergence_enabled: true,
Expand Down Expand Up @@ -144,6 +149,7 @@ impl Default for CubicConfig {
beta: BETA,
min_congestion_window: 2 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_congestion_window: 10 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_rtt: Some(crate::INITIAL_RTT),
max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
hystart_enabled: true,
fast_convergence_enabled: true,
Expand Down Expand Up @@ -192,11 +198,22 @@ pub struct Cubic {

/// Congestion statistics.
stats: CongestionStats,

/// Pacing rate
pacing_rate: u64,

/// Initial rtt.
initial_rtt: Duration,
}

impl Cubic {
pub fn new(config: CubicConfig) -> Self {
let initial_cwnd = config.initial_congestion_window;
let initial_rtt = std::cmp::max(
config.initial_rtt.unwrap_or(crate::INITIAL_RTT),
Duration::from_micros(1),
);
let pacing_rate = (initial_cwnd as f64 / initial_rtt.as_secs_f64()) as u64;
let hystart_enabled = config.hystart_enabled;
let alpha = 3.0 * (1.0 - config.beta) / (1.0 + config.beta);
Self {
Expand All @@ -212,6 +229,8 @@ impl Cubic {
recovery_epoch_start: None,
last_sent_time: None,
stats: Default::default(),
pacing_rate,
initial_rtt,
}
}

Expand Down Expand Up @@ -381,6 +400,12 @@ impl CongestionController for Cubic {
self.cwnd_inc / self.config.max_datagram_size * self.config.max_datagram_size;
self.cwnd_inc %= self.config.max_datagram_size;
}

self.pacing_rate = if rtt.smoothed_rtt().is_zero() {
(self.cwnd as u128 * 1_000_000 / self.initial_rtt.as_micros()) as u64
} else {
(self.cwnd as u128 * 1_000_000 / rtt.smoothed_rtt().as_micros()) as u64
};
}

fn end_ack(&mut self) {
Expand Down Expand Up @@ -487,6 +512,10 @@ impl CongestionController for Cubic {
fn stats(&self) -> &CongestionStats {
&self.stats
}

fn pacing_rate(&self) -> Option<u64> {
Some(self.pacing_rate)
}
}

#[cfg(test)]
Expand Down
15 changes: 10 additions & 5 deletions src/connection/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ impl Recovery {
self.congestion
.on_sent(now, &mut pkt, self.bytes_in_flight as u64);
trace!(
"{} {} ON_SENT {:?} inflight={} cwnd={}",
"now={:?} {} {} ON_SENT {:?} inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
pkt,
Expand Down Expand Up @@ -189,7 +190,8 @@ impl Recovery {

self.congestion.begin_ack(now, self.bytes_in_flight as u64);
trace!(
"{} {} BEGIN_ACK inflight={} cwnd={}",
"now={:?} {} {} BEGIN_ACK inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
self.bytes_in_flight,
Expand Down Expand Up @@ -292,7 +294,8 @@ impl Recovery {
);

trace!(
"{} {} ON_ACK {:?} inflight={} cwnd={}",
"now={:?} {} {} ON_ACK {:?} inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
sent_pkt,
Expand Down Expand Up @@ -381,7 +384,8 @@ impl Recovery {
}
latest_lost_packet = Some(unacked.clone());
trace!(
"{} {} ON_LOST {:?} inflight={} cwnd={}",
"now={:?} {} {} ON_LOST {:?} inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
unacked,
Expand All @@ -407,7 +411,8 @@ impl Recovery {
self.bytes_in_flight as u64,
);
trace!(
"{} {} ON_CONGESTION_EVENT lost_size={} inflight={} cwnd={}",
"now={:?} {} {} ON_CONGESTION_EVENT lost_size={} inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
lost_bytes,
Expand Down