Skip to content

Commit

Permalink
WIP: working on simple cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Apr 21, 2024
1 parent f792f23 commit 41206fe
Show file tree
Hide file tree
Showing 21 changed files with 218 additions and 102 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@ convert-enum = "0.1"
num_enum = "0.7"
log = "0.4"
smallmap = "1.4"
derivative = "2.2"
derive_more = "0.99"
rand = "0.8"
2 changes: 1 addition & 1 deletion bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ edition = "2021"
tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] }
clap = { version = "4.5", features = ["env", "derive"] }
log = { workspace = true }
rand = { workspace = true }
poem = { version = "3.0", features = ["static-files"] }
poem-openapi = { version = "5.0", features = ["swagger-ui"] }
tokio = { version = "1.37", features = ["full"] }
sans-io-runtime = { workspace = true }
atm0s-sdn = { workspace = true }
media-server-protocol = { path = "../packages/protocol" }
media-server-runner = { path = "../packages/media_runner" }
rand = "0.8.5"
29 changes: 18 additions & 11 deletions packages/media_core/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::{marker::PhantomData, time::Instant};
use std::{fmt::Debug, hash::Hash, time::Instant};

use atm0s_sdn::features::{FeaturesControl, FeaturesEvent};
use media_server_protocol::{
endpoint::{PeerId, RoomId, TrackMeta, TrackName},
media::MediaPacket,
};
use media_server_utils::Small2dMap;

use crate::transport::{LocalTrackId, RemoteTrackId};

#[derive(Clone)]
mod room;

