Skip to content

Commit

Permalink
feat: bitrate control with Twcc and Remb
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Apr 26, 2024
1 parent 829ea8b commit 000142d
Show file tree
Hide file tree
Showing 11 changed files with 307 additions and 13 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
node: node.clone(),
media: MediaConfig { webrtc_addrs: webrtc_addrs.clone() },
};
controller.add_worker::<_, _, MediaRuntimeWorker, PollingBackend<_, 128, 512>>(Duration::from_millis(100), cfg, None);
controller.add_worker::<_, _, MediaRuntimeWorker, PollingBackend<_, 128, 512>>(Duration::from_millis(1), cfg, None);
}

let mut req_id_seed = 0;
Expand Down
6 changes: 6 additions & 0 deletions packages/media_core/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub enum EndpointRes {
/// This is used for controlling the local track, which is sent from endpoint
pub enum EndpointLocalTrackEvent {
Media(MediaPacket),
DesiredBitrate(u64),
}

/// This is used for controlling the remote track, which is sent from endpoint
Expand All @@ -83,6 +84,11 @@ pub enum EndpointEvent {
PeerTrackStopped(PeerId, TrackName),
RemoteMediaTrack(RemoteTrackId, EndpointRemoteTrackEvent),
LocalMediaTrack(LocalTrackId, EndpointLocalTrackEvent),
/// Egress est params
BweConfig {
current: u64,
desired: u64,
},
/// This session will be disconnect after some seconds
GoAway(u8, Option<String>),
}
Expand Down
11 changes: 11 additions & 0 deletions packages/media_core/src/endpoint/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ impl EndpointInternal {
TransportEvent::RemoteTrack(track, event) => self.on_transport_remote_track(now, track, event),
TransportEvent::LocalTrack(track, event) => self.on_transport_local_track(now, track, event),
TransportEvent::Stats(stats) => self.on_transport_stats(now, stats),
TransportEvent::EgressBitrateEstimate(bitrate) => {
//TODO implement bitrate allocator
const INDEX: usize = 1;
let out = self.local_tracks.on_event(now, INDEX, local_track::Input::LimitBitrate(bitrate))?;
let track = self.local_tracks_id.get2(&INDEX)?;
self.convert_local_track_output(now, *track, out)
}
}
}

