Skip to content

Commit

Permalink
feat: add cluster metadata publish and subscribe options: peer and tr…
Browse files Browse the repository at this point in the history
…ack info (#260)

* feat: add cluster metadata publish and subscribe options: peer and track info

* fixed: wrong sending media mid in whep

* test: cluster metadata tests
  • Loading branch information
giangndm authored Apr 23, 2024
1 parent dee58cc commit 9880d54
Show file tree
Hide file tree
Showing 13 changed files with 908 additions and 147 deletions.
25 changes: 7 additions & 18 deletions packages/media_core/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{

use atm0s_sdn::features::{FeaturesControl, FeaturesEvent};
use media_server_protocol::{
endpoint::{PeerId, RoomId, TrackMeta, TrackName},
endpoint::{PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe, TrackMeta, TrackName},
media::MediaPacket,
};

Expand Down Expand Up @@ -43,7 +43,7 @@ pub enum ClusterRemoteTrackControl {
Ended,
}

#[derive(Clone)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterRemoteTrackEvent {
RequestKeyFrame,
LimitBitrate { min: u32, max: u32 },
Expand All @@ -57,39 +57,28 @@ pub enum ClusterLocalTrackControl {
Unsubscribe,
}

#[derive(Debug, Clone)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterLocalTrackEvent {
Started,
SourceChanged,
Media(MediaPacket),
Ended,
}

#[derive(Debug)]
pub enum ClusterRoomInfoPublishLevel {
Full,
TrackOnly,
}

#[derive(Debug)]
pub enum ClusterRoomInfoSubscribeLevel {
Full,
TrackOnly,
Manual,
}

#[derive(Debug)]
pub enum ClusterEndpointControl {
Join(PeerId, ClusterRoomInfoPublishLevel, ClusterRoomInfoSubscribeLevel),
Join(PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe),
Leave,
SubscribePeer(PeerId),
UnsubscribePeer(PeerId),
RemoteTrack(RemoteTrackId, ClusterRemoteTrackControl),
LocalTrack(LocalTrackId, ClusterLocalTrackControl),
}

#[derive(Clone)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterEndpointEvent {
PeerJoined(PeerId, PeerMeta),
PeerLeaved(PeerId),
TrackStarted(PeerId, TrackName, TrackMeta),
TrackStopped(PeerId, TrackName),
RemoteTrack(RemoteTrackId, ClusterRemoteTrackEvent),
Expand Down
6 changes: 3 additions & 3 deletions packages/media_core/src/cluster/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> ClusterRoom<Owner> {

fn on_endpoint_control(&mut self, now: Instant, owner: Owner, control: ClusterEndpointControl) -> Option<Output<Owner>> {
match control {
ClusterEndpointControl::Join(peer, publish, subscribe) => {
let out = self.metadata.on_join(owner, peer, publish, subscribe)?;
ClusterEndpointControl::Join(peer, meta, publish, subscribe) => {
let out = self.metadata.on_join(owner, peer, meta, publish, subscribe)?;
Some(self.process_meta_output(out))
}
ClusterEndpointControl::Leave => {
Expand Down Expand Up @@ -230,7 +230,7 @@ impl<Owner: Debug + Clone + Copy + Hash + Eq> ClusterRoom<Owner> {
}
}

pub fn track_key<T: From<u64>>(room: ClusterRoomHash, peer: &PeerId, track: &TrackName) -> T {
pub fn gen_channel_id<T: From<u64>>(room: ClusterRoomHash, peer: &PeerId, track: &TrackName) -> T {
let mut h = std::hash::DefaultHasher::new();
room.as_ref().hash(&mut h);
peer.as_ref().hash(&mut h);
Expand Down
7 changes: 2 additions & 5 deletions packages/media_core/src/cluster/room/channel_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,13 @@ impl<Owner: Debug + Hash + Eq + Copy> RoomChannelPublisher<Owner> {
let (owner, track_id) = self.tracks_source.get(&channel)?;
match fb_kind {
FeedbackKind::Bitrate => todo!(),
FeedbackKind::KeyFrameRequest => Some(Output::Endpoint(
vec![owner.clone()],
ClusterEndpointEvent::RemoteTrack(*track_id, ClusterRemoteTrackEvent::RequestKeyFrame),
)),
FeedbackKind::KeyFrameRequest => Some(Output::Endpoint(vec![*owner], ClusterEndpointEvent::RemoteTrack(*track_id, ClusterRemoteTrackEvent::RequestKeyFrame))),
}
}

pub fn on_track_publish(&mut self, owner: Owner, track: RemoteTrackId, peer: PeerId, name: TrackName) -> Option<Output<Owner>> {
log::info!("[ClusterRoom {}] peer ({peer} started track {name})", self.room);
let channel_id = super::track_key(self.room, &peer, &name);
let channel_id = super::gen_channel_id(self.room, &peer, &name);
self.tracks.insert((owner, track), (peer.clone(), name.clone(), channel_id));
self.tracks_source.insert(channel_id, (owner, track));

Expand Down
10 changes: 4 additions & 6 deletions packages/media_core/src/cluster/room/channel_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<Owner: Hash + Eq + Copy + Debug> RoomChannelSubscribe<Owner> {
log::info!("[ClusterRoom {}] cluster: channel {channel} source changed => fire event to {:?}", self.room, subscribers);
for (owner, track) in subscribers {
self.queue
.push_back(Output::Endpoint(vec![owner.clone()], ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::SourceChanged)))
.push_back(Output::Endpoint(vec![*owner], ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::SourceChanged)))
}
self.queue.pop_front()
}
Expand All @@ -59,16 +59,14 @@ impl<Owner: Hash + Eq + Copy + Debug> RoomChannelSubscribe<Owner> {
let subscribers = self.subscribers.get(&channel)?;
log::trace!("[ClusterRoom {}] on channel media payload {} seq {} to {} subscribers", self.room, pkt.pt, pkt.seq, subscribers.len());
for (owner, track) in subscribers {
self.queue.push_back(Output::Endpoint(
vec![owner.clone()],
ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::Media(pkt.clone())),
))
self.queue
.push_back(Output::Endpoint(vec![*owner], ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::Media(pkt.clone()))))
}
self.queue.pop_front()
}

pub fn on_track_subscribe(&mut self, owner: Owner, track: LocalTrackId, target_peer: PeerId, target_track: TrackName) -> Option<Output<Owner>> {
let channel_id: ChannelId = super::track_key(self.room, &target_peer, &target_track);
let channel_id: ChannelId = super::gen_channel_id(self.room, &target_peer, &target_track);
log::info!(
"[ClusterRoom {}] owner {:?} track {track} subscribe peer {target_peer} track {target_track}), channel: {channel_id}",
self.room,
Expand Down
Loading

0 comments on commit 9880d54

Please sign in to comment.