#[derive(Debug, Clone)]
pub enum ClusterRemoteTrackControl {
Started(TrackName),
Media(MediaPacket),
Expand All @@ -20,20 +23,21 @@ pub enum ClusterRemoteTrackEvent {
RequestKeyFrame,
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum ClusterLocalTrackControl {
Subscribe(PeerId, TrackName),
RequestKeyFrame,
Unsubscribe,
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum ClusterLocalTrackEvent {
Started,
Media(MediaPacket),
Ended,
}

#[derive(Debug)]
pub enum ClusterEndpointControl {
JoinRoom(RoomId, PeerId),
LeaveRoom,
Expand Down Expand Up @@ -65,25 +69,28 @@ pub enum Output<Owner> {
Endpoint(Vec<Owner>, ClusterEndpointEvent),
}

#[derive(Debug)]
pub struct MediaCluster<Owner> {
_tmp: PhantomData<Owner>,
endpoints: Small2dMap<Owner, RoomId>,
}

impl<Owner> Default for MediaCluster<Owner> {
impl<Owner: Hash + Eq + Clone> Default for MediaCluster<Owner> {
fn default() -> Self {
Self { _tmp: PhantomData }
Self { endpoints: Small2dMap::default() }
}
}

impl<Owner> MediaCluster<Owner> {
impl<Owner: Debug> MediaCluster<Owner> {
pub fn on_tick(&mut self, now: Instant) -> Option<Output<Owner>> {
//TODO
None
}

pub fn on_input(&mut self, now: Instant, input: Input<Owner>) -> Option<Output<Owner>> {
//TODO
pub fn on_sdn_event(&mut self, now: Instant, event: FeaturesEvent) -> Option<Output<Owner>> {
None
}

pub fn on_endpoint_control(&mut self, now: Instant, owner: Owner, control: ClusterEndpointControl) -> Option<Output<Owner>> {
log::info!("[MediaCluster] {:?} control {:?}", owner, control);
None
}

Expand Down
1 change: 1 addition & 0 deletions packages/media_core/src/cluster/room.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub struct ClusterRoom {}
9 changes: 7 additions & 2 deletions packages/media_core/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ pub enum EndpointLocalTrackRes {
}

pub struct EndpointReqId(pub u64);
impl From<u64> for EndpointReqId {
fn from(value: u64) -> Self {
Self(value)
}
}

/// This is control APIs, which is used to control server from Endpoint SDK
pub enum EndpointReq {
Expand Down Expand Up @@ -85,7 +90,7 @@ pub enum EndpointOutput<'a, Ext> {
Net(BackendOutgoing<'a>),
Cluster(ClusterEndpointControl),
Ext(Ext),
Shutdown,
Destroy,
}

#[derive(num_enum::TryFromPrimitive, num_enum::IntoPrimitive)]
Expand Down Expand Up @@ -189,7 +194,6 @@ impl<T: Transport<ExtIn, ExtOut>, ExtIn, ExtOut> Endpoint<T, ExtIn, ExtOut> {
let out = self.internal.on_transport_rpc(now, req_id, req)?;
self.process_internal_output(now, out)
}
TransportOutput::Destroy => Some(EndpointOutput::Shutdown),
}
}

Expand All @@ -205,6 +209,7 @@ impl<T: Transport<ExtIn, ExtOut>, ExtIn, ExtOut> Endpoint<T, ExtIn, ExtOut> {
self.process_transport_output(now, out)
}
InternalOutput::Cluster(control) => Some(EndpointOutput::Cluster(control)),
InternalOutput::Destroy => Some(EndpointOutput::Destroy),
}
}
}
74 changes: 48 additions & 26 deletions packages/media_core/src/endpoint/internal.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use std::{
collections::{HashMap, VecDeque},
time::Instant,
};
use std::{collections::VecDeque, time::Instant};

use media_server_protocol::endpoint::{PeerId, RoomId};

use crate::{
cluster::{ClusterEndpointControl, ClusterEndpointEvent, ClusterLocalTrackEvent, ClusterRemoteTrackEvent},
transport::{LocalTrackEvent, LocalTrackId, RemoteTrackEvent, RemoteTrackId, TransportEvent, TransportInput, TransportState, TransportStats},
transport::{LocalTrackEvent, LocalTrackId, RemoteTrackEvent, RemoteTrackId, TransportEvent, TransportState, TransportStats},
};

use self::{local_track::EndpointLocalTrack, remote_track::EndpointRemoteTrack};
Expand All @@ -21,6 +18,7 @@ pub enum InternalOutput {
Event(EndpointEvent),
RpcRes(EndpointReqId, EndpointRes),
Cluster(ClusterEndpointControl),
Destroy,
}

pub struct EndpointInternal {
Expand Down Expand Up @@ -73,17 +71,21 @@ impl EndpointInternal {
EndpointReq::JoinRoom(room, peer) => {
self.room = Some((room.clone(), peer.clone()));
if matches!(self.state, TransportState::Connecting) {
Some(InternalOutput::Cluster(ClusterEndpointControl::JoinRoom(room, peer)))
} else {
log::info!("[EndpointInternal] join_room({room}, {peer}) but in Connecting state => wait");
None
} else {
log::info!("[EndpointInternal] join_room({room}, {peer})");
Some(InternalOutput::Cluster(ClusterEndpointControl::JoinRoom(room, peer)))
}
}
EndpointReq::LeaveRoom => {
self.room.take()?;
let (room, peer) = self.room.take()?;
if matches!(self.state, TransportState::Connecting) {
Some(InternalOutput::Cluster(ClusterEndpointControl::LeaveRoom))
} else {
log::info!("[EndpointInternal] leave_room({room}, {peer}) but in Connecting state => only clear local");
None
} else {
log::info!("[EndpointInternal] leave_room({room}, {peer})");
Some(InternalOutput::Cluster(ClusterEndpointControl::LeaveRoom))
}
}
EndpointReq::RemoteTrack(track, control) => todo!(),
Expand All @@ -94,9 +96,16 @@ impl EndpointInternal {
fn on_transport_state_changed<'a>(&mut self, now: Instant, state: TransportState) -> Option<InternalOutput> {
self.state = state;
match &self.state {
TransportState::Connecting => None,
TransportState::ConnectError(_) => None,
TransportState::Connecting => {
log::info!("[EndpointInternal] connecting");
None
}
TransportState::ConnectError(err) => {
log::info!("[EndpointInternal] connect error");
Some(InternalOutput::Destroy)
}
TransportState::Connected => {
log::info!("[EndpointInternal] connected");
for i in 0..self.local_tracks_id.len() {
let id = self.local_tracks_id[i];
if let Some(out) = self.local_tracks.get_mut(&id).expect("Should have").on_connected(now) {
Expand All @@ -114,12 +123,25 @@ impl EndpointInternal {
}
}
}
let (room, peer) = self.room.as_ref()?;
self.queue.push_back(InternalOutput::Cluster(ClusterEndpointControl::JoinRoom(room.clone(), peer.clone())));
if let Some((room, peer)) = self.room.as_ref() {
log::info!("[EndpointInternal] join_room({room}, {peer}) after connected");
self.queue.push_back(InternalOutput::Cluster(ClusterEndpointControl::JoinRoom(room.clone(), peer.clone())));
}
self.queue.pop_front()
}
TransportState::Reconnecting => {
log::info!("[EndpointInternal] reconnecting");
None
}
TransportState::Disconnected(err) => {
log::info!("[EndpointInternal] disconnected {:?}", err);
if let Some((room, peer)) = &self.room {
log::info!("[EndpointInternal] leave_room({room}, {peer}) after disconnected");
self.queue.push_back(InternalOutput::Cluster(ClusterEndpointControl::LeaveRoom));
}
self.queue.push_back(InternalOutput::Destroy);
self.queue.pop_front()
}
TransportState::Reconnecting => None,
TransportState::Disconnected(_) => None,
}
}

Expand All @@ -128,7 +150,7 @@ impl EndpointInternal {
self.remote_tracks_id.push(track);
self.remote_tracks.insert(track, EndpointRemoteTrack::default());
}
let out = self.remote_tracks.get_mut(&track)?.on_event(now, event)?;
let out = self.remote_tracks.get_mut(&track)?.on_transport_event(now, event)?;
self.on_remote_track_output(now, track, out)
}

Expand All @@ -137,7 +159,7 @@ impl EndpointInternal {
self.local_tracks_id.push(track);
self.local_tracks.insert(track, EndpointLocalTrack::default());
}
let out = self.local_tracks.get_mut(&track)?.on_event(now, event)?;
let out = self.local_tracks.get_mut(&track)?.on_transport_event(now, event)?;
self.on_local_track_output(now, track, out)
}

Expand All @@ -160,25 +182,25 @@ impl EndpointInternal {
}

fn on_cluster_remote_track<'a>(&mut self, now: Instant, id: RemoteTrackId, event: ClusterRemoteTrackEvent) -> Option<InternalOutput> {
match event {
_ => todo!(),
}
None
}

fn on_cluster_local_track<'a>(&mut self, now: Instant, id: LocalTrackId, event: ClusterLocalTrackEvent) -> Option<InternalOutput> {
match event {
_ => todo!(),
}
None
}
}

/// This block for internal local and remote track
impl EndpointInternal {
fn on_remote_track_output<'a>(&mut self, now: Instant, id: RemoteTrackId, out: remote_track::Output) -> Option<InternalOutput> {
todo!()
match out {
remote_track::Output::Cluster(control) => Some(InternalOutput::Cluster(ClusterEndpointControl::RemoteTrack(id, control))),
}
}

fn on_local_track_output<'a>(&mut self, now: Instant, id: LocalTrackId, out: local_track::Output) -> Option<InternalOutput> {
todo!()
match out {
local_track::Output::Cluster(control) => Some(InternalOutput::Cluster(ClusterEndpointControl::LocalTrack(id, control))),
}
}
}
21 changes: 17 additions & 4 deletions packages/media_core/src/endpoint/internal/local_track.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use std::time::Instant;

