diff --git a/packages/media_core/src/cluster.rs b/packages/media_core/src/cluster.rs
index 668310b8..d83fd7e0 100644
--- a/packages/media_core/src/cluster.rs
+++ b/packages/media_core/src/cluster.rs
@@ -15,7 +15,7 @@ use std::{
 use atm0s_sdn::features::{FeaturesControl, FeaturesEvent};
 use media_server_protocol::{
-    endpoint::{PeerId, RoomId, TrackMeta, TrackName},
+    endpoint::{PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe, TrackMeta, TrackName},
     media::MediaPacket,
 };
@@ -43,7 +43,7 @@ pub enum ClusterRemoteTrackControl {
     Ended,
 }

-#[derive(Clone)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub enum ClusterRemoteTrackEvent {
     RequestKeyFrame,
     LimitBitrate { min: u32, max: u32 },
@@ -57,7 +57,7 @@ pub enum ClusterLocalTrackControl {
     Unsubscribe,
 }

-#[derive(Debug, Clone)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub enum ClusterLocalTrackEvent {
     Started,
     SourceChanged,
@@ -65,22 +65,9 @@ pub enum ClusterLocalTrackEvent {
     Ended,
 }

-#[derive(Debug)]
-pub enum ClusterRoomInfoPublishLevel {
-    Full,
-    TrackOnly,
-}
-
-#[derive(Debug)]
-pub enum ClusterRoomInfoSubscribeLevel {
-    Full,
-    TrackOnly,
-    Manual,
-}
-
 #[derive(Debug)]
 pub enum ClusterEndpointControl {
-    Join(PeerId, ClusterRoomInfoPublishLevel, ClusterRoomInfoSubscribeLevel),
+    Join(PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe),
     Leave,
     SubscribePeer(PeerId),
     UnsubscribePeer(PeerId),
@@ -88,8 +75,10 @@ pub enum ClusterEndpointControl {
     LocalTrack(LocalTrackId, ClusterLocalTrackControl),
 }

-#[derive(Clone)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub enum ClusterEndpointEvent {
+    PeerJoined(PeerId, PeerMeta),
+    PeerLeaved(PeerId),
     TrackStarted(PeerId, TrackName, TrackMeta),
     TrackStopped(PeerId, TrackName),
     RemoteTrack(RemoteTrackId, ClusterRemoteTrackEvent),
diff --git a/packages/media_core/src/cluster/room.rs b/packages/media_core/src/cluster/room.rs
index 4f639266..52679ead 100644
--- a/packages/media_core/src/cluster/room.rs
+++ b/packages/media_core/src/cluster/room.rs
@@ -140,8 +140,8 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> ClusterRoom<Owner> {
     fn on_endpoint_control(&mut self, now: Instant, owner: Owner, control: ClusterEndpointControl) -> Option<Output<Owner>> {
         match control {
-            ClusterEndpointControl::Join(peer, publish, subscribe) => {
-                let out = self.metadata.on_join(owner, peer, publish, subscribe)?;
+            ClusterEndpointControl::Join(peer, meta, publish, subscribe) => {
+                let out = self.metadata.on_join(owner, peer, meta, publish, subscribe)?;
                 Some(self.process_meta_output(out))
             }
             ClusterEndpointControl::Leave => {
@@ -230,7 +230,7 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> ClusterRoom<Owner> {
     }
 }

-pub fn track_key<T: From<u64>>(room: ClusterRoomHash, peer: &PeerId, track: &TrackName) -> T {
+pub fn gen_channel_id<T: From<u64>>(room: ClusterRoomHash, peer: &PeerId, track: &TrackName) -> T {
     let mut h = std::hash::DefaultHasher::new();
     room.as_ref().hash(&mut h);
     peer.as_ref().hash(&mut h);
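A note on the `track_key` to `gen_channel_id` rename above: the pub/sub channel id is derived by hashing the `(room, peer, track)` triple, so the publishing and subscribing endpoints can agree on a channel without any coordination. A self-contained sketch of the idea, with plain `u64` and `&str` standing in for the crate's `ClusterRoomHash`, `PeerId` and `TrackName` newtypes:

```rust
use std::hash::{DefaultHasher, Hash, Hasher};

// Derive a channel id from the (room, peer, track) triple. Determinism across
// nodes assumes every node hashes with the same DefaultHasher algorithm, which
// holds for a fixed Rust toolchain but is not a documented cross-version guarantee.
fn gen_channel_id(room: u64, peer: &str, track: &str) -> u64 {
    let mut h = DefaultHasher::new();
    room.hash(&mut h);
    peer.hash(&mut h);
    track.hash(&mut h);
    h.finish()
}

fn main() {
    // Publisher and subscriber compute the id independently and agree.
    assert_eq!(gen_channel_id(1, "peer1", "audio_main"), gen_channel_id(1, "peer1", "audio_main"));
    // Different tracks land on different channels (modulo hash collisions).
    assert_ne!(gen_channel_id(1, "peer1", "audio_main"), gen_channel_id(1, "peer1", "video_main"));
}
```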
diff --git a/packages/media_core/src/cluster/room/channel_pub.rs b/packages/media_core/src/cluster/room/channel_pub.rs
index 948449f6..622aae45 100644
--- a/packages/media_core/src/cluster/room/channel_pub.rs
+++ b/packages/media_core/src/cluster/room/channel_pub.rs
@@ -46,16 +46,13 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> RoomChannelPublisher<Owner> {
         let (owner, track_id) = self.tracks_source.get(&channel)?;
         match fb_kind {
             FeedbackKind::Bitrate => todo!(),
-            FeedbackKind::KeyFrameRequest => Some(Output::Endpoint(
-                vec![owner.clone()],
-                ClusterEndpointEvent::RemoteTrack(*track_id, ClusterRemoteTrackEvent::RequestKeyFrame),
-            )),
+            FeedbackKind::KeyFrameRequest => Some(Output::Endpoint(vec![*owner], ClusterEndpointEvent::RemoteTrack(*track_id, ClusterRemoteTrackEvent::RequestKeyFrame))),
         }
     }

     pub fn on_track_publish(&mut self, owner: Owner, track: RemoteTrackId, peer: PeerId, name: TrackName) -> Option<Output<Owner>> {
         log::info!("[ClusterRoom {}] peer ({peer} started track {name})", self.room);
-        let channel_id = super::track_key(self.room, &peer, &name);
+        let channel_id = super::gen_channel_id(self.room, &peer, &name);
         self.tracks.insert((owner, track), (peer.clone(), name.clone(), channel_id));
         self.tracks_source.insert(channel_id, (owner, track));
diff --git a/packages/media_core/src/cluster/room/channel_sub.rs b/packages/media_core/src/cluster/room/channel_sub.rs
index 7aafbb15..6716a172 100644
--- a/packages/media_core/src/cluster/room/channel_sub.rs
+++ b/packages/media_core/src/cluster/room/channel_sub.rs
@@ -49,7 +49,7 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> RoomChannelSubscribe<Owner> {
         log::info!("[ClusterRoom {}] cluster: channel {channel} source changed => fire event to {:?}", self.room, subscribers);
         for (owner, track) in subscribers {
             self.queue
-                .push_back(Output::Endpoint(vec![owner.clone()], ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::SourceChanged)))
+                .push_back(Output::Endpoint(vec![*owner], ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::SourceChanged)))
         }
         self.queue.pop_front()
     }
@@ -59,16 +59,14 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> RoomChannelSubscribe<Owner> {
         let subscribers = self.subscribers.get(&channel)?;
         log::trace!("[ClusterRoom {}] on channel media payload {} seq {} to {} subscribers", self.room, pkt.pt, pkt.seq, subscribers.len());
         for (owner, track) in subscribers {
-            self.queue.push_back(Output::Endpoint(
-                vec![owner.clone()],
-                ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::Media(pkt.clone())),
-            ))
+            self.queue
+                .push_back(Output::Endpoint(vec![*owner], ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::Media(pkt.clone()))))
         }
         self.queue.pop_front()
     }

     pub fn on_track_subscribe(&mut self, owner: Owner, track: LocalTrackId, target_peer: PeerId, target_track: TrackName) -> Option<Output<Owner>> {
-        let channel_id: ChannelId = super::track_key(self.room, &target_peer, &target_track);
+        let channel_id: ChannelId = super::gen_channel_id(self.room, &target_peer, &target_track);
         log::info!(
             "[ClusterRoom {}] owner {:?} track {track} subscribe peer {target_peer} track {target_track}), channel: {channel_id}",
             self.room,
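The metadata module that follows replaces the single `room_map` with a small family of dht_kv maps: a room-wide peers map, a room-wide tracks map, and one private map per peer for manual subscriptions. The id scheme is easiest to see side by side; this sketch mirrors the helpers in the diff below, with `u64` standing in for the `Map`/`Key` types. Note that deriving the tracks map as `room.0 + 1` assumes the adjacent id does not collide with another room's hash:

```rust
use std::hash::{DefaultHasher, Hash, Hasher};

fn hash2(a: impl Hash, b: impl Hash) -> u64 {
    let mut h = DefaultHasher::new();
    a.hash(&mut h);
    b.hash(&mut h);
    h.finish()
}

// Room-wide map of joined peers: keyed directly by the room hash.
fn peers_map(room: u64) -> u64 {
    room
}

// Room-wide map of published tracks: the adjacent id.
fn tracks_map(room: u64) -> u64 {
    room + 1
}

// Private per-peer map, used by manual per-peer subscriptions.
fn peer_map(room: u64, peer: &str) -> u64 {
    hash2(room, peer)
}

// Track key; the same key is reused inside tracks_map and the peer's own map.
fn tracks_key(peer: &str, track: &str) -> u64 {
    hash2(peer, track)
}

fn main() {
    let room = 42u64;
    assert_ne!(peers_map(room), tracks_map(room));
    assert_ne!(peer_map(room, "peer1"), peer_map(room, "peer2"));
    assert_eq!(tracks_key("peer1", "audio_main"), tracks_key("peer1", "audio_main"));
}
```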
diff --git a/packages/media_core/src/cluster/room/metadata.rs b/packages/media_core/src/cluster/room/metadata.rs
index 17030fe1..74a0dcdc 100644
--- a/packages/media_core/src/cluster/room/metadata.rs
+++ b/packages/media_core/src/cluster/room/metadata.rs
@@ -8,23 +8,29 @@
 //!

 use std::{
-    collections::{HashMap, VecDeque},
+    collections::VecDeque,
     fmt::Debug,
-    hash::Hash,
+    hash::{DefaultHasher, Hash, Hasher},
     time::Instant,
 };

 use atm0s_sdn::features::dht_kv::{self, Key, Map, MapControl, MapEvent};
-use media_server_protocol::{
-    endpoint::{PeerId, TrackMeta, TrackName},
-    media::TrackInfo,
-};
+use media_server_protocol::endpoint::{PeerId, PeerInfo, PeerMeta, RoomInfoPublish, RoomInfoSubscribe, TrackInfo, TrackMeta, TrackName};
+use smallmap::{Map as SmallMap, Set as SmallSet};

 use crate::{
-    cluster::{ClusterEndpointEvent, ClusterRoomHash, ClusterRoomInfoPublishLevel, ClusterRoomInfoSubscribeLevel},
+    cluster::{ClusterEndpointEvent, ClusterRoomHash},
     transport::RemoteTrackId,
 };

+struct PeerContainer {
+    peer: PeerId,
+    publish: RoomInfoPublish,
+    sub_peers: SmallSet<PeerId>,
+    pub_tracks: SmallMap<RemoteTrackId, TrackName>,
+}
+
+#[derive(Debug, PartialEq, Eq)]
 pub enum Output<Owner> {
     Kv(dht_kv::Control),
     Endpoint(Vec<Owner>, ClusterEndpointEvent),
@@ -32,10 +38,15 @@ pub enum Output<Owner> {

 pub struct RoomMetadata<Owner> {
     room: ClusterRoomHash,
-    room_map: Map,
-    peers: HashMap<Owner, PeerId>,
-    local_tracks: HashMap<(Owner, RemoteTrackId), (PeerId, TrackName, Key)>,
-    remote_tracks: HashMap<Key, (PeerId, TrackName, TrackMeta)>,
+    peers_map: Map,
+    tracks_map: Map,
+    peers: SmallMap<Owner, PeerContainer>,
+    peers_map_subscribers: SmallSet<Owner>,
+    tracks_map_subscribers: SmallSet<Owner>,
+    // Stores the list of owners that manually subscribed to a target peer's track map
+    peers_tracks_subs: SmallMap<Map, SmallSet<Owner>>,
+    cluster_peers: SmallMap<Key, PeerInfo>,
+    cluster_tracks: SmallMap<Key, TrackInfo>,
     queue: VecDeque<Output<Owner>>,
 }

@@ -43,76 +54,233 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> RoomMetadata<Owner> {
     pub fn new(room: ClusterRoomHash) -> Self {
         Self {
             room,
-            room_map: room.0.into(),
-            peers: HashMap::new(),
-            local_tracks: HashMap::new(),
-            remote_tracks: HashMap::new(),
+            peers_map: Self::peers_map(room),
+            tracks_map: Self::tracks_map(room),
+            peers: SmallMap::new(),
+            peers_map_subscribers: SmallMap::new(),
+            tracks_map_subscribers: SmallMap::new(),
+            peers_tracks_subs: SmallMap::new(),
+            cluster_peers: SmallMap::new(),
+            cluster_tracks: SmallMap::new(),
             queue: VecDeque::new(),
         }
     }

+    fn peer_map(room: ClusterRoomHash, peer: &PeerId) -> Map {
+        let mut h = DefaultHasher::new();
+        room.as_ref().hash(&mut h);
+        peer.as_ref().hash(&mut h);
+        h.finish().into()
+    }
+
+    fn peers_map(room: ClusterRoomHash) -> Map {
+        room.0.into()
+    }
+
+    fn peers_key(peer: &PeerId) -> Key {
+        let mut h = DefaultHasher::new();
+        peer.as_ref().hash(&mut h);
+        h.finish().into()
+    }
+
+    fn tracks_map(room: ClusterRoomHash) -> Map {
+        (room.0 + 1).into()
+    }
+
+    fn tracks_key(peer: &PeerId, track: &TrackName) -> Key {
+        let mut h = DefaultHasher::new();
+        peer.as_ref().hash(&mut h);
+        track.as_ref().hash(&mut h);
+        h.finish().into()
+    }
+
     pub fn get_peer_from_owner(&self, owner: Owner) -> Option<PeerId> {
-        self.peers.get(&owner).cloned()
+        Some(self.peers.get(&owner)?.peer.clone())
     }

-    pub fn on_join(&mut self, owner: Owner, peer: PeerId, publish: ClusterRoomInfoPublishLevel, subscribe: ClusterRoomInfoSubscribeLevel) -> Option<Output<Owner>> {
+    /// Put the peer into the local list, then register the owner as a subscriber of the peers and tracks maps depending on its subscribe level
+    pub fn on_join(&mut self, owner: Owner, peer: PeerId, meta: PeerMeta, publish: RoomInfoPublish, subscribe: RoomInfoSubscribe) -> Option<Output<Owner>> {
         log::info!("[ClusterRoom {}] join peer ({peer})", self.room);
-        self.peers.insert(owner.clone(), peer);
-        if self.peers.len() == 1 {
-            log::info!("[ClusterRoom {}] first peer join => subscribe room map", self.room);
-            Some(Output::Kv(dht_kv::Control::MapCmd(self.room_map, MapControl::Sub)))
-        } else {
-            log::info!("[ClusterRoom {}] next peer join => restore {} remote tracks", self.room, self.remote_tracks.len());
-            for (_track_key, (peer, name, meta)) in &self.remote_tracks {
-                self.queue
-                    .push_back(Output::Endpoint(vec![owner.clone()], ClusterEndpointEvent::TrackStarted(peer.clone(), name.clone(), meta.clone())));
-            }
-            self.queue.pop_front()
-        }
+        // First, insert into the peers cache so the owner's information can be reused later
+        self.peers.insert(
+            owner,
+            PeerContainer {
+                peer: peer.clone(),
+                publish: publish.clone(),
+                sub_peers: SmallSet::new(),
+                pub_tracks: SmallMap::new(),
+            },
+        );
+        let peer_key = Self::peers_key(&peer);
+
+        // Set into peers_map if publish.peer is requested
+        if publish.peer {
+            self.queue
+                .push_back(Output::Kv(dht_kv::Control::MapCmd(self.peers_map, MapControl::Set(peer_key, PeerInfo { peer, meta }.serialize()))))
+        }
+
+        // Sub to peers_map if subscribe.peers is requested
+        if subscribe.peers {
+            self.peers_map_subscribers.insert(owner, ());
+            log::info!("[ClusterRoom {}] next peer sub peers => restore {} remote peers", self.room, self.cluster_peers.len());
+
+            // Restore already added peers
+            for (_peer_key, info) in self.cluster_peers.iter() {
+                //TODO avoid firing a duplicate event for the same peer
+                self.queue
+                    .push_back(Output::Endpoint(vec![owner], ClusterEndpointEvent::PeerJoined(info.peer.clone(), info.meta.clone())));
+            }
+
+            // If this is the first peer subscribed to peers_map, we should send Sub
+            if self.peers_map_subscribers.len() == 1 {
+                log::info!("[ClusterRoom {}] first peer sub peers map => subscribe", self.room);
+                self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.peers_map, MapControl::Sub)));
+            }
+        }
+
+        // Sub to tracks_map if subscribe.tracks is requested
+        if subscribe.tracks {
+            self.tracks_map_subscribers.insert(owner, ());
+            log::info!("[ClusterRoom {}] next peer sub tracks => restore {} remote tracks", self.room, self.cluster_tracks.len());
+
+            // Restore already added tracks
+            for (_track_key, info) in self.cluster_tracks.iter() {
+                //TODO avoid firing a duplicate event for the same track
+                self.queue.push_back(Output::Endpoint(
+                    vec![owner],
+                    ClusterEndpointEvent::TrackStarted(info.peer.clone(), info.track.clone(), info.meta.clone()),
+                ));
+            }
+
+            // If this is the first peer subscribed to tracks_map, we should send Sub
+            if self.tracks_map_subscribers.len() == 1 {
+                log::info!("[ClusterRoom {}] first peer sub tracks map => subscribe", self.room);
+                self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.tracks_map, MapControl::Sub)));
+            }
+        };
+        self.queue.pop_front()
     }

     pub fn on_leave(&mut self, owner: Owner) -> Option<Output<Owner>> {
         let peer = self.peers.remove(&owner).expect("Should have owner");
-        log::info!("[ClusterRoom {}] leave peer ({peer})", self.room);
-        if self.peers.is_empty() {
-            log::info!("[ClusterRoom {}] last peer leave => unsubscribe room map", self.room);
-            Some(Output::Kv(dht_kv::Control::MapCmd(self.room_map, MapControl::Unsub)))
-        } else {
-            None
-        }
+        log::info!("[ClusterRoom {}] leave peer {}", self.room, peer.peer);
+        let peer_key = Self::peers_key(&peer.peer);
+        // If the peer info was published, it must be deleted from the peers map
+        if peer.publish.peer {
+            self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.peers_map, MapControl::Del(peer_key))))
+        }
+
+        // If any published tracks remain, they must be deleted from both track maps
+        let peer_map = Self::peer_map(self.room, &peer.peer);
+        for (_, track) in peer.pub_tracks.into_iter() {
+            let track_key = Self::tracks_key(&peer.peer, &track);
+            self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.tracks_map, MapControl::Del(track_key))));
+            self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(peer_map, MapControl::Del(track_key))));
+        }
+
+        if self.peers_map_subscribers.remove(&owner).is_some() {
+            if self.peers_map_subscribers.is_empty() {
+                log::info!("[ClusterRoom {}] last peer unsub peers map => unsubscribe", self.room);
+                self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.peers_map, MapControl::Unsub)));
+            }
+        }
+
+        if self.tracks_map_subscribers.remove(&owner).is_some() {
+            if self.tracks_map_subscribers.is_empty() {
+                log::info!("[ClusterRoom {}] last peer unsub tracks map => unsubscribe", self.room);
+                self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.tracks_map, MapControl::Unsub)));
+            }
+        }
+
+        // Check whether this peer manually subscribed to some private peer maps => need to send Unsub
+        for (target, _) in peer.sub_peers.into_iter() {
+            let target_peer_map = Self::peer_map(self.room, &target);
+            let subs = self.peers_tracks_subs.get_mut(&target_peer_map).expect("Should have private peer_map");
+            subs.remove(&owner);
+            if subs.is_empty() {
+                self.peers_tracks_subs.remove(&target_peer_map);
+                self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(target_peer_map, MapControl::Unsub)));
+            }
+        }
+
+        self.queue.pop_front()
     }

     pub fn on_subscribe_peer(&mut self, owner: Owner, target: PeerId) -> Option<Output<Owner>> {
-        todo!()
+        let peer = self.peers.get_mut(&owner).expect("Should have peer");
+        let target_peer_map = Self::peer_map(self.room, &target);
+        let subs = self.peers_tracks_subs.entry(target_peer_map).or_default();
+        let need_sub = subs.is_empty();
+        subs.insert(owner, ());
+        peer.sub_peers.insert(target, ());
+
+        if need_sub {
+            Some(Output::Kv(dht_kv::Control::MapCmd(target_peer_map, MapControl::Sub)))
+        } else {
+            None
+        }
     }

     pub fn on_unsubscribe_peer(&mut self, owner: Owner, target: PeerId) -> Option<Output<Owner>> {
-        todo!()
+        let peer = self.peers.get_mut(&owner).expect("Should have peer");
+        let target_peer_map = Self::peer_map(self.room, &target);
+        let subs = self.peers_tracks_subs.entry(target_peer_map).or_default();
+        subs.remove(&owner);
+        peer.sub_peers.remove(&target);
+        if subs.is_empty() {
+            self.peers_tracks_subs.remove(&target_peer_map);
+            Some(Output::Kv(dht_kv::Control::MapCmd(target_peer_map, MapControl::Unsub)))
+        } else {
+            None
+        }
     }
     pub fn on_track_publish(&mut self, owner: Owner, track_id: RemoteTrackId, track: TrackName, meta: TrackMeta) -> Option<Output<Owner>> {
-        let peer = self.peers.get(&owner)?;
-        let info = TrackInfo {
-            peer: peer.clone(),
-            track: track.clone(),
-            meta,
-        };
-        let map_key = super::track_key(self.room, &peer, &track);
-        self.local_tracks.insert((owner, track_id), (peer.clone(), track.clone(), map_key));
-        Some(Output::Kv(dht_kv::Control::MapCmd((*self.room.as_ref()).into(), MapControl::Set(map_key, info.serialize()))))
+        let peer = self.peers.get_mut(&owner)?;
+        if peer.publish.tracks {
+            let info = TrackInfo {
+                peer: peer.peer.clone(),
+                track: track.clone(),
+                meta,
+            };
+            let track_key = Self::tracks_key(&peer.peer, &track);
+            peer.pub_tracks.insert(track_id, track);
+
+            let peer_map = Self::peer_map(self.room, &peer.peer);
+            self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(peer_map, MapControl::Set(track_key, info.serialize()))));
+
+            Some(Output::Kv(dht_kv::Control::MapCmd(self.tracks_map, MapControl::Set(track_key, info.serialize()))))
+        } else {
+            None
+        }
     }

     pub fn on_track_unpublish(&mut self, owner: Owner, track_id: RemoteTrackId) -> Option<Output<Owner>> {
-        let (peer, track, map_key) = self.local_tracks.remove(&(owner, track_id))?;
-        Some(Output::Kv(dht_kv::Control::MapCmd((*self.room.as_ref()).into(), MapControl::Del(map_key))))
+        let peer = self.peers.get_mut(&owner)?;
+        let track = peer.pub_tracks.remove(&track_id)?;
+        let track_key = Self::tracks_key(&peer.peer, &track);
+
+        let peer_map = Self::peer_map(self.room, &peer.peer);
+        self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(peer_map, MapControl::Del(track_key))));
+
+        Some(Output::Kv(dht_kv::Control::MapCmd(self.tracks_map, MapControl::Del(track_key))))
     }

     pub fn on_kv_event(&mut self, map: Map, event: MapEvent) -> Option<Output<Owner>> {
-        if self.room_map == map {
+        if self.peers_map == map {
+            match event {
+                dht_kv::MapEvent::OnSet(peer_key, _source, data) => self.on_peers_kv_event(peer_key, Some(data)),
+                dht_kv::MapEvent::OnDel(peer_key, _source) => self.on_peers_kv_event(peer_key, None),
+                dht_kv::MapEvent::OnRelaySelected(_) => None,
+            }
+        } else if self.tracks_map == map {
             match event {
-                dht_kv::MapEvent::OnSet(track_key, _source, data) => self.on_room_kv_event(track_key, Some(data)),
-                dht_kv::MapEvent::OnDel(track_key, _source) => self.on_room_kv_event(track_key, None),
+                dht_kv::MapEvent::OnSet(track_key, _source, data) => self.on_tracks_kv_event(track_key, Some(data)),
+                dht_kv::MapEvent::OnDel(track_key, _source) => self.on_tracks_kv_event(track_key, None),
+                dht_kv::MapEvent::OnRelaySelected(_) => None,
+            }
+        } else if self.peers_tracks_subs.contains_key(&map) {
+            match event {
+                dht_kv::MapEvent::OnSet(track_key, _source, data) => self.on_peers_tracks_kv_event(map, track_key, Some(data)),
+                dht_kv::MapEvent::OnDel(track_key, _source) => self.on_peers_tracks_kv_event(map, track_key, None),
                 dht_kv::MapEvent::OnRelaySelected(_) => None,
             }
         } else {
@@ -120,43 +288,548 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> RoomMetadata<Owner> {
         }
     }

-    pub fn pop_output(&mut self, now: Instant) -> Option<Output<Owner>> {
+    pub fn pop_output(&mut self, _now: Instant) -> Option<Output<Owner>> {
         self.queue.pop_front()
     }

-    fn on_room_kv_event(&mut self, track: dht_kv::Key, data: Option<Vec<u8>>) -> Option<Output<Owner>> {
+    fn on_peers_kv_event(&mut self, peer_key: dht_kv::Key, data: Option<Vec<u8>>) -> Option<Output<Owner>> {
+        let info = if let Some(data) = data {
+            Some(PeerInfo::deserialize(&data)?)
+        } else {
+            None
+        };
+
+        let subscribers = self.peers_map_subscribers.iter().map(|a| a.0).collect::<Vec<_>>();
+        if let Some(info) = info {
+            log::info!("[ClusterRoom {}] cluster: peer {} joined => fire event to {:?}", self.room, info.peer, subscribers);
+            self.cluster_peers.insert(peer_key, info.clone());
+            if subscribers.is_empty() {
+                None
+            } else {
+                Some(Output::Endpoint(subscribers, ClusterEndpointEvent::PeerJoined(info.peer, info.meta)))
+            }
+        } else {
+            let info = self.cluster_peers.remove(&peer_key)?;
+            log::info!("[ClusterRoom {}] cluster: peer ({}) left => fire event to {:?}", self.room, info.peer, subscribers);
+            if subscribers.is_empty() {
+                None
+            } else {
+                Some(Output::Endpoint(subscribers, ClusterEndpointEvent::PeerLeaved(info.peer)))
+            }
+        }
+    }
+
+    fn on_tracks_kv_event(&mut self, track: dht_kv::Key, data: Option<Vec<u8>>) -> Option<Output<Owner>> {
+        let info = if let Some(data) = data {
+            Some(TrackInfo::deserialize(&data)?)
+        } else {
+            None
+        };
+
+        let subscribers = self.tracks_map_subscribers.iter().map(|a| a.0).collect::<Vec<_>>();
+        if let Some(info) = info {
+            log::info!(
+                "[ClusterRoom {}] cluster: peer ({}) started track ({}) => fire event to {:?}",
+                self.room,
+                info.peer,
+                info.track,
+                subscribers
+            );
+            self.cluster_tracks.insert(track, info.clone());
+            if subscribers.is_empty() {
+                None
+            } else {
+                Some(Output::Endpoint(subscribers, ClusterEndpointEvent::TrackStarted(info.peer, info.track, info.meta)))
+            }
+        } else {
+            let info = self.cluster_tracks.remove(&track)?;
+            log::info!(
+                "[ClusterRoom {}] cluster: peer ({}) stopped track ({}) => fire event to {:?}",
+                self.room,
+                info.peer,
+                info.track,
+                subscribers
+            );
+            if subscribers.is_empty() {
+                None
+            } else {
+                Some(Output::Endpoint(subscribers, ClusterEndpointEvent::TrackStopped(info.peer, info.track)))
+            }
+        }
+    }
+
+    fn on_peers_tracks_kv_event(&mut self, peer_map: Map, track: dht_kv::Key, data: Option<Vec<u8>>) -> Option<Output<Owner>> {
         let info = if let Some(data) = data {
             Some(TrackInfo::deserialize(&data)?)
         } else {
             None
         };

-        let peers = self.peers.keys().cloned().collect::<Vec<_>>();
+        let subscribers = self.peers_tracks_subs.get(&peer_map)?.iter().map(|a| a.0).collect::<Vec<_>>();
         if let Some(info) = info {
-            log::info!("[ClusterRoom {}] cluster: peer ({}) started track {}) => fire event to {:?}", self.room, info.peer, info.track, peers);
-            self.remote_tracks.insert(track, (info.peer.clone(), info.track.clone(), info.meta.clone()));
-            Some(Output::Endpoint(peers, ClusterEndpointEvent::TrackStarted(info.peer, info.track, info.meta)))
+            log::info!(
+                "[ClusterRoom {}] cluster: peer ({}) started track ({}) => fire event to {:?}",
+                self.room,
+                info.peer,
+                info.track,
+                subscribers
+            );
+            self.cluster_tracks.insert(track, info.clone());
+            Some(Output::Endpoint(subscribers, ClusterEndpointEvent::TrackStarted(info.peer, info.track, info.meta)))
         } else {
-            let (peer, name, _meta) = self.remote_tracks.remove(&track)?;
-            log::info!("[ClusterRoom {}] cluster: peer ({}) stopped track {}) => fire event to {:?}", self.room, peer, name, peers);
-            Some(Output::Endpoint(peers, ClusterEndpointEvent::TrackStopped(peer, name)))
+            let info = self.cluster_tracks.remove(&track)?;
+            log::info!(
+                "[ClusterRoom {}] cluster: peer ({}) stopped track ({}) => fire event to {:?}",
+                self.room,
+                info.peer,
+                info.track,
+                subscribers
+            );
+            Some(Output::Endpoint(subscribers, ClusterEndpointEvent::TrackStopped(info.peer, info.track)))
         }
     }
 }

 #[cfg(test)]
 mod tests {
-    //TODO Test join as full => should subscribe both peers and tracks, fire both peer and track events
-    //TODO Test leave as full => should unsubscribe both peers and tracks
+    use std::time::Instant;
+
+    use atm0s_sdn::features::dht_kv::{Control, MapControl, MapEvent};
+    use media_server_protocol::endpoint::{PeerId, PeerInfo, PeerMeta, RoomInfoPublish, RoomInfoSubscribe, TrackInfo, TrackName};
+
+    use crate::{
+        cluster::{ClusterEndpointEvent, ClusterRoomHash},
+        transport::RemoteTrackId,
+    };
+
+    use super::{Output, RoomMetadata};
+
+    /// Test correct get peer info
+    #[test]
+    fn correct_get_peer() {
+        let room: ClusterRoomHash = 1.into();
+        let mut room_meta: RoomMetadata<u64> = RoomMetadata::<u64>::new(room);
+        let peer_id: PeerId = "peer1".to_string().into();
+        let peer_meta = PeerMeta {};
+        let owner = 1;
+        room_meta.on_join(
+            owner,
+            peer_id.clone(),
+            peer_meta.clone(),
+            RoomInfoPublish { peer: false, tracks: false },
+            RoomInfoSubscribe { peers: false, tracks: false },
+        );
+
+        assert_eq!(room_meta.get_peer_from_owner(1), Some(peer_id));
+        assert_eq!(room_meta.get_peer_from_owner(2), None);
+    }
+
+    /// Test join as peer only => should subscribe peers, fire only peer events
+    /// After leave should unsubscribe only peers, and del the published peer info
+    #[test]
+    fn join_peer_only() {
+        let room: ClusterRoomHash = 1.into();
+        let peers_map = RoomMetadata::<u64>::peers_map(room);
+        let tracks_map = RoomMetadata::<u64>::tracks_map(room);
+        let mut room_meta: RoomMetadata<u64> = RoomMetadata::<u64>::new(room);
+        let peer_id: PeerId = "peer1".to_string().into();
+        let peer_meta = PeerMeta {};
+        let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone());
+        let peer_key = RoomMetadata::<u64>::peers_key(&peer_id);
+        let owner = 1;
+        let out = room_meta.on_join(
+            owner,
+            peer_id.clone(),
+            peer_meta.clone(),
+            RoomInfoPublish { peer: true, tracks: false },
+            RoomInfoSubscribe { peers: true, tracks: false },
+        );
+        assert_eq!(out, Some(Output::Kv(Control::MapCmd(peers_map, MapControl::Set(peer_key, peer_info.serialize())))));
+        assert_eq!(room_meta.pop_output(Instant::now()), Some(Output::Kv(Control::MapCmd(peers_map, MapControl::Sub))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        // should handle incoming event with only peer and reject track
+        let out = room_meta.on_kv_event(peers_map, MapEvent::OnSet(peer_key, 0, peer_info.serialize()));
+        assert_eq!(out, Some(Output::Endpoint(vec![owner], ClusterEndpointEvent::PeerJoined(peer_id.clone(), peer_meta.clone()))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        let track_name: TrackName = "audio_main".to_string().into();
+        let track_info = TrackInfo::simple_audio(peer_id.clone());
+        let track_key = RoomMetadata::<u64>::tracks_key(&peer_id, &track_name);
+        let out = room_meta.on_kv_event(tracks_map, MapEvent::OnSet(track_key, 0, track_info.serialize()));
+        assert_eq!(out, None);
+
+        // should only handle the remove peer event, rejecting the track one
+        let out = room_meta.on_kv_event(tracks_map, MapEvent::OnDel(track_key, 0));
+        assert_eq!(out, None);
+
+        let out = room_meta.on_kv_event(peers_map, MapEvent::OnDel(peer_key, 0));
+        assert_eq!(out, Some(Output::Endpoint(vec![owner], ClusterEndpointEvent::PeerLeaved(peer_id.clone()))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        // peer leave should send unsub and del
+        let out = room_meta.on_leave(owner);
+        assert_eq!(out, Some(Output::Kv(Control::MapCmd(peers_map, MapControl::Del(peer_key)))));
+        assert_eq!(room_meta.pop_output(Instant::now()), Some(Output::Kv(Control::MapCmd(peers_map, MapControl::Unsub))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+    }
+
+    #[test]
+    fn join_sub_peer_only_should_restore_old_peers() {
+        let room: ClusterRoomHash = 1.into();
+        let peers_map = RoomMetadata::<u64>::peers_map(room);
+        let mut room_meta: RoomMetadata<u64> = RoomMetadata::<u64>::new(room);
+
+        let peer2: PeerId = "peer2".to_string().into();
+        let peer2_key = RoomMetadata::<u64>::peers_key(&peer2);
+        let peer2_info = PeerInfo::new(peer2, PeerMeta {});
+
+        let out = room_meta.on_kv_event(peers_map, MapEvent::OnSet(peer2_key, 0, peer2_info.serialize()));
+        assert_eq!(out, None);
+
+        let owner = 1;
+        let peer_id: PeerId = "peer1".to_string().into();
+        let peer_meta = PeerMeta {};
+        let out = room_meta.on_join(
+            owner,
+            peer_id.clone(),
+            peer_meta.clone(),
+            RoomInfoPublish { peer: false, tracks: false },
+            RoomInfoSubscribe { peers: true, tracks: false },
+        );
+        assert_eq!(
+            out,
+            Some(Output::Endpoint(vec![owner], ClusterEndpointEvent::PeerJoined(peer2_info.peer.clone(), peer2_info.meta.clone())))
+        );
+        assert_eq!(room_meta.pop_output(Instant::now()), Some(Output::Kv(Control::MapCmd(peers_map, MapControl::Sub))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+    }
+
+    // Test join as track only => should subscribe only tracks, fire only track events
+    #[test]
+    fn join_track_only() {
+        let room: ClusterRoomHash = 1.into();
+        let peers_map = RoomMetadata::<u64>::peers_map(room);
+        let tracks_map = RoomMetadata::<u64>::tracks_map(room);
+        let mut room_meta: RoomMetadata<u64> = RoomMetadata::<u64>::new(room);
+        let peer_id: PeerId = "peer1".to_string().into();
+        let peer_meta = PeerMeta {};
+        let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone());
+        let peer_key = RoomMetadata::<u64>::peers_key(&peer_id);
+        let owner = 1;
+        let out = room_meta.on_join(
+            owner,
+            peer_id.clone(),
+            peer_meta.clone(),
+            RoomInfoPublish { peer: false, tracks: true },
+            RoomInfoSubscribe { peers: false, tracks: true },
+        );
+        assert_eq!(out, Some(Output::Kv(Control::MapCmd(tracks_map, MapControl::Sub))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        // should handle incoming event with only track and reject peer
+        let out = room_meta.on_kv_event(peers_map, MapEvent::OnSet(peer_key, 0, peer_info.serialize()));
+        assert_eq!(out, None);
+
+        let track_name: TrackName = "audio_main".to_string().into();
+        let track_info = TrackInfo::simple_audio(peer_id.clone());
+        let track_key = RoomMetadata::<u64>::tracks_key(&peer_id, &track_name);
+        let out = room_meta.on_kv_event(tracks_map, MapEvent::OnSet(track_key, 0, track_info.serialize()));
+        assert_eq!(
+            out,
+            Some(Output::Endpoint(
+                vec![owner],
+                ClusterEndpointEvent::TrackStarted(peer_id.clone(), track_name.clone(), track_info.meta.clone())
+            ))
+        );
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        // should only handle the remove track event, rejecting the peer one
+        let out = room_meta.on_kv_event(tracks_map, MapEvent::OnDel(track_key, 0));
+        assert_eq!(out, Some(Output::Endpoint(vec![owner], ClusterEndpointEvent::TrackStopped(peer_id.clone(), track_name.clone()))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        let out = room_meta.on_kv_event(peers_map, MapEvent::OnDel(peer_key, 0));
+        assert_eq!(out, None);
+
+        // peer leave should send unsub
+        let out = room_meta.on_leave(owner);
+        assert_eq!(out, Some(Output::Kv(Control::MapCmd(tracks_map, MapControl::Unsub))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+    }
+
+    // join with track subscribe should restore already-known tracks
+    #[test]
+    fn join_sub_track_only_should_restore_old_tracks() {
+        let room: ClusterRoomHash = 1.into();
+        let tracks_map = RoomMetadata::<u64>::tracks_map(room);
+        let mut room_meta: RoomMetadata<u64> = RoomMetadata::<u64>::new(room);
+
+        let peer2: PeerId = "peer2".to_string().into();
+        let track_name: TrackName = "audio_main".to_string().into();
+        let track_key = RoomMetadata::<u64>::tracks_key(&peer2, &track_name);
+        let track_info = TrackInfo::simple_audio(peer2);
+
+        let out = room_meta.on_kv_event(tracks_map, MapEvent::OnSet(track_key, 0, track_info.serialize()));
+        assert_eq!(out, None);
+
+        let owner = 1;
+        let peer_id: PeerId = "peer1".to_string().into();
+        let peer_meta = PeerMeta {};
+        let out = room_meta.on_join(
+            owner,
+            peer_id.clone(),
+            peer_meta.clone(),
+            RoomInfoPublish { peer: false, tracks: false },
+            RoomInfoSubscribe { peers: false, tracks: true },
+        );
+        assert_eq!(
+            out,
+            Some(Output::Endpoint(
+                vec![owner],
+                ClusterEndpointEvent::TrackStarted(track_info.peer.clone(), track_info.track.clone(), track_info.meta.clone())
+            ))
+        );
+        assert_eq!(room_meta.pop_output(Instant::now()), Some(Output::Kv(Control::MapCmd(tracks_map, MapControl::Sub))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+    }
+
+    // Test manual mode without subscribe peer => don't fire any event
+    #[test]
+    fn join_manual_no_subscribe_peer() {
+        let room: ClusterRoomHash = 1.into();
+        let peers_map = RoomMetadata::<u64>::peers_map(room);
+        let tracks_map = RoomMetadata::<u64>::tracks_map(room);
+        let mut room_meta: RoomMetadata<u64> = RoomMetadata::<u64>::new(room);
+        let peer_id: PeerId = "peer1".to_string().into();
+        let peer_meta = PeerMeta {};
+        let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone());
+        let peer_key = RoomMetadata::<u64>::peers_key(&peer_id);
+        let owner = 1;
+        let out = room_meta.on_join(
+            owner,
+            peer_id.clone(),
+            peer_meta.clone(),
+            RoomInfoPublish { peer: false, tracks: false },
+            RoomInfoSubscribe { peers: false, tracks: false },
+        );
+        assert_eq!(out, None);
+
+        // should reject both incoming peer and track set events
+        let out = room_meta.on_kv_event(peers_map, MapEvent::OnSet(peer_key, 0, peer_info.serialize()));
+        assert_eq!(out, None);
+
+        let track_name: TrackName = "audio_main".to_string().into();
+        let track_info = TrackInfo::simple_audio(peer_id.clone());
+        let track_key = RoomMetadata::<u64>::tracks_key(&peer_id, &track_name);
+        let out = room_meta.on_kv_event(tracks_map, MapEvent::OnSet(track_key, 0, track_info.serialize()));
+        assert_eq!(out, None);
+
+        // should also reject both remove events
+        let out = room_meta.on_kv_event(tracks_map, MapEvent::OnDel(track_key, 0));
+        assert_eq!(out, None);
+
+        let out = room_meta.on_kv_event(peers_map, MapEvent::OnDel(peer_key, 0));
+        assert_eq!(out, None);
+
+        // peer leave should not send unsub
+        let out = room_meta.on_leave(owner);
+        assert_eq!(out, None);
+    }
+
+    // Test manual mode with subscribe peer => should fire events
+    #[test]
+    fn join_manual_with_subscribe() {
+        let room: ClusterRoomHash = 1.into();
+        let mut room_meta: RoomMetadata<u64> = RoomMetadata::<u64>::new(room);
+        let peer_id: PeerId = "peer1".to_string().into();
+        let peer_meta = PeerMeta {};
+        let owner = 1;
+        let out = room_meta.on_join(
+            owner,
+            peer_id.clone(),
+            peer_meta.clone(),
+            RoomInfoPublish { peer: false, tracks: false },
+            RoomInfoSubscribe { peers: false, tracks: false },
+        );
+        assert_eq!(out, None);
+
+        let peer2: PeerId = "peer1".to_string().into();
+        let peer2_map = RoomMetadata::<u64>::peer_map(room, &peer2);
+        let out = room_meta.on_subscribe_peer(owner, peer2.clone());
+        assert_eq!(out, Some(Output::Kv(Control::MapCmd(peer2_map, MapControl::Sub))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        // should fire event for a track set in the manually subscribed peer map
+        let track_name: TrackName = "audio_main".to_string().into();
+        let track_info = TrackInfo::simple_audio(peer_id.clone());
+        let track_key = RoomMetadata::<u64>::tracks_key(&peer2, &track_name);
+        let out = room_meta.on_kv_event(peer2_map, MapEvent::OnSet(track_key, 0, track_info.serialize()));
+        assert_eq!(
+            out,
+            Some(Output::Endpoint(
+                vec![owner],
+                ClusterEndpointEvent::TrackStarted(peer2.clone(), track_name.clone(), track_info.meta.clone())
+            ))
+        );
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        // and for the matching del event
+        let out = room_meta.on_kv_event(peer2_map, MapEvent::OnDel(track_key, 0));
+        assert_eq!(out, Some(Output::Endpoint(vec![owner], ClusterEndpointEvent::TrackStopped(peer2.clone(), track_name.clone()))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        // should send unsub when unsubscribe peer
+        let out = room_meta.on_unsubscribe_peer(owner, peer2.clone());
+        assert_eq!(out, Some(Output::Kv(Control::MapCmd(peer2_map, MapControl::Unsub))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        // peer leave should not send unsub again
+        let out = room_meta.on_leave(owner);
+        assert_eq!(out, None);
+    }
+
+    // Test track publish => should set key in both the single peer map and the tracks map
+    #[test]
+    fn track_publish_enable() {
+        let room: ClusterRoomHash = 1.into();
+        let tracks_map = RoomMetadata::<u64>::tracks_map(room);
+        let mut room_meta: RoomMetadata<u64> = RoomMetadata::<u64>::new(room);
+
+        let owner = 1;
+        let peer_id: PeerId = "peer1".to_string().into();
+        let peer_meta = PeerMeta {};
+        let out = room_meta.on_join(
+            owner,
+            peer_id.clone(),
+            peer_meta.clone(),
+            RoomInfoPublish { peer: false, tracks: true },
+            RoomInfoSubscribe { peers: false, tracks: false },
+        );
+        assert_eq!(out, None);
+
+        let track_id: RemoteTrackId = RemoteTrackId(1);
+        let track_name: TrackName = "audio_main".to_string().into();
+        let track_info = TrackInfo::simple_audio(peer_id.clone());
+        let peer_map = RoomMetadata::<u64>::peer_map(room, &peer_id);
+        let track_key = RoomMetadata::<u64>::tracks_key(&peer_id, &track_name);
+        let out = room_meta.on_track_publish(owner, track_id, track_name, track_info.meta.clone());
+        assert_eq!(out, Some(Output::Kv(Control::MapCmd(tracks_map, MapControl::Set(track_key, track_info.serialize())))));
+        assert_eq!(
+            room_meta.pop_output(Instant::now()),
+            Some(Output::Kv(Control::MapCmd(peer_map, MapControl::Set(track_key, track_info.serialize()))))
+        );
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        // after unpublish it should delete the track from both maps
+        let out = room_meta.on_track_unpublish(owner, track_id);
+        assert_eq!(out, Some(Output::Kv(Control::MapCmd(tracks_map, MapControl::Del(track_key)))));
+        assert_eq!(room_meta.pop_output(Instant::now()), Some(Output::Kv(Control::MapCmd(peer_map, MapControl::Del(track_key)))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        // should not pop anything after leave
+        let out = room_meta.on_leave(owner);
+        assert_eq!(out, None);
+    }
+
+    // Test track publish in disabled mode => should not set keys in either the peer map or the tracks map
+    #[test]
+    fn track_publish_disable() {
+        let room: ClusterRoomHash = 1.into();
+        let mut room_meta: RoomMetadata<u64> = RoomMetadata::<u64>::new(room);
+
+        let owner = 1;
+        let peer_id: PeerId = "peer1".to_string().into();
+        let peer_meta = PeerMeta {};
+        let out = room_meta.on_join(
+            owner,
+            peer_id.clone(),
+            peer_meta.clone(),
+            RoomInfoPublish { peer: false, tracks: false },
+            RoomInfoSubscribe { peers: false, tracks: false },
+        );
+        assert_eq!(out, None);
+
+        let track_id: RemoteTrackId = RemoteTrackId(1);
+        let track_name: TrackName = "audio_main".to_string().into();
+        let track_info = TrackInfo::simple_audio(peer_id.clone());
+        let out = room_meta.on_track_publish(owner, track_id, track_name, track_info.meta.clone());
+        assert_eq!(out, None);
+
+        // unpublish should also yield nothing
+        let out = room_meta.on_track_unpublish(owner, track_id);
+        assert_eq!(out, None);
+
+        // should not pop anything after leave
+        let out = room_meta.on_leave(owner);
+        assert_eq!(out, None);
+    }
+
+    /// Test leave room auto-deletes remaining remote tracks
+    #[test]
+    fn leave_room_auto_del_remote_tracks() {
+        let room: ClusterRoomHash = 1.into();
+        let tracks_map = RoomMetadata::<u64>::tracks_map(room);
+        let mut room_meta: RoomMetadata<u64> = RoomMetadata::<u64>::new(room);
+
+        let owner = 1;
+        let peer_id: PeerId = "peer1".to_string().into();
+        let peer_meta = PeerMeta {};
+        let out = room_meta.on_join(
+            owner,
+            peer_id.clone(),
+            peer_meta.clone(),
+            RoomInfoPublish { peer: false, tracks: true },
+            RoomInfoSubscribe { peers: false, tracks: false },
+        );
+        assert_eq!(out, None);
+
+        let track_id: RemoteTrackId = RemoteTrackId(1);
+        let track_name: TrackName = "audio_main".to_string().into();
+        let track_info = TrackInfo::simple_audio(peer_id.clone());
+        let peer_map = RoomMetadata::<u64>::peer_map(room, &peer_id);
+        let track_key = RoomMetadata::<u64>::tracks_key(&peer_id, &track_name);
+        let out = room_meta.on_track_publish(owner, track_id, track_name, track_info.meta.clone());
+        assert_eq!(out, Some(Output::Kv(Control::MapCmd(tracks_map, MapControl::Set(track_key, track_info.serialize())))));
+        assert_eq!(
+            room_meta.pop_output(Instant::now()),
+            Some(Output::Kv(Control::MapCmd(peer_map, MapControl::Set(track_key, track_info.serialize()))))
+        );
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        // after leave it should auto delete all tracks
+        let out = room_meta.on_leave(owner);
+        assert_eq!(out, Some(Output::Kv(Control::MapCmd(tracks_map, MapControl::Del(track_key)))));
+        assert_eq!(room_meta.pop_output(Instant::now()), Some(Output::Kv(Control::MapCmd(peer_map, MapControl::Del(track_key)))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+    }
+
+    // Leave room auto unsubscribes private peer maps
+    #[test]
+    fn leave_room_auto_unsub_private_peer_maps() {
+        let room: ClusterRoomHash = 1.into();
+        let mut room_meta: RoomMetadata<u64> = RoomMetadata::<u64>::new(room);
+        let peer_id: PeerId = "peer1".to_string().into();
+        let peer_meta = PeerMeta {};
+        let owner = 1;
+        let out = room_meta.on_join(
+            owner,
+            peer_id.clone(),
+            peer_meta.clone(),
+            RoomInfoPublish { peer: false, tracks: false },
+            RoomInfoSubscribe { peers: false, tracks: false },
+        );
+        assert_eq!(out, None);
+
+        let peer2: PeerId = "peer1".to_string().into();
+        let peer2_map = RoomMetadata::<u64>::peer_map(room, &peer2);
+        let out = room_meta.on_subscribe_peer(owner, peer2.clone());
+        assert_eq!(out, Some(Output::Kv(Control::MapCmd(peer2_map, MapControl::Sub))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+
+        // peer leave should send unsub of peer2_map
+        let out = room_meta.on_leave(owner);
+        assert_eq!(out, Some(Output::Kv(Control::MapCmd(peer2_map, MapControl::Unsub))));
+        assert_eq!(room_meta.pop_output(Instant::now()), None);
+    }
 }
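One pattern in the metadata above deserves a callout: `peers_map_subscribers`, `tracks_map_subscribers` and `peers_tracks_subs` all implement the same reference-counting idea, where a network `Sub` is sent only for the first local subscriber of a map and an `Unsub` only after the last one leaves. A generic sketch with simplified stand-in types (not the crate's actual `Map`/`Owner` types):

```rust
use std::collections::{HashMap, HashSet};

// A Sub command is only worth sending for the first local subscriber of a
// target map, and an Unsub only after the last one is gone.
#[derive(Debug, PartialEq)]
enum MapCmd {
    Sub(u64),
    Unsub(u64),
}

#[derive(Default)]
struct SubRegistry {
    subs: HashMap<u64, HashSet<u32>>, // target map id -> set of local owners
}

impl SubRegistry {
    fn subscribe(&mut self, map: u64, owner: u32) -> Option<MapCmd> {
        let owners = self.subs.entry(map).or_default();
        let first = owners.is_empty();
        owners.insert(owner);
        first.then(|| MapCmd::Sub(map))
    }

    fn unsubscribe(&mut self, map: u64, owner: u32) -> Option<MapCmd> {
        let owners = self.subs.get_mut(&map)?;
        owners.remove(&owner);
        if owners.is_empty() {
            self.subs.remove(&map);
            Some(MapCmd::Unsub(map))
        } else {
            None
        }
    }
}

fn main() {
    let mut reg = SubRegistry::default();
    assert_eq!(reg.subscribe(7, 1), Some(MapCmd::Sub(7)));
    assert_eq!(reg.subscribe(7, 2), None); // already subscribed on the network
    assert_eq!(reg.unsubscribe(7, 1), None);
    assert_eq!(reg.unsubscribe(7, 2), Some(MapCmd::Unsub(7)));
}
```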
diff --git a/packages/media_core/src/endpoint.rs b/packages/media_core/src/endpoint.rs
index f95bbac7..95820b16 100644
--- a/packages/media_core/src/endpoint.rs
+++ b/packages/media_core/src/endpoint.rs
@@ -3,7 +3,7 @@ use std::{marker::PhantomData, time::Instant};

 use media_server_protocol::{
-    endpoint::{PeerId, RoomId, TrackMeta, TrackName},
+    endpoint::{PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe, TrackMeta, TrackName},
     media::MediaPacket,
     transport::RpcResult,
 };
@@ -47,8 +47,10 @@ impl From<u64> for EndpointReqId {
 /// This is control APIs, which is used to control server from Endpoint SDK
 pub enum EndpointReq {
-    JoinRoom(RoomId, PeerId),
+    JoinRoom(RoomId, PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe),
     LeaveRoom,
+    SubscribePeer(PeerId),
+    UnsubscribePeer(PeerId),
     RemoteTrack(RemoteTrackId, EndpointRemoteTrackReq),
     LocalTrack(LocalTrackId, EndpointLocalTrackReq),
 }

@@ -57,6 +59,8 @@ pub enum EndpointReq {
 pub enum EndpointRes {
     JoinRoom(RpcResult<()>),
     LeaveRoom(RpcResult<()>),
+    SubscribePeer(RpcResult<()>),
+    UnsubscribePeer(RpcResult<()>),
     RemoteTrack(RemoteTrackId, EndpointRemoteTrackRes),
     LocalTrack(LocalTrackId, EndpointLocalTrackRes),
 }
@@ -73,6 +77,8 @@ pub enum EndpointRemoteTrackEvent {
 }

 pub enum EndpointEvent {
+    PeerJoined(PeerId, PeerMeta),
+    PeerLeaved(PeerId),
     PeerTrackStarted(PeerId, TrackName, TrackMeta),
     PeerTrackStopped(PeerId, TrackName),
     RemoteMediaTrack(RemoteTrackId, EndpointRemoteTrackEvent),
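The new `SubscribePeer`/`UnsubscribePeer` requests are only valid once the endpoint has joined a room; the internal state machine in the next file answers them with an `EndpointNotInRoom` error otherwise. A minimal sketch of that guard, using stand-in types rather than the real `EndpointInternal`:

```rust
#[derive(Debug, PartialEq)]
enum Res {
    Ok,
    NotInRoom,
}

struct Endpoint {
    joined: Option<String>, // room id when joined, None otherwise
}

impl Endpoint {
    fn subscribe_peer(&mut self, _peer: &str) -> Res {
        if self.joined.is_some() {
            // the real code also forwards ClusterEndpointControl::SubscribePeer here
            Res::Ok
        } else {
            Res::NotInRoom
        }
    }
}

fn main() {
    let mut ep = Endpoint { joined: None };
    assert_eq!(ep.subscribe_peer("peer2"), Res::NotInRoom);
    ep.joined = Some("room1".to_string());
    assert_eq!(ep.subscribe_peer("peer2"), Res::Ok);
}
```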
diff --git a/packages/media_core/src/endpoint/internal.rs b/packages/media_core/src/endpoint/internal.rs
index bdfade8c..f01bda1c 100644
--- a/packages/media_core/src/endpoint/internal.rs
+++ b/packages/media_core/src/endpoint/internal.rs
@@ -2,12 +2,16 @@

 use std::{collections::VecDeque, time::Instant};

-use media_server_protocol::endpoint::{PeerId, RoomId};
+use media_server_protocol::{
+    endpoint::{PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe},
+    transport::RpcError,
+};
 use media_server_utils::Small2dMap;
 use sans_io_runtime::{TaskGroup, TaskSwitcher};

 use crate::{
-    cluster::{ClusterEndpointControl, ClusterEndpointEvent, ClusterLocalTrackEvent, ClusterRemoteTrackEvent, ClusterRoomHash, ClusterRoomInfoPublishLevel, ClusterRoomInfoSubscribeLevel},
+    cluster::{ClusterEndpointControl, ClusterEndpointEvent, ClusterLocalTrackEvent, ClusterRemoteTrackEvent, ClusterRoomHash},
+    errors::EndpointErrors,
     transport::{LocalTrackEvent, LocalTrackId, RemoteTrackEvent, RemoteTrackId, TransportEvent, TransportState, TransportStats},
 };

@@ -34,7 +38,7 @@ pub enum InternalOutput {

 pub struct EndpointInternal {
     state: TransportState,
-    wait_join: Option<(RoomId, PeerId)>,
+    wait_join: Option<(RoomId, PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe)>,
     joined: Option<(ClusterRoomHash, RoomId, PeerId)>,
     local_tracks_id: Small2dMap<LocalTrackId, usize>,
     remote_tracks_id: Small2dMap<RemoteTrackId, usize>,
@@ -125,23 +129,39 @@ impl EndpointInternal {
     pub fn on_transport_rpc<'a>(&mut self, now: Instant, req_id: EndpointReqId, req: EndpointReq) -> Option<InternalOutput> {
         match req {
-            EndpointReq::JoinRoom(room, peer) => {
+            EndpointReq::JoinRoom(room, peer, meta, publish, subscribe) => {
                 if matches!(self.state, TransportState::Connecting) {
                     log::info!("[EndpointInternal] join_room({room}, {peer}) but in Connecting state => wait");
-                    self.wait_join = Some((room, peer));
+                    self.wait_join = Some((room, peer, meta, publish, subscribe));
                     None
                 } else {
-                    self.join_room(now, room, peer)
+                    self.join_room(now, room, peer, meta, publish, subscribe)
                 }
             }
             EndpointReq::LeaveRoom => {
-                if let Some((room, peer)) = self.wait_join.take() {
+                if let Some((room, peer, _meta, _publish, _subscribe)) = self.wait_join.take() {
                     log::info!("[EndpointInternal] leave_room({room}, {peer}) but in Connecting state => only clear local");
                     None
                 } else {
                     self.leave_room(now)
                 }
             }
+            EndpointReq::SubscribePeer(peer) => {
+                if let Some((room, _, _)) = &self.joined {
+                    self.queue.push_back(InternalOutput::Cluster(*room, ClusterEndpointControl::SubscribePeer(peer)));
+                    Some(InternalOutput::RpcRes(req_id, EndpointRes::SubscribePeer(Ok(()))))
+                } else {
+                    Some(InternalOutput::RpcRes(req_id, EndpointRes::SubscribePeer(Err(RpcError::new2(EndpointErrors::EndpointNotInRoom)))))
+                }
+            }
+            EndpointReq::UnsubscribePeer(peer) => {
+                if let Some((room, _, _)) = &self.joined {
+                    self.queue.push_back(InternalOutput::Cluster(*room, ClusterEndpointControl::UnsubscribePeer(peer)));
+                    Some(InternalOutput::RpcRes(req_id, EndpointRes::UnsubscribePeer(Ok(()))))
+                } else {
+                    Some(InternalOutput::RpcRes(req_id, EndpointRes::UnsubscribePeer(Err(RpcError::new2(EndpointErrors::EndpointNotInRoom)))))
+                }
+            }
             EndpointReq::RemoteTrack(track_id, req) => {
                 let index = self.remote_tracks_id.get1(&track_id)?;
                 let out = self.remote_tracks.on_event(now, *index, remote_track::Input::RpcReq(req_id, req))?;
@@ -168,9 +188,9 @@ impl EndpointInternal {
             }
             TransportState::Connected => {
                 log::info!("[EndpointInternal] connected");
-                let (room, peer) = self.wait_join.take()?;
+                let (room, peer, meta, publish, subscribe) = self.wait_join.take()?;
                 log::info!("[EndpointInternal] join_room({room}, {peer}) after connected");
-                self.join_room(now, room, peer)
+                self.join_room(now, room, peer, meta, publish, subscribe)
             }
             TransportState::Reconnecting => {
                 log::info!("[EndpointInternal] reconnecting");
@@ -215,7 +235,7 @@ impl EndpointInternal {
         None
     }

-    fn join_room<'a>(&mut self, now: Instant, room: RoomId, peer: PeerId) -> Option<InternalOutput> {
+    fn join_room<'a>(&mut self, now: Instant, room: RoomId, peer: PeerId, meta: PeerMeta, publish: RoomInfoPublish, subscribe: RoomInfoSubscribe) -> Option<InternalOutput> {
         let room_hash: ClusterRoomHash = (&room).into();
         log::info!("[EndpointInternal] join_room({room}, {peer}), room_hash {room_hash}");

@@ -224,10 +244,8 @@ impl EndpointInternal {
         }

         self.joined = Some(((&room).into(), room.clone(), peer.clone()));
-        self.queue.push_back(InternalOutput::Cluster(
-            (&room).into(),
-            ClusterEndpointControl::Join(peer, ClusterRoomInfoPublishLevel::Full, ClusterRoomInfoSubscribeLevel::Full),
-        ));
+        self.queue
+            .push_back(InternalOutput::Cluster((&room).into(), ClusterEndpointControl::Join(peer, meta, publish, subscribe)));

         for (track_id, index) in self.local_tracks_id.pairs() {
             if let Some(out) = self.local_tracks.on_event(now, index, local_track::Input::JoinRoom(room_hash)) {
@@ -276,6 +294,8 @@ impl EndpointInternal {
 impl EndpointInternal {
     pub fn on_cluster_event<'a>(&mut self, now: Instant, event: ClusterEndpointEvent) -> Option<InternalOutput> {
         match event {
+            ClusterEndpointEvent::PeerJoined(peer, meta) => Some(InternalOutput::Event(EndpointEvent::PeerJoined(peer, meta))),
+            ClusterEndpointEvent::PeerLeaved(peer) => Some(InternalOutput::Event(EndpointEvent::PeerLeaved(peer))),
             ClusterEndpointEvent::TrackStarted(peer, track, meta) => Some(InternalOutput::Event(EndpointEvent::PeerTrackStarted(peer, track, meta))),
             ClusterEndpointEvent::TrackStopped(peer, track) => Some(InternalOutput::Event(EndpointEvent::PeerTrackStopped(peer, track))),
             ClusterEndpointEvent::RemoteTrack(track, event) => self.on_cluster_remote_track(now, track, event),
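The `wait_join` change above extends an existing pattern: a `JoinRoom` arriving while the transport is still `Connecting` is parked, now together with its meta/publish/subscribe arguments, and replayed once the transport reports `Connected`. A reduced sketch of the mechanism, with a hypothetical `JoinArgs` type standing in for the five-field tuple:

```rust
#[derive(Clone)]
struct JoinArgs {
    room: String,
    peer: String,
}

enum State {
    Connecting,
    Connected,
}

struct Internal {
    state: State,
    wait_join: Option<JoinArgs>,
}

impl Internal {
    fn on_join(&mut self, args: JoinArgs) {
        match self.state {
            State::Connecting => self.wait_join = Some(args), // park it until connected
            State::Connected => self.do_join(args),
        }
    }

    fn on_connected(&mut self) {
        self.state = State::Connected;
        if let Some(args) = self.wait_join.take() {
            self.do_join(args); // replay the parked join
        }
    }

    fn do_join(&mut self, args: JoinArgs) {
        println!("join {} as {}", args.room, args.peer);
    }
}

fn main() {
    let mut internal = Internal { state: State::Connecting, wait_join: None };
    internal.on_join(JoinArgs { room: "room1".into(), peer: "peer1".into() });
    internal.on_connected(); // prints: join room1 as peer1
}
```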
diff --git a/packages/media_core/src/endpoint/internal/local_track.rs b/packages/media_core/src/endpoint/internal/local_track.rs
index e040678f..2aa6c259 100644
--- a/packages/media_core/src/endpoint/internal/local_track.rs
+++ b/packages/media_core/src/endpoint/internal/local_track.rs
@@ -101,7 +101,7 @@ impl EndpointLocalTrack {
                     Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Switch(Ok(()))))
                 } else {
                     log::warn!("[EndpointLocalTrack] view but not in room");
-                    Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Switch(Err(RpcError::new2(EndpointErrors::LocalTrackSwitchNotInRoom)))))
+                    Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Switch(Err(RpcError::new2(EndpointErrors::EndpointNotInRoom)))))
                 }
             }
             EndpointLocalTrackReq::Switch(None) => {
@@ -112,11 +112,11 @@ impl EndpointLocalTrack {
                         Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Switch(Ok(()))))
                     } else {
                         log::warn!("[EndpointLocalTrack] unview but not bind to any source");
-                        Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Switch(Err(RpcError::new2(EndpointErrors::LocalTrackSwitchNotPin)))))
+                        Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Switch(Err(RpcError::new2(EndpointErrors::LocalTrackNotPinSource)))))
                     }
                 } else {
                     log::warn!("[EndpointLocalTrack] unview but not in room");
-                    Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Switch(Err(RpcError::new2(EndpointErrors::LocalTrackSwitchNotInRoom)))))
+                    Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Switch(Err(RpcError::new2(EndpointErrors::EndpointNotInRoom)))))
                 }
             }
         }
diff --git a/packages/media_core/src/errors.rs b/packages/media_core/src/errors.rs
index 39ee65d6..1293f3d1 100644
--- a/packages/media_core/src/errors.rs
+++ b/packages/media_core/src/errors.rs
@@ -1,8 +1,9 @@
 #[derive(Debug, num_enum::TryFromPrimitive, num_enum::IntoPrimitive)]
 #[repr(u16)]
 pub enum EndpointErrors {
-    LocalTrackSwitchNotInRoom = 0x0000,
-    LocalTrackSwitchNotPin = 0x0001,
+    EndpointNotInRoom = 0x0001,
+    LocalTrackNotPinSource = 0x1001,
+    RemoteTrack_ = 0x2001,
 }

 impl ToString for EndpointErrors {
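The error renumbering groups codes by subsystem: `0x0xxx` for endpoint-level errors, `0x1xxx` for local-track errors, and `0x2xxx` reserved for remote-track errors (hence the `RemoteTrack_` placeholder). With the same `num_enum` derives the codes convert both ways; a small sketch:

```rust
use num_enum::{IntoPrimitive, TryFromPrimitive};

// Mirrors the layout in errors.rs: the high hex digit encodes the subsystem.
#[derive(Debug, PartialEq, TryFromPrimitive, IntoPrimitive)]
#[repr(u16)]
enum EndpointErrors {
    EndpointNotInRoom = 0x0001,
    LocalTrackNotPinSource = 0x1001,
}

fn main() {
    // Enum -> wire code, e.g. for embedding in an RPC error payload.
    let code: u16 = EndpointErrors::LocalTrackNotPinSource.into();
    assert_eq!(code, 0x1001);
    // Wire code -> enum, rejecting unknown values instead of panicking.
    assert!(matches!(EndpointErrors::try_from(0x0001u16), Ok(EndpointErrors::EndpointNotInRoom)));
    assert!(EndpointErrors::try_from(0x9999u16).is_err());
}
```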
diff --git a/packages/protocol/src/endpoint.rs b/packages/protocol/src/endpoint.rs
index 9428252f..0bf93bd7 100644
--- a/packages/protocol/src/endpoint.rs
+++ b/packages/protocol/src/endpoint.rs
@@ -102,21 +102,92 @@ impl ConnLayer for usize {
     }
 }

+#[derive(Debug, Clone)]
+pub struct RoomInfoPublish {
+    pub peer: bool,
+    pub tracks: bool,
+}
+
+#[derive(Debug)]
+pub struct RoomInfoSubscribe {
+    pub peers: bool,
+    pub tracks: bool,
+}
+
 #[derive(From, AsRef, Debug, derive_more::Display, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct RoomId(pub String);

 #[derive(From, AsRef, Debug, derive_more::Display, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct PeerId(pub String);

+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct PeerMeta {}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PeerInfo {
+    pub peer: PeerId,
+    pub meta: PeerMeta,
+}
+
+impl PeerInfo {
+    pub fn new(peer: PeerId, meta: PeerMeta) -> Self {
+        Self { peer, meta }
+    }
+}
+
+impl PeerInfo {
+    pub fn serialize(&self) -> Vec<u8> {
+        bincode::serialize(self).expect("should ok")
+    }
+
+    pub fn deserialize(data: &[u8]) -> Option<PeerInfo> {
+        bincode::deserialize::<PeerInfo>(data).ok()
+    }
+}
+
 #[derive(From, AsRef, Debug, derive_more::Display, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct TrackName(pub String);

-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct TrackMeta {
     pub kind: MediaKind,
     pub scaling: MediaScaling,
 }

+impl TrackMeta {
+    pub fn default_audio() -> Self {
+        Self {
+            kind: MediaKind::Audio,
+            scaling: MediaScaling::None,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TrackInfo {
+    pub peer: PeerId,
+    pub track: TrackName,
+    pub meta: TrackMeta,
+}
+
+impl TrackInfo {
+    pub fn simple_audio(peer: PeerId) -> Self {
+        Self {
+            peer,
+            track: "audio_main".to_string().into(),
+            meta: TrackMeta::default_audio(),
+        }
+    }
+
+    pub fn serialize(&self) -> Vec<u8> {
+        bincode::serialize(self).expect("should ok")
+    }
+
+    pub fn deserialize(data: &[u8]) -> Option<TrackInfo> {
+        bincode::deserialize::<TrackInfo>(data).ok()
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::str::FromStr;
diff --git a/packages/protocol/src/media.rs b/packages/protocol/src/media.rs
index 14186838..b005d9dd 100644
--- a/packages/protocol/src/media.rs
+++ b/packages/protocol/src/media.rs
@@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};

 use crate::endpoint::{PeerId, TrackMeta, TrackName};

-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub enum MediaKind {
     Audio,
     Video,
@@ -19,7 +19,7 @@ impl MediaKind {
     }
 }

-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub enum MediaScaling {
     None,
     Simulcat,
@@ -34,7 +34,7 @@ pub enum MediaCodec {
     Vp9,
 }

-#[derive(Derivative, Clone, Serialize, Deserialize)]
+#[derive(Derivative, Clone, PartialEq, Eq, Serialize, Deserialize)]
 #[derivative(Debug)]
 pub struct MediaPacket {
     pub pt: u8,
@@ -55,20 +55,3 @@ impl MediaPacket {
         bincode::deserialize::<MediaPacket>(data).ok()
     }
 }
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct TrackInfo {
-    pub peer: PeerId,
-    pub track: TrackName,
-    pub meta: TrackMeta,
-}
-
-impl TrackInfo {
-    pub fn serialize(&self) -> Vec<u8> {
-        bincode::serialize(self).expect("should ok")
-    }
-
-    pub fn deserialize(data: &[u8]) -> Option<TrackInfo> {
-        bincode::deserialize::<TrackInfo>(data).ok()
-    }
-}
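`PeerInfo` and `TrackInfo` (the latter just moved from media.rs into endpoint.rs) share the same wire contract: bincode for `serialize`, and a `deserialize` that returns `Option` so a malformed kv payload is dropped rather than crashing the room task. A round-trip sketch with a hypothetical stand-in struct:

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct PeerInfo {
    peer: String,
}

impl PeerInfo {
    fn serialize(&self) -> Vec<u8> {
        bincode::serialize(self).expect("should ok")
    }

    // Option instead of panic: kv payloads come from the network.
    fn deserialize(data: &[u8]) -> Option<PeerInfo> {
        bincode::deserialize::<PeerInfo>(data).ok()
    }
}

fn main() {
    let info = PeerInfo { peer: "peer1".to_string() };
    let bytes = info.serialize();
    assert_eq!(PeerInfo::deserialize(&bytes), Some(info));
    assert_eq!(PeerInfo::deserialize(&[0xff]), None); // garbage -> None
}
```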
diff --git a/packages/transport_webrtc/src/transport/whep.rs b/packages/transport_webrtc/src/transport/whep.rs
index 34dca862..572288ff 100644
--- a/packages/transport_webrtc/src/transport/whep.rs
+++ b/packages/transport_webrtc/src/transport/whep.rs
@@ -7,7 +7,7 @@ use media_server_core::{
     endpoint::{EndpointEvent, EndpointLocalTrackEvent, EndpointLocalTrackReq, EndpointReq},
     transport::{LocalTrackEvent, LocalTrackId, TransportError, TransportEvent, TransportOutput, TransportState},
 };
-use media_server_protocol::endpoint::{PeerId, RoomId, TrackMeta, TrackName};
+use media_server_protocol::endpoint::{PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe, TrackMeta, TrackName};
 use str0m::{
     media::{Direction, MediaAdded, MediaKind, Mid},
     Event as Str0mEvent, IceConnectionState,
@@ -101,6 +101,8 @@ impl TransportWebrtcInternal for TransportWebrtcWhep {
     fn on_endpoint_event<'a>(&mut self, _now: Instant, event: EndpointEvent) -> Option<InternalOutput<'a>> {
         match event {
+            EndpointEvent::PeerJoined(_, _) => None,
+            EndpointEvent::PeerLeaved(_) => None,
             EndpointEvent::PeerTrackStarted(peer, track, meta) => {
                 if self.audio_mid.is_none() && meta.kind.is_audio() {
                     log::info!("[TransportWebrtcWhep] waiting local audio track => push Subscribe candidate to waits");
@@ -116,7 +118,14 @@ impl TransportWebrtcInternal for TransportWebrtcWhep {
             }
             EndpointEvent::PeerTrackStopped(peer, track) => self.try_unsubscribe(peer, track),
             EndpointEvent::LocalMediaTrack(_track, event) => match event {
-                EndpointLocalTrackEvent::Media(pkt) => Some(InternalOutput::Str0mSendMedia(self.video_mid?, pkt)),
+                EndpointLocalTrackEvent::Media(pkt) => {
+                    let mid = if pkt.pt == 111 {
+                        self.audio_mid
+                    } else {
+                        self.video_mid
+                    }?;
+                    Some(InternalOutput::Str0mSendMedia(mid, pkt))
+                }
             },
             EndpointEvent::RemoteMediaTrack(_track, _event) => None,
             EndpointEvent::GoAway(_seconds, _reason) => None,
@@ -134,7 +143,13 @@ impl TransportWebrtcInternal for TransportWebrtcWhep {
                     self.state = State::Connected;
                     self.queue.push_back(InternalOutput::TransportOutput(TransportOutput::RpcReq(
                         0.into(),
-                        EndpointReq::JoinRoom(self.room.clone(), self.peer.clone()),
+                        EndpointReq::JoinRoom(
+                            self.room.clone(),
+                            self.peer.clone(),
+                            PeerMeta {},
+                            RoomInfoPublish { peer: false, tracks: false },
+                            RoomInfoSubscribe { peers: false, tracks: true },
+                        ),
                     )));
                     return Some(InternalOutput::TransportOutput(TransportOutput::Event(TransportEvent::State(TransportState::Connected))));
                 }
diff --git a/packages/transport_webrtc/src/transport/whip.rs b/packages/transport_webrtc/src/transport/whip.rs
index 7086bc37..96064b80 100644
--- a/packages/transport_webrtc/src/transport/whip.rs
+++ b/packages/transport_webrtc/src/transport/whip.rs
@@ -8,7 +8,7 @@ use media_server_core::{
     transport::{RemoteTrackEvent, RemoteTrackId, TransportError, TransportEvent, TransportOutput, TransportState},
 };
 use media_server_protocol::{
-    endpoint::{PeerId, RoomId, TrackMeta},
+    endpoint::{PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe, TrackMeta},
     media::{MediaKind, MediaScaling},
 };
 use str0m::{
@@ -95,6 +95,8 @@ impl TransportWebrtcInternal for TransportWebrtcWhip {
     fn on_endpoint_event<'a>(&mut self, _now: Instant, event: EndpointEvent) -> Option<InternalOutput<'a>> {
         match event {
+            EndpointEvent::PeerJoined(_, _) => None,
+            EndpointEvent::PeerLeaved(_) => None,
             EndpointEvent::PeerTrackStarted(_, _, _) => None,
             EndpointEvent::PeerTrackStopped(_, _) => None,
             EndpointEvent::RemoteMediaTrack(_, event) => match event {
@@ -124,7 +126,13 @@ impl TransportWebrtcInternal for TransportWebrtcWhip {
                 log::info!("[TransportWebrtcWhip] connected");
                 self.queue.push_back(InternalOutput::TransportOutput(TransportOutput::RpcReq(
                     0.into(),
-                    EndpointReq::JoinRoom(self.room.clone(), self.peer.clone()),
+                    EndpointReq::JoinRoom(
+                        self.room.clone(),
+                        self.peer.clone(),
+                        PeerMeta {},
+                        RoomInfoPublish { peer: true, tracks: true },
+                        RoomInfoSubscribe { peers: false, tracks: false },
+                    ),
                 )));
                 return Some(InternalOutput::TransportOutput(TransportOutput::Event(TransportEvent::State(TransportState::Connected))));
             }
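A caveat on the whep media routing above: picking the audio mid when `pkt.pt == 111` relies on 111 being the payload type negotiated for Opus, which is the common browser default in SDP but still a per-session negotiation outcome, not a fixed constant. The decision itself is just a two-way switch:

```rust
// Sketch of the whep routing decision: choose the audio or video mid by RTP
// payload type. 111 is what browsers typically negotiate for Opus, but the
// mapping comes from the SDP exchange and can differ per session.
fn route_mid(pt: u8, audio_mid: Option<u32>, video_mid: Option<u32>) -> Option<u32> {
    if pt == 111 {
        audio_mid // assumed Opus -> audio
    } else {
        video_mid // everything else -> video
    }
}

fn main() {
    let (audio, video) = (Some(0u32), Some(1u32));
    assert_eq!(route_mid(111, audio, video), Some(0));
    assert_eq!(route_mid(96, audio, video), Some(1));
}
```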