Skip to content

Commit

Permalink
feat: bitrate allocator with both egress and ingress.
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Apr 27, 2024
1 parent ed8106a commit e6b218c
Show file tree
Hide file tree
Showing 11 changed files with 358 additions and 94 deletions.
4 changes: 2 additions & 2 deletions packages/media_core/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ enum TaskType {
}

pub struct EndpointCfg {
pub max_egress_bitrate: u32,
pub bitrate_control: BitrateControlMode,
pub max_egress_bitrate: u64,
pub max_ingress_bitrate: u64,
}

pub struct Endpoint<T: Transport<ExtIn, ExtOut>, ExtIn, ExtOut> {
Expand Down
41 changes: 30 additions & 11 deletions packages/media_core/src/endpoint/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ pub struct EndpointInternal {
impl EndpointInternal {
pub fn new(cfg: EndpointCfg) -> Self {
Self {
cfg,
state: TransportState::Connecting,
wait_join: None,
joined: None,
Expand All @@ -66,22 +65,34 @@ impl EndpointInternal {
_middlewares: Default::default(),
queue: Default::default(),
switcher: TaskSwitcher::new(2),
bitrate_allocator: BitrateAllocator::default(),
bitrate_allocator: BitrateAllocator::new(cfg.max_ingress_bitrate),
cfg,
}
}

pub fn on_tick<'a>(&mut self, now: Instant) -> Option<InternalOutput> {
self.bitrate_allocator.on_tick();
if let Some(out) = self.bitrate_allocator.pop_output() {
match out {
bitrate_allocator::Output::SetTrackBitrate(track, bitrate) => {
bitrate_allocator::Output::RemoteTrack(track, action) => {
if let Some(index) = self.remote_tracks_id.get1(&track) {
let out = self.remote_tracks.on_event(now, *index, remote_track::Input::BitrateAllocation(action))?;
if let Some(out) = self.convert_remote_track_output(now, track, out) {
return Some(out);
}
}
}
bitrate_allocator::Output::LocalTrack(track, action) => {
if let Some(index) = self.local_tracks_id.get1(&track) {
let out = self.local_tracks.on_event(now, *index, local_track::Input::LimitBitrate(bitrate))?;
let out = self.local_tracks.on_event(now, *index, local_track::Input::BitrateAllocation(action))?;
if let Some(out) = self.convert_local_track_output(now, track, out) {
return Some(out);
}
}
}
bitrate_allocator::Output::BweConfig(current, desired) => {
return Some(InternalOutput::Event(EndpointEvent::BweConfig { current, desired }));
}
}
}

Expand Down Expand Up @@ -146,7 +157,7 @@ impl EndpointInternal {
TransportEvent::EgressBitrateEstimate(bitrate) => {
let bitrate2 = bitrate.min(self.cfg.max_egress_bitrate as u64);
log::debug!("[EndpointInternal] limit egress bitrate {bitrate2}, rewrite from {bitrate}");
self.bitrate_allocator.set_egress_bitrate(bitrate2);
self.bitrate_allocator.set_egress_estimate(bitrate2);
None
}
}
Expand Down Expand Up @@ -349,6 +360,18 @@ impl EndpointInternal {
remote_track::Output::Event(event) => Some(InternalOutput::Event(EndpointEvent::RemoteMediaTrack(id, event))),
remote_track::Output::Cluster(room, control) => Some(InternalOutput::Cluster(room, ClusterEndpointControl::RemoteTrack(id, control))),
remote_track::Output::RpcRes(req_id, res) => Some(InternalOutput::RpcRes(req_id, EndpointRes::RemoteTrack(id, res))),
remote_track::Output::Started(kind, priority) => {
if kind.is_video() {
self.bitrate_allocator.set_ingress_video_track(id, priority);
}
None
}
remote_track::Output::Stopped(kind) => {
if kind.is_video() {
self.bitrate_allocator.del_ingress_video_track(id);
}
None
}
}
}

Expand All @@ -358,19 +381,15 @@ impl EndpointInternal {
local_track::Output::Event(event) => Some(InternalOutput::Event(EndpointEvent::LocalMediaTrack(id, event))),
local_track::Output::Cluster(room, control) => Some(InternalOutput::Cluster(room, ClusterEndpointControl::LocalTrack(id, control))),
local_track::Output::RpcRes(req_id, res) => Some(InternalOutput::RpcRes(req_id, EndpointRes::LocalTrack(id, res))),
local_track::Output::DesiredBitrate(bitrate) => Some(InternalOutput::Event(EndpointEvent::BweConfig {
current: bitrate,
desired: bitrate + 100_000.max(bitrate * 1 / 5),
})),
local_track::Output::Started(kind, priority) => {
if kind.is_video() {
self.bitrate_allocator.set_video_track(id, priority);
self.bitrate_allocator.set_egress_video_track(id, priority);
}
None
}
local_track::Output::Stopped(kind) => {
if kind.is_video() {
self.bitrate_allocator.del_video_track(id);
self.bitrate_allocator.del_egress_video_track(id);
}
None
}
Expand Down
101 changes: 40 additions & 61 deletions packages/media_core/src/endpoint/internal/bitrate_allocator.rs
Original file line number Diff line number Diff line change
@@ -1,90 +1,69 @@
use derivative::Derivative;
use std::collections::VecDeque;
use crate::transport::{LocalTrackId, RemoteTrackId};

use media_server_protocol::endpoint::TrackPriority;
use self::{egress::EgressBitrateAllocator, ingress::IngressBitrateAllocator};

use crate::transport::LocalTrackId;
mod egress;
mod ingress;

const DEFAULT_BITRATE_BPS: u64 = 800_000;
pub use egress::Action as EgressAction;
pub use ingress::Action as IngressAction;
use media_server_protocol::endpoint::TrackPriority;

#[derive(Debug, PartialEq, Eq)]
pub enum Output {
SetTrackBitrate(LocalTrackId, u64),
RemoteTrack(RemoteTrackId, IngressAction),
LocalTrack(LocalTrackId, EgressAction),
BweConfig(u64, u64),
}

#[derive(Derivative)]
#[derivative(Default)]
pub struct BitrateAllocator {
changed: bool,
#[derivative(Default(value = "DEFAULT_BITRATE_BPS"))]
egress_bitrate: u64,
tracks: smallmap::Map<LocalTrackId, TrackPriority>,
queue: VecDeque<Output>,
egress: EgressBitrateAllocator,
ingress: IngressBitrateAllocator,
}

impl BitrateAllocator {
pub fn on_tick(&mut self) {
self.process();
pub fn new(max_ingress_bitrate: u64) -> Self {
Self {
egress: Default::default(),
ingress: IngressBitrateAllocator::new(max_ingress_bitrate),
}
}

pub fn set_egress_bitrate(&mut self, bitrate: u64) {
self.egress_bitrate = bitrate;
self.changed = true;
pub fn on_tick(&mut self) {
self.egress.on_tick();
self.ingress.on_tick();
}

pub fn set_video_track(&mut self, track: LocalTrackId, priority: TrackPriority) {
self.tracks.insert(track, priority);
self.changed = true;
pub fn set_egress_estimate(&mut self, bitrate: u64) {
self.egress.set_egress_estimate(bitrate);
}

pub fn del_video_track(&mut self, track: LocalTrackId) {
self.tracks.remove(&track);
self.changed = true;
pub fn set_egress_video_track(&mut self, track: LocalTrackId, priority: TrackPriority) {
self.egress.set_video_track(track, priority);
}

pub fn pop_output(&mut self) -> Option<Output> {
self.queue.pop_front()
pub fn del_egress_video_track(&mut self, track: LocalTrackId) {
self.egress.del_video_track(track);
}

fn process(&mut self) {
if !self.changed {
return;
}
self.changed = false;
let mut sum = TrackPriority(0);
for (_track, priority) in self.tracks.iter() {
sum = sum + *priority;
}

if *(sum.as_ref()) != 0 {
for (track, priority) in self.tracks.iter() {
self.queue.push_back(Output::SetTrackBitrate(*track, (self.egress_bitrate * priority.0 as u64) / sum.0 as u64));
}
}
pub fn set_ingress_video_track(&mut self, track: RemoteTrackId, priority: TrackPriority) {
self.ingress.set_video_track(track, priority);
}
}

#[cfg(test)]
mod test {
use super::{BitrateAllocator, Output, DEFAULT_BITRATE_BPS};

#[test]
fn single_source() {
let mut allocator = BitrateAllocator::default();
allocator.set_video_track(0.into(), 1.into());

allocator.on_tick();
assert_eq!(allocator.pop_output(), Some(Output::SetTrackBitrate(0.into(), DEFAULT_BITRATE_BPS)));
pub fn del_ingress_video_track(&mut self, track: RemoteTrackId) {
self.ingress.del_video_track(track);
}

#[test]
fn multi_source() {
let mut allocator = BitrateAllocator::default();
allocator.set_video_track(0.into(), 1.into());
allocator.set_video_track(1.into(), 3.into());
pub fn pop_output(&mut self) -> Option<Output> {
if let Some(out) = self.egress.pop_output() {
let out = match out {
egress::Output::Track(track, action) => Output::LocalTrack(track, action),
egress::Output::BweConfig(current, desired) => Output::BweConfig(current, desired),
};
return Some(out);
}

allocator.on_tick();
assert_eq!(allocator.pop_output(), Some(Output::SetTrackBitrate(0.into(), DEFAULT_BITRATE_BPS * 1 / 4)));
assert_eq!(allocator.pop_output(), Some(Output::SetTrackBitrate(1.into(), DEFAULT_BITRATE_BPS * 3 / 4)));
let (track, action) = self.ingress.pop_output()?;
Some(Output::RemoteTrack(track, action))
}
}
121 changes: 121 additions & 0 deletions packages/media_core/src/endpoint/internal/bitrate_allocator/egress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use derivative::Derivative;
use std::collections::VecDeque;

use media_server_protocol::endpoint::TrackPriority;

use crate::transport::LocalTrackId;

const DEFAULT_BITRATE_BPS: u64 = 800_000;
const NO_TRACK_BWE_CURRENT: u64 = 100_000;
const NO_TRACK_BWE_DESIREND: u64 = 300_000;

#[derive(Debug, PartialEq, Eq)]
pub enum Action {
SetBitrate(u64),
}

#[derive(Debug, PartialEq, Eq)]
pub enum Output {
Track(LocalTrackId, Action),
BweConfig(u64, u64),
}

#[derive(Derivative)]
#[derivative(Default)]
pub struct EgressBitrateAllocator {
changed: bool,
#[derivative(Default(value = "DEFAULT_BITRATE_BPS"))]
egress_bitrate: u64,
tracks: smallmap::Map<LocalTrackId, TrackPriority>,
queue: VecDeque<Output>,
}

impl EgressBitrateAllocator {
pub fn on_tick(&mut self) {
self.process();
}

pub fn set_egress_estimate(&mut self, bitrate: u64) {
self.egress_bitrate = bitrate;
self.changed = true;
}

pub fn set_video_track(&mut self, track: LocalTrackId, priority: TrackPriority) {
self.tracks.insert(track, priority);
self.changed = true;
}

pub fn del_video_track(&mut self, track: LocalTrackId) {
self.tracks.remove(&track);
self.changed = true;
}

pub fn pop_output(&mut self) -> Option<Output> {
self.queue.pop_front()
}

fn process(&mut self) {
if !self.changed {
return;
}
self.changed = false;
let mut sum = TrackPriority(0);
for (_track, priority) in self.tracks.iter() {
sum = sum + *priority;
}

if *(sum.as_ref()) != 0 {
for (track, priority) in self.tracks.iter() {
self.queue
.push_back(Output::Track(*track, Action::SetBitrate((self.egress_bitrate * priority.0 as u64) / sum.0 as u64)));
}
}

if self.tracks.len() > 0 {
self.queue.push_back(Output::BweConfig(self.egress_bitrate, self.egress_bitrate * 6 / 5));
} else {
self.queue.push_back(Output::BweConfig(NO_TRACK_BWE_CURRENT, NO_TRACK_BWE_DESIREND));
}
}
}

#[cfg(test)]
mod test {
use crate::endpoint::internal::bitrate_allocator::egress::{EgressBitrateAllocator, NO_TRACK_BWE_CURRENT, NO_TRACK_BWE_DESIREND};

use super::{Action, Output, DEFAULT_BITRATE_BPS};

#[test]
fn no_source() {
let mut allocator = EgressBitrateAllocator::default();
allocator.set_egress_estimate(200_000);
allocator.on_tick();

assert_eq!(allocator.pop_output(), Some(Output::BweConfig(NO_TRACK_BWE_CURRENT, NO_TRACK_BWE_DESIREND)));
assert_eq!(allocator.pop_output(), None);
}

#[test]
fn single_source() {
let mut allocator = EgressBitrateAllocator::default();
allocator.set_video_track(0.into(), 1.into());

allocator.on_tick();
assert_eq!(allocator.pop_output(), Some(Output::Track(0.into(), Action::SetBitrate(DEFAULT_BITRATE_BPS))));
assert_eq!(allocator.pop_output(), Some(Output::BweConfig(DEFAULT_BITRATE_BPS, DEFAULT_BITRATE_BPS * 6 / 5)));
assert_eq!(allocator.pop_output(), None);
}

#[test]
fn multi_source() {
let mut allocator = EgressBitrateAllocator::default();
allocator.set_video_track(0.into(), 1.into());
allocator.set_video_track(1.into(), 3.into());

allocator.on_tick();
assert_eq!(allocator.pop_output(), Some(Output::Track(0.into(), Action::SetBitrate(DEFAULT_BITRATE_BPS * 1 / 4))));
assert_eq!(allocator.pop_output(), Some(Output::Track(1.into(), Action::SetBitrate(DEFAULT_BITRATE_BPS * 3 / 4))));
assert_eq!(allocator.pop_output(), Some(Output::BweConfig(DEFAULT_BITRATE_BPS, DEFAULT_BITRATE_BPS * 6 / 5)));
assert_eq!(allocator.pop_output(), None);
}
}
Loading

0 comments on commit e6b218c

Please sign in to comment.