use crate::transport::{LocalTrackEvent, LocalTrackId};
use crate::{
cluster::ClusterLocalTrackControl,
transport::{LocalTrackEvent, LocalTrackId},
};

pub enum Output {}
pub enum Output {
Cluster(ClusterLocalTrackControl),
}

#[derive(Default)]
pub struct EndpointLocalTrack {}
Expand All @@ -11,8 +16,16 @@ impl EndpointLocalTrack {
pub fn on_connected(&mut self, now: Instant) -> Option<Output> {
None
}
pub fn on_event(&mut self, now: Instant, event: LocalTrackEvent) -> Option<Output> {
None
pub fn on_transport_event(&mut self, now: Instant, event: LocalTrackEvent) -> Option<Output> {
log::info!("[EndpointLocalTrack] on event {:?}", event);
match event {
LocalTrackEvent::Started => None,
//TODO maybe switch is RPC type
LocalTrackEvent::Switch(Some((peer, track))) => Some(Output::Cluster(ClusterLocalTrackControl::Subscribe(peer, track))),
LocalTrackEvent::Switch(None) => Some(Output::Cluster(ClusterLocalTrackControl::Unsubscribe)),
LocalTrackEvent::RequestKeyFrame => Some(Output::Cluster(ClusterLocalTrackControl::RequestKeyFrame)),
LocalTrackEvent::Ended => None,
}
}
pub fn pop_output(&mut self) -> Option<Output> {
None
Expand Down
20 changes: 16 additions & 4 deletions packages/media_core/src/endpoint/internal/remote_track.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::time::Instant;

use crate::transport::{RemoteTrackEvent, RemoteTrackId};
use media_server_protocol::endpoint::TrackName;

pub enum Output {}
use crate::{cluster::ClusterRemoteTrackControl, transport::RemoteTrackEvent};

pub enum Output {
Cluster(ClusterRemoteTrackControl),
}

#[derive(Default)]
pub struct EndpointRemoteTrack {}
Expand All @@ -11,9 +15,17 @@ impl EndpointRemoteTrack {
pub fn on_connected(&mut self, now: Instant) -> Option<Output> {
None
}
pub fn on_event(&mut self, now: Instant, event: RemoteTrackEvent) -> Option<Output> {
None

pub fn on_transport_event(&mut self, now: Instant, event: RemoteTrackEvent) -> Option<Output> {
match event {
RemoteTrackEvent::Started { name } => Some(Output::Cluster(ClusterRemoteTrackControl::Started(TrackName(name)))),
RemoteTrackEvent::Paused => None,
RemoteTrackEvent::Resumed => None,
RemoteTrackEvent::Media(_) => None,
RemoteTrackEvent::Ended => Some(Output::Cluster(ClusterRemoteTrackControl::Ended)),
}
}

pub fn pop_output(&mut self) -> Option<Output> {
None
}
Expand Down
Loading

0 comments on commit 41206fe

Please sign in to comment.