Skip to content

Commit

Permalink
WIP: first success stream video,audio with whip and whep over cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Apr 22, 2024
1 parent 41206fe commit fd3ef93
Show file tree
Hide file tree
Showing 20 changed files with 844 additions and 211 deletions.
13 changes: 9 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions bin/src/server/media/runtime_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use atm0s_sdn::{
ControllerPlaneCfg, DataPlaneCfg, DataWorkerHistory, SdnExtOut, SdnWorkerBusEvent,
};
use media_server_protocol::transport::{RpcReq, RpcRes};
use media_server_runner::{Input as WorkerInput, MediaConfig, MediaServerWorker, Output as WorkerOutput, Owner, SdnConfig, SC, SE, TC, TW};
use media_server_runner::{Input as WorkerInput, MediaConfig, MediaServerWorker, Output as WorkerOutput, Owner, SdnConfig, UserData, SC, SE, TC, TW};
use rand::rngs::OsRng;
use sans_io_runtime::{BusChannelControl, BusControl, WorkerInner, WorkerInnerInput, WorkerInnerOutput};
use sans_io_runtime::{BusChannelControl, BusControl, BusEvent, WorkerInner, WorkerInnerInput, WorkerInnerOutput};

use crate::NodeConfig;

Expand All @@ -20,15 +20,15 @@ pub enum ExtIn {
#[derive(Debug, Clone)]
pub enum ExtOut {
Rpc(u64, u16, RpcRes<usize>),
Sdn(SdnExtOut<SE>),
Sdn(SdnExtOut<UserData, SE>),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Channel {
Controller,
Worker(u16),
}
type Event = SdnWorkerBusEvent<SC, SE, TC, TW>;
type Event = SdnWorkerBusEvent<UserData, SC, SE, TC, TW>;
pub struct ICfg {
pub controller: bool,
pub node: NodeConfig,
Expand Down Expand Up @@ -137,9 +137,10 @@ impl MediaRuntimeWorker {

fn convert_input<'a>(input: Input<'a>) -> WorkerInput<'a> {
match input {
Input::Bus(event) => {
todo!()
}
Input::Bus(event) => match event {
BusEvent::Broadcast(_from, msg) => WorkerInput::Bus(msg),
BusEvent::Channel(_owner, _channel, msg) => WorkerInput::Bus(msg),
},
Input::Ext(ext) => match ext {
ExtIn::Rpc(req_id, ext) => WorkerInput::ExtRpc(req_id, ext),
},
Expand Down
2 changes: 2 additions & 0 deletions packages/media_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ edition = "2021"
log = { workspace = true }
num_enum = { workspace = true }
smallmap = { workspace = true }
derivative = { workspace = true }
derive_more = { workspace = true }
sans-io-runtime = { workspace = true, default-features = false }
atm0s-sdn = { workspace = true }
media-server-protocol = { path = "../protocol" }
Expand Down
87 changes: 57 additions & 30 deletions packages/media_core/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
use std::{fmt::Debug, hash::Hash, time::Instant};
use derive_more::{AsRef, Display, From};
use sans_io_runtime::{Task, TaskGroup};
use std::{
collections::HashMap,
fmt::Debug,
hash::{Hash, Hasher},
time::Instant,
};

use atm0s_sdn::features::{FeaturesControl, FeaturesEvent};
use media_server_protocol::{
endpoint::{PeerId, RoomId, TrackMeta, TrackName},
media::MediaPacket,
};
use media_server_utils::Small2dMap;

use crate::transport::{LocalTrackId, RemoteTrackId};

use self::room::ClusterRoom;

mod room;

#[derive(Clone, Copy, From, AsRef, PartialEq, Eq, Debug, Display, Hash)]
pub struct ClusterRoomHash(pub u64);

impl From<&RoomId> for ClusterRoomHash {
fn from(room: &RoomId) -> Self {
let mut hash = std::hash::DefaultHasher::new();
room.as_ref().hash(&mut hash);
Self(hash.finish())
}
}

#[derive(Debug, Clone)]
pub enum ClusterRemoteTrackControl {
Started(TrackName),
Started(TrackName, TrackMeta),
Media(MediaPacket),
Ended,
}
Expand All @@ -39,68 +58,76 @@ pub enum ClusterLocalTrackEvent {

#[derive(Debug)]
pub enum ClusterEndpointControl {
JoinRoom(RoomId, PeerId),
LeaveRoom,
SubscribeRoom,
UnsubscribeRoom,
SubscribePeer(PeerId),
UnsubscribePeer(PeerId),
Join(PeerId),
Leave,
RemoteTrack(RemoteTrackId, ClusterRemoteTrackControl),
LocalTrack(LocalTrackId, ClusterLocalTrackControl),
}

#[derive(Clone)]
pub enum ClusterEndpointEvent {
PeerJoined(PeerId),
PeerLeaved(PeerId),
TrackStarted(PeerId, TrackName, TrackMeta),
TrackStoped(PeerId, TrackName),

Check warning on line 70 in packages/media_core/src/cluster.rs

View workflow job for this annotation

GitHub Actions / typos

"Stoped" should be "Stopped".
RemoteTrack(RemoteTrackId, ClusterRemoteTrackEvent),
LocalTrack(LocalTrackId, ClusterLocalTrackEvent),
}

pub enum Input<Owner> {
Sdn(FeaturesEvent),
Endpoint(Owner, ClusterEndpointControl),
Sdn(ClusterRoomHash, FeaturesEvent),
Endpoint(Owner, ClusterRoomHash, ClusterEndpointControl),
}

pub enum Output<Owner> {
Sdn(FeaturesControl),
Sdn(ClusterRoomHash, FeaturesControl),
Endpoint(Vec<Owner>, ClusterEndpointEvent),
Continue,
}

pub struct MediaCluster<Owner> {
endpoints: Small2dMap<Owner, RoomId>,
pub struct MediaCluster<Owner: Debug + Copy + Clone + Hash + Eq> {
rooms_map: HashMap<ClusterRoomHash, usize>,
rooms: TaskGroup<room::Input<Owner>, Output<Owner>, ClusterRoom<Owner>, 128>,
}

impl<Owner: Hash + Eq + Clone> Default for MediaCluster<Owner> {
impl<Owner: Debug + Copy + Hash + Eq + Clone> Default for MediaCluster<Owner> {
fn default() -> Self {
Self { endpoints: Small2dMap::default() }
Self {
rooms_map: HashMap::new(),
rooms: TaskGroup::default(),
}
}
}

impl<Owner: Debug> MediaCluster<Owner> {
impl<Owner: Debug + Hash + Copy + Clone + Debug + Eq> MediaCluster<Owner> {
pub fn on_tick(&mut self, now: Instant) -> Option<Output<Owner>> {
//TODO
None
let (_index, out) = self.rooms.on_tick(now)?;
Some(out)
}

pub fn on_sdn_event(&mut self, now: Instant, event: FeaturesEvent) -> Option<Output<Owner>> {
None
pub fn on_sdn_event(&mut self, now: Instant, room: ClusterRoomHash, event: FeaturesEvent) -> Option<Output<Owner>> {
let index = self.rooms_map.get(&room)?;
self.rooms.on_event(now, *index, room::Input::Sdn(event))
}

pub fn on_endpoint_control(&mut self, now: Instant, owner: Owner, control: ClusterEndpointControl) -> Option<Output<Owner>> {
log::info!("[MediaCluster] {:?} control {:?}", owner, control);
None
pub fn on_endpoint_control(&mut self, now: Instant, owner: Owner, room_hash: ClusterRoomHash, control: ClusterEndpointControl) -> Option<Output<Owner>> {
if let Some(index) = self.rooms_map.get(&room_hash) {
self.rooms.on_event(now, *index, room::Input::Endpoint(owner, control))
} else {
log::info!("[MediaCluster] create room {}", room_hash);
let mut room = ClusterRoom::new(room_hash);
let out = room.on_event(now, room::Input::Endpoint(owner, control));
let index = self.rooms.add_task(room);
self.rooms_map.insert(room_hash, index);
out
}
}

pub fn pop_output(&mut self, now: Instant) -> Option<Output<Owner>> {
//TODO
None
let (_index, out) = self.rooms.pop_output(now)?;
Some(out)
}

pub fn shutdown<'a>(&mut self, now: Instant) -> Option<Output<Owner>> {
//TODO
None
let (_index, out) = self.rooms.shutdown(now)?;
Some(out)
}
}
Loading

0 comments on commit fd3ef93

Please sign in to comment.