Expand Down Expand Up @@ -333,6 +340,10 @@ impl EndpointInternal {
local_track::Output::Event(event) => Some(InternalOutput::Event(EndpointEvent::LocalMediaTrack(id, event))),
local_track::Output::Cluster(room, control) => Some(InternalOutput::Cluster(room, ClusterEndpointControl::LocalTrack(id, control))),
local_track::Output::RpcRes(req_id, res) => Some(InternalOutput::RpcRes(req_id, EndpointRes::LocalTrack(id, res))),
local_track::Output::DesiredBitrate(bitrate) => Some(InternalOutput::Event(EndpointEvent::BweConfig {
current: bitrate,
desired: bitrate + 100_000.max(bitrate * 1 / 5),
})),
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions packages/media_core/src/endpoint/internal/local_track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ pub enum Input {
Cluster(ClusterLocalTrackEvent),
Event(LocalTrackEvent),
RpcReq(EndpointReqId, EndpointLocalTrackReq),
LimitBitrate(u64),
}

pub enum Output {
Event(EndpointLocalTrackEvent),
Cluster(ClusterRoomHash, ClusterLocalTrackControl),
RpcRes(EndpointReqId, EndpointLocalTrackRes),
DesiredBitrate(u64),
}

pub struct EndpointLocalTrack {
Expand Down Expand Up @@ -135,6 +137,11 @@ impl Task<Input, Output> for EndpointLocalTrack {
Input::Cluster(event) => self.on_cluster_event(now, event),
Input::Event(event) => self.on_transport_event(now, event),
Input::RpcReq(req_id, req) => self.on_rpc_req(now, req_id, req),
Input::LimitBitrate(bitrate) => {
log::debug!("[EndpointLocalTrack] Limit send bitrate {bitrate}");
self.queue.push_back(Output::DesiredBitrate(bitrate));
Some(Output::Cluster(self.room?, ClusterLocalTrackControl::DesiredBitrate(bitrate)))
}
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/media_core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub enum TransportEvent {
RemoteTrack(RemoteTrackId, RemoteTrackEvent),
LocalTrack(LocalTrackId, LocalTrackEvent),
Stats(TransportStats),
EgressBitrateEstimate(u64),
}

/// This is control message from endpoint
Expand Down
2 changes: 1 addition & 1 deletion packages/transport_webrtc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ media-server-utils = { path = "../media_utils" }
media-server-protocol = { path = "../protocol" }
media-server-core = { path = "../media_core" }
num_enum = { workspace = true }
str0m = "0.5.0"
str0m = { git = "https://github.com/giangndm/str0m.git", branch = "fix-bwe-slow-increase-with-audio-first" }
smallmap = "1.4.2"
stun-rs = "0.1.8"
30 changes: 28 additions & 2 deletions packages/transport_webrtc/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{net::SocketAddr, ops::Deref, time::Instant};
use std::{
net::SocketAddr,
ops::Deref,
time::{Duration, Instant},
};

use media_server_core::{
endpoint::{EndpointEvent, EndpointReqId, EndpointRes},
Expand All @@ -23,6 +27,7 @@ use str0m::{

use crate::WebrtcError;

mod bwe_state;
mod whep;
mod whip;

Expand Down Expand Up @@ -50,6 +55,8 @@ enum InternalOutput<'a> {
Str0mKeyframe(Mid, KeyframeRequestKind),
Str0mLimitBitrate(Mid, u64),
Str0mSendMedia(Mid, MediaPacket),
Str0mBwe(u64, u64),
Str0mResetBwe(u64),
TransportOutput(TransportOutput<'a, ExtOut>),
}

Expand All @@ -72,7 +79,13 @@ pub struct TransportWebrtc {
impl TransportWebrtc {
pub fn new(variant: VariantParams, offer: &str, dtls_cert: DtlsCert, local_addrs: Vec<(SocketAddr, usize)>) -> RpcResult<(Self, String, String)> {
let offer = SdpOffer::from_sdp_string(offer).map_err(|_e| RpcError::new2(WebrtcError::SdpError))?;
let rtc_config = Rtc::builder().set_rtp_mode(true).set_ice_lite(true).set_dtls_cert(dtls_cert).set_local_ice_credentials(IceCreds::new());
let rtc_config = Rtc::builder()
.set_rtp_mode(true)
.set_ice_lite(true)
.set_dtls_cert(dtls_cert)
.set_local_ice_credentials(IceCreds::new())
.set_stats_interval(Some(Duration::from_secs(1)))
.enable_bwe(Some(Bitrate::kbps(3000)));
let ice_ufrag = rtc_config.local_ice_credentials().as_ref().expect("should have ice credentials").ufrag.clone();

let mut rtc = rtc_config.build();
Expand Down Expand Up @@ -108,9 +121,17 @@ impl TransportWebrtc {
self.pop_event(now)
}
InternalOutput::Str0mLimitBitrate(mid, bitrate) => {
log::debug!("Limit remote tracks with Remb {bitrate}");
self.rtc.direct_api().stream_rx_by_mid(mid, None)?.request_remb(Bitrate::bps(bitrate));
self.pop_event(now)
}
InternalOutput::Str0mBwe(current, desired) => {
log::debug!("Setting str0m bwe {current}, desired {desired}");
let mut bwe = self.rtc.bwe();
bwe.set_current_bitrate(current.into());
bwe.set_desired_bitrate(desired.into());
self.pop_event(now)
}
InternalOutput::Str0mSendMedia(mid, pkt) => {
log::trace!("[TransportWebrtc] sending media payload {} seq {} to mid {mid}", pkt.pt, pkt.seq);
self.rtc
Expand All @@ -120,6 +141,11 @@ impl TransportWebrtc {
.ok()?;
self.pop_event(now)
}
InternalOutput::Str0mResetBwe(init_bitrate) => {
log::info!("Reset str0m bwe to init_bitrate {init_bitrate} bps");
self.rtc.bwe().reset(init_bitrate.into());
self.pop_event(now)
}
InternalOutput::TransportOutput(out) => Some(out),
}
}
Expand Down
205 changes: 205 additions & 0 deletions packages/transport_webrtc/src/transport/bwe_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
const DEFAULT_BWE_BPS: u64 = 800_000; // in inatve or warm-up state we will used minimum DEFAULT_BWE_BPS
const DEFAULT_DESIRED_BPS: u64 = 1_000_000; // in inatve or warm-up state we will used minimum DEFAULT_DESIRED_BPS
const WARM_UP_FIRST_STAGE_MS: u128 = 1000;
const WARM_UP_MS: u128 = 2000;
const TIMEOUT_MS: u128 = 2000;
const MAX_BITRATE_BPS: u64 = 3_000_000;

use std::time::Instant;

/// BweState manage stage of Bwe for avoiding video stream stuck or slow start.
///
/// - It start with Inactive state, in this state all bwe = bwe.max(DEFAULT_BWE_BPS)
/// - In WarmUp state, it have 2 phase, each phase is 1 seconds.
/// After first phase, the Bwe will be reset with lastest_bwe.max(DEFAULT_BWE_BPS).
/// In this phase, bwe = bwe.max(DEFAULT_BWE_BPS). After WarmUp end it will be switched to Active
/// - In Active, bwe = bwe.min(MAX_BITRATE_BPS). If after TIMEOUT_MS, we dont have video packet, it will be reset to Inactive
///
/// In all state, bwe will have threshold MAX_BITRATE_BPS
///
#[derive(Default, Debug, PartialEq, Eq)]
pub enum BweState {
#[default]
Inactive,
WarmUp {
started_at: Instant,
last_video_pkt: Instant,
first_stage: bool,
last_bwe: Option<u64>,
},
Active {
last_video_pkt: Instant,
},
}

impl BweState {
/// Return Some(init_bitrate) if we need reset BWE
pub fn on_tick(&mut self, now: Instant) -> Option<u64> {
match self {
Self::Inactive => None,
Self::WarmUp {
started_at,
last_video_pkt,
first_stage,
last_bwe,
} => {
if now.duration_since(*last_video_pkt).as_millis() >= TIMEOUT_MS {
log::info!("[BweState] switched from WarmUp to Inactive after {:?} not received video pkt", now.duration_since(*last_video_pkt));
*self = Self::Inactive;
return None;
} else if now.duration_since(*started_at).as_millis() >= WARM_UP_MS {
log::info!("[BweState] switched from WarmUp to Active after {:?}", now.duration_since(*started_at));
*self = Self::Active { last_video_pkt: *last_video_pkt };
None
} else if *first_stage && now.duration_since(*started_at).as_millis() >= WARM_UP_FIRST_STAGE_MS {
let init_bitrate = last_bwe.unwrap_or(DEFAULT_BWE_BPS).max(DEFAULT_BWE_BPS);
log::info!("[BweState] WarmUp first_stage end after {:?} => reset Bwe({init_bitrate})", now.duration_since(*started_at));
*first_stage = false;
Some(init_bitrate)
} else {
None
}
}
Self::Active { last_video_pkt } => {
if now.duration_since(*last_video_pkt).as_millis() >= TIMEOUT_MS {
*self = Self::Inactive;
}
None
}
}
}

pub fn on_send_video(&mut self, now: Instant) {
match self {
Self::Inactive => {
log::info!("[BweState] switched from Inactive to WarmUp with first video packet");
*self = Self::WarmUp {
started_at: now,
last_video_pkt: now,
first_stage: true,
last_bwe: None,
}
}
Self::WarmUp { last_video_pkt, .. } | Self::Active { last_video_pkt } => {
*last_video_pkt = now;
}
}
}

pub fn filter_bwe(&mut self, bwe: u64) -> u64 {
match self {
Self::Inactive => {
log::debug!("[BweState] rewrite bwe {bwe} to {} with Inactive or WarmUp state", bwe.max(DEFAULT_BWE_BPS));
bwe.max(DEFAULT_BWE_BPS).min(MAX_BITRATE_BPS)
}
Self::WarmUp { last_bwe, .. } => {
log::debug!("[BweState] rewrite bwe {bwe} to {} with Inactive or WarmUp state", bwe.max(DEFAULT_BWE_BPS));
*last_bwe = Some(bwe);
bwe.max(DEFAULT_BWE_BPS).min(MAX_BITRATE_BPS)
}
Self::Active { .. } => bwe.min(MAX_BITRATE_BPS),
}
}

pub fn filter_bwe_config(&mut self, current: u64, desired: u64) -> (u64, u64) {
match self {
Self::Inactive | Self::WarmUp { .. } => {
log::debug!(
"[BweState] rewrite current {current}, desired {desired} to current {}, desired {} with Inactive or WarmUp state",
current.max(DEFAULT_BWE_BPS),
desired.max(DEFAULT_DESIRED_BPS)
);
(current.max(DEFAULT_BWE_BPS).min(MAX_BITRATE_BPS), desired.max(DEFAULT_DESIRED_BPS).min(MAX_BITRATE_BPS))
}
Self::Active { .. } => (current.min(MAX_BITRATE_BPS), desired.min(MAX_BITRATE_BPS)),
}
}
}

#[cfg(test)]
mod test {
use std::time::{Duration, Instant};

use crate::transport::bwe_state::{DEFAULT_BWE_BPS, DEFAULT_DESIRED_BPS, TIMEOUT_MS, WARM_UP_FIRST_STAGE_MS, WARM_UP_MS};

use super::BweState;

#[test]
fn inactive_state() {
let mut state = BweState::default();
assert_eq!(state, BweState::Inactive);
assert_eq!(state.on_tick(Instant::now()), None);

assert_eq!(state.filter_bwe(100), DEFAULT_BWE_BPS);
assert_eq!(state.filter_bwe_config(100, 200), (DEFAULT_BWE_BPS, DEFAULT_DESIRED_BPS));
}

#[test]
fn inactive_switch_to_warmup() {
let mut state = BweState::default();

let now = Instant::now();
state.on_send_video(now);
assert!(matches!(state, BweState::WarmUp { .. }));

assert_eq!(state.filter_bwe(100), DEFAULT_BWE_BPS);
assert_eq!(state.filter_bwe_config(100, 200), (DEFAULT_BWE_BPS, DEFAULT_DESIRED_BPS));

assert_eq!(state.filter_bwe(DEFAULT_BWE_BPS + 100), DEFAULT_BWE_BPS + 100);
assert_eq!(
state.filter_bwe_config(DEFAULT_BWE_BPS + 100, DEFAULT_DESIRED_BPS + 200),
(DEFAULT_BWE_BPS + 100, DEFAULT_DESIRED_BPS + 200)
);
}

#[test]
fn active_state() {
let now = Instant::now();
let mut state = BweState::Active { last_video_pkt: now };
assert_eq!(state.filter_bwe(100), 100);
assert_eq!(state.filter_bwe_config(100, 200), (100, 200));

assert_eq!(state.on_tick(now), None);
assert!(matches!(state, BweState::Active { .. }));

// after timeout without video packet => reset to Inactive
assert_eq!(state.on_tick(now + Duration::from_millis(TIMEOUT_MS as u64)), None);
assert!(matches!(state, BweState::Inactive));
}

#[test]
fn warmup_auto_switch_active() {
let now = Instant::now();
let mut state = BweState::WarmUp {
started_at: now,
last_video_pkt: now,
first_stage: true,
last_bwe: None,
};

assert_eq!(state.on_tick(now), None);
assert_eq!(state.on_tick(now + Duration::from_millis(WARM_UP_FIRST_STAGE_MS as u64)), Some(DEFAULT_BWE_BPS));

state.on_send_video(now + Duration::from_millis(100));

assert_eq!(state.on_tick(now + Duration::from_millis(WARM_UP_MS as u64)), None);
assert!(matches!(state, BweState::Active { .. }));
}

#[test]
fn warmup_auto_switch_inactive() {
let now = Instant::now();
let mut state = BweState::WarmUp {
started_at: now,
last_video_pkt: now,
first_stage: true,
last_bwe: None,
};

assert_eq!(state.on_tick(now), None);
assert_eq!(state.on_tick(now + Duration::from_millis(WARM_UP_FIRST_STAGE_MS as u64)), Some(DEFAULT_BWE_BPS));

assert_eq!(state.on_tick(now + Duration::from_millis(TIMEOUT_MS as u64)), None);
assert!(matches!(state, BweState::Inactive));
}
}
Loading

0 comments on commit 000142d

Please sign in to comment.