diff --git a/packages/media_core/src/cluster.rs b/packages/media_core/src/cluster.rs index 518dcede..5190ad40 100644 --- a/packages/media_core/src/cluster.rs +++ b/packages/media_core/src/cluster.rs @@ -88,8 +88,8 @@ pub enum ClusterEndpointControl { AudioMixer(ClusterAudioMixerControl), RemoteTrack(RemoteTrackId, ClusterRemoteTrackControl), LocalTrack(LocalTrackId, ClusterLocalTrackControl), - SubscribeChannel(String), - UnsubscribeChannel(String), + SubscribeVirtualChannel(String), + UnsubscribeVirtualChannel(String), PublishChannel(String, PeerId, Vec), } @@ -102,7 +102,7 @@ pub enum ClusterEndpointEvent { AudioMixer(ClusterAudioMixerEvent), RemoteTrack(RemoteTrackId, ClusterRemoteTrackEvent), LocalTrack(LocalTrackId, ClusterLocalTrackEvent), - ChannelMessage(String, PeerId, Vec), + VirtualChannelMessage(String, PeerId, Vec), } pub enum Input { diff --git a/packages/media_core/src/cluster/id_generator.rs b/packages/media_core/src/cluster/id_generator.rs index 22c3d441..49c10f43 100644 --- a/packages/media_core/src/cluster/id_generator.rs +++ b/packages/media_core/src/cluster/id_generator.rs @@ -41,7 +41,7 @@ pub fn gen_track_channel_id>(room: ClusterRoomHash, peer: &PeerId, h.finish().into() } -pub fn gen_datachannel_id>(room: ClusterRoomHash, key: String) -> T { +pub fn gen_data_channel_id>(room: ClusterRoomHash, key: String) -> T { let mut h = std::hash::DefaultHasher::new(); room.as_ref().hash(&mut h); key.hash(&mut h); diff --git a/packages/media_core/src/cluster/room.rs b/packages/media_core/src/cluster/room.rs index f7395759..36ef8123 100644 --- a/packages/media_core/src/cluster/room.rs +++ b/packages/media_core/src/cluster/room.rs @@ -204,12 +204,12 @@ impl ClusterRoom { } ClusterEndpointControl::RemoteTrack(track, control) => self.on_control_remote_track(now, endpoint, track, control), ClusterEndpointControl::LocalTrack(track, control) => self.on_control_local_track(now, endpoint, track, control), - ClusterEndpointControl::SubscribeChannel(key) => self.datachannel.input(&mut self.switcher).on_channel_subscribe(endpoint, &key), + ClusterEndpointControl::SubscribeVirtualChannel(key) => self.datachannel.input(&mut self.switcher).on_channel_subscribe(endpoint, &key), ClusterEndpointControl::PublishChannel(key, peer, message) => { let data_packet = DataChannelPacket { from: peer, data: message }; self.datachannel.input(&mut self.switcher).on_channel_data(&key, data_packet); } - ClusterEndpointControl::UnsubscribeChannel(key) => self.datachannel.input(&mut self.switcher).on_channel_unsubscribe(endpoint, &key), + ClusterEndpointControl::UnsubscribeVirtualChannel(key) => self.datachannel.input(&mut self.switcher).on_channel_unsubscribe(endpoint, &key), } } } diff --git a/packages/media_core/src/cluster/room/datachannel.rs b/packages/media_core/src/cluster/room/datachannel.rs index eec334fd..d0ff231d 100644 --- a/packages/media_core/src/cluster/room/datachannel.rs +++ b/packages/media_core/src/cluster/room/datachannel.rs @@ -1,12 +1,15 @@ -use std::{fmt::Debug, hash::Hash}; - -use atm0s_sdn::features::pubsub::{self}; +use atm0s_sdn::features::pubsub::{self, ChannelId}; use media_server_protocol::datachannel::DataChannelPacket; use publisher::DataChannelPublisher; use sans_io_runtime::{TaskSwitcher, TaskSwitcherBranch, TaskSwitcherChild}; +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, + hash::Hash, +}; use subscriber::DataChannelSubscriber; -use crate::cluster::{ClusterEndpointEvent, ClusterRoomHash}; +use crate::cluster::{id_generator, ClusterEndpointEvent, ClusterRoomHash}; mod publisher; mod subscriber; @@ -25,8 +28,14 @@ pub enum Output { OnResourceEmpty, } +struct ChannelContainer { + subscribers: HashSet, + key: String, +} + pub struct RoomChannel { room: ClusterRoomHash, + channels: HashMap>, publisher: TaskSwitcherBranch, Output>, subscriber: TaskSwitcherBranch, Output>, switcher: TaskSwitcher, @@ -34,9 +43,10 @@ pub struct RoomChannel { impl RoomChannel { pub fn new(room: ClusterRoomHash) -> Self { - log::info!("[ClusterRoomMetadata] Create {}", room); + log::info!("[ClusterRoomDataChannel {}] Create virtual datachannel", room); Self { room, + channels: HashMap::new(), publisher: TaskSwitcherBranch::new(DataChannelPublisher::new(room), TaskType::Publisher), subscriber: TaskSwitcherBranch::new(DataChannelSubscriber::new(room), TaskType::Subscriber), switcher: TaskSwitcher::new(2), @@ -44,33 +54,78 @@ impl RoomChannel { } pub fn is_empty(&self) -> bool { - self.publisher.is_empty() && self.subscriber.is_empty() + self.publisher.is_empty() && self.subscriber.is_empty() && self.channels.is_empty() } pub fn on_pubsub_event(&mut self, event: pubsub::Event) { - let channel = event.0; + let channel_id = event.0; match event.1 { pubsub::ChannelEvent::SourceData(_, data) => { - self.subscriber.input(&mut self.switcher).on_channel_data(channel, data); + if let Some(channel) = self.channels.get(&channel_id) { + log::info!("[ClusterRoomDataChannel {}] Got message from channel {channel_id}, try publish to subscribers", self.room); + let endpoints: Vec = channel.subscribers.iter().copied().collect(); + self.subscriber.input(&mut self.switcher).on_channel_data(channel.key.clone(), endpoints, data); + } else { + log::warn!("[ClusterRoomDataChannel {}] Unexpected Channel {}", self.room, channel_id); + } } _ => {} } } pub fn on_channel_data(&mut self, key: &str, data: DataChannelPacket) { - self.publisher.input(&mut self.switcher).on_channel_data(key, data); + let channel_id: ChannelId = id_generator::gen_data_channel_id(self.room, key.to_string()); + self.publisher.input(&mut self.switcher).on_channel_data(channel_id, data); } pub fn on_channel_subscribe(&mut self, endpoint: Endpoint, key: &str) { - self.subscriber.input(&mut self.switcher).on_channel_subscribe(endpoint, key); + log::info!("[ClusterRoomDataChannel {}] Endpoint {:?} Subscribe Channel {key}", self.room, endpoint); + let channel_id: ChannelId = id_generator::gen_data_channel_id(self.room, key.to_string()); + if let Some(channel) = self.channels.get_mut(&channel_id) { + if !channel.subscribers.insert(endpoint) { + log::warn!("[ClusterRoomDataChannel {}] Endpoint {:?} already subscribed to Channel {}", self.room, endpoint, channel_id); + return; + } + self.subscriber.input(&mut self.switcher).on_channel_subscribe(endpoint, channel_id); + } else { + log::info!("[ClusterRoomDataChannel {}] Create new Channel {}", self.room, channel_id); + let mut channel = ChannelContainer { + subscribers: HashSet::new(), + key: key.to_string(), + }; + channel.subscribers.insert(endpoint); + self.channels.insert(channel_id, channel); + self.publisher.input(&mut self.switcher).on_channel_create(channel_id); + self.subscriber.input(&mut self.switcher).on_channel_create(endpoint, channel_id); + } } pub fn on_channel_unsubscribe(&mut self, endpoint: Endpoint, key: &str) { - self.subscriber.input(&mut self.switcher).on_channel_unsubscribe(endpoint, key); + log::info!("[ClusterRoomDataChannel {}] Endpoint {:?} Unsubscribe Channel {key}", self.room, endpoint); + let channel_id: ChannelId = id_generator::gen_data_channel_id(self.room, key.to_string()); + self.unsub_channel(endpoint, channel_id); + } + + fn unsub_channel(&mut self, endpoint: Endpoint, channel_id: ChannelId) { + if let Some(channel) = self.channels.get_mut(&channel_id) { + if !channel.subscribers.remove(&endpoint) { + log::warn!("[ClusterRoomDataChannel {}] Endpoint {:?} not subscribed to Channel {}", self.room, endpoint, channel_id); + return; + } + self.subscriber.input(&mut self.switcher).on_channel_unsubscribe(endpoint, channel_id); + if channel.subscribers.is_empty() { + log::info!("[ClusterRoomDataChannel {}] Channel have no subscribers, remove Channel {}", self.room, channel_id); + self.publisher.input(&mut self.switcher).on_channel_close(channel_id); + self.subscriber.input(&mut self.switcher).on_channel_close(channel_id); + self.channels.remove(&channel_id); + } + } } pub fn on_leave(&mut self, endpoint: Endpoint) { - self.subscriber.input(&mut self.switcher).on_leave(endpoint); + for channel_id in self.subscriber.get_subscriptions(endpoint) { + self.unsub_channel(endpoint, channel_id); + } } } @@ -82,7 +137,6 @@ impl TaskSwitcherChild> for match self.switcher.current()?.try_into().ok()? { TaskType::Publisher => { if let Some(out) = self.publisher.pop_output((), &mut self.switcher) { - log::info!("[ClusterRoomDataChannel] poped Output publisher {:?}", out); if let Output::OnResourceEmpty = out { if self.is_empty() { return Some(Output::OnResourceEmpty); @@ -94,7 +148,6 @@ impl TaskSwitcherChild> for } TaskType::Subscriber => { if let Some(out) = self.subscriber.pop_output((), &mut self.switcher) { - log::info!("[ClusterRoomDataChannel] poped Output Subscriber {:?}", out); if let Output::OnResourceEmpty = out { if self.is_empty() { return Some(Output::OnResourceEmpty); @@ -112,5 +165,167 @@ impl TaskSwitcherChild> for impl Drop for RoomChannel { fn drop(&mut self) { log::info!("[ClusterRoomDataChannel] Drop {}", self.room); + assert!(self.channels.is_empty(), "channels should be empty on drop"); + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use atm0s_sdn::features::pubsub::{self, ChannelControl}; + use media_server_protocol::{datachannel::DataChannelPacket, endpoint::PeerId}; + use sans_io_runtime::TaskSwitcherChild; + + use crate::cluster::{ + id_generator, + room::datachannel::{Output, RoomChannel}, + ClusterEndpointEvent, + }; + + #[test] + fn sub_unsub() { + let now = (); + let room_id = 1.into(); + let mut room = RoomChannel::new(room_id); + let endpoint1 = 1; + let endpoint2 = 2; + let endpoint3 = 3; + let key = "test"; + let key2 = "test2"; + + // 1 -> test + // 2 -> test + // 3 -> test2 + let channel_id = id_generator::gen_data_channel_id(room_id, key.to_string()); + let channel_id2 = id_generator::gen_data_channel_id(room_id, key2.to_string()); + + // First subscriber will start publish and subscribe on pubsub channel + room.on_channel_subscribe(endpoint1, key); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart)))); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto)))); + assert_eq!(room.pop_output(now), None); + + // Second subscriber will do nothing but register in the subscriber list + room.on_channel_subscribe(endpoint2, key); + assert_eq!(room.pop_output(now), None); + assert_eq!(room.subscriber.get_subscriptions(endpoint1), vec![channel_id]); + assert_eq!(room.subscriber.get_subscriptions(endpoint2), vec![channel_id]); + + // First subscriber of a new channel should start publish and subscribe too + room.on_channel_subscribe(endpoint3, key2); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::PubStart)))); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::SubAuto)))); + assert_eq!(room.subscriber.get_subscriptions(endpoint3), vec![channel_id2]); + + // Last subscriber that unsubscribes will stop the channel + room.on_channel_unsubscribe(endpoint1, key); + assert_eq!(room.subscriber.get_subscriptions(endpoint1), vec![]); + room.on_channel_unsubscribe(endpoint2, key); + assert_eq!(room.subscriber.get_subscriptions(endpoint2), vec![]); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop)))); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto)))); + + room.on_channel_unsubscribe(endpoint3, key2); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::PubStop)))); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::UnsubAuto)))); + assert_eq!(room.pop_output(now), None); + + assert!(room.subscriber.is_empty()); + } + + #[test] + fn receive_data() { + let now = (); + let room_id = 1.into(); + let mut room = RoomChannel::new(room_id); + let endpoint1 = 1; + let endpoint2 = 2; + let endpoint3 = 3; + let key = "test"; + let key_fake = "different_channel"; + + let channel_id = id_generator::gen_data_channel_id(room_id, key.to_string()); + let channel_id_fake = id_generator::gen_data_channel_id(room_id, key_fake.to_string()); + + room.on_channel_subscribe(endpoint1, key); + room.on_channel_subscribe(endpoint2, key); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart)))); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto)))); + assert_eq!(room.pop_output(now), None); + + room.on_channel_subscribe(endpoint3, key_fake); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::PubStart)))); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::SubAuto)))); + assert_eq!(room.pop_output(now), None); + + let peer_id = PeerId::from("testid"); + let pkt = DataChannelPacket { + from: peer_id.clone(), + data: vec![1, 2, 3], + }; + room.on_pubsub_event(pubsub::Event(channel_id, pubsub::ChannelEvent::SourceData(1, pkt.serialize()))); + let mut receivers = HashMap::new(); + receivers.insert(endpoint1, false); + receivers.insert(endpoint2, false); + receivers.insert(endpoint3, false); + + if let Some(out) = room.pop_output(now) { + match out { + Output::Endpoint(endpoints, ClusterEndpointEvent::VirtualChannelMessage(key, peer, data)) => { + assert_eq!(key, key); + assert_eq!(peer, peer_id); + assert_eq!(data, pkt.data); + for endpoint in endpoints { + *receivers.get_mut(&endpoint).unwrap() = true; + } + } + _ => panic!("Unexpected output: {:?}", out), + } + } + + // Every endpoint 1 and 2 should received the message + // Endpoint 3 should not receive anything + assert!(receivers[&endpoint1]); + assert!(receivers[&endpoint2]); + assert!(!receivers[&endpoint3]); + assert_eq!(room.pop_output(now), None); + + room.on_channel_unsubscribe(endpoint1, key); + room.on_channel_unsubscribe(endpoint2, key); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop)))); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto)))); + + room.on_channel_unsubscribe(endpoint3, key_fake); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::PubStop)))); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::UnsubAuto)))); + + assert_eq!(room.pop_output(now), None); + } + + #[test] + fn leave_room() { + let now = (); + let room_id = 1.into(); + let mut room = RoomChannel::new(room_id); + let endpoint1 = 1; + let endpoint2 = 2; + let key = "test"; + + let channel_id = id_generator::gen_data_channel_id(room_id, key.to_string()); + + room.on_channel_subscribe(endpoint1, key); + room.on_channel_subscribe(endpoint2, key); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart)))); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto)))); + assert_eq!(room.pop_output(now), None); + + room.on_leave(endpoint1); + assert_eq!(room.pop_output(now), None); + room.on_leave(endpoint2); + + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop)))); + assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto)))); + assert_eq!(room.pop_output(now), None); } } diff --git a/packages/media_core/src/cluster/room/datachannel/publisher.rs b/packages/media_core/src/cluster/room/datachannel/publisher.rs index e77e3be7..31008ee0 100644 --- a/packages/media_core/src/cluster/room/datachannel/publisher.rs +++ b/packages/media_core/src/cluster/room/datachannel/publisher.rs @@ -4,7 +4,7 @@ use atm0s_sdn::features::pubsub::{self, ChannelControl, ChannelId}; use media_server_protocol::datachannel::DataChannelPacket; use sans_io_runtime::TaskSwitcherChild; -use crate::cluster::{id_generator, ClusterRoomHash}; +use crate::cluster::ClusterRoomHash; use super::Output; @@ -22,12 +22,21 @@ impl DataChannelPublisher { self.queue.is_empty() } - pub fn on_channel_data(&mut self, key: &str, data: DataChannelPacket) { - log::trace!("[ClusterRoomDataChannel {}/Publishers] publish virtual datachannel", self.room); + pub fn on_channel_data(&mut self, channel_id: ChannelId, data: DataChannelPacket) { + log::info!("[ClusterRoomDataChannel {}/Publishers] publish virtual datachannel", self.room); let data = data.serialize(); - let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, key.to_string()); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubData(data)))) } + + pub fn on_channel_create(&mut self, channel_id: ChannelId) { + log::info!("[ClusterRoomDataChannel {}/Publishers] publish start virtual datachannel", self.room); + self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart))) + } + + pub fn on_channel_close(&mut self, channel_id: ChannelId) { + log::info!("[ClusterRoomDataChannel {}/Publishers] publish stop virtual datachannel", self.room); + self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop))) + } } impl TaskSwitcherChild> for DataChannelPublisher { diff --git a/packages/media_core/src/cluster/room/datachannel/subscriber.rs b/packages/media_core/src/cluster/room/datachannel/subscriber.rs index 86dcc6ed..331529ac 100644 --- a/packages/media_core/src/cluster/room/datachannel/subscriber.rs +++ b/packages/media_core/src/cluster/room/datachannel/subscriber.rs @@ -1,29 +1,19 @@ use std::{ - collections::{HashMap, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, fmt::Debug, hash::Hash, }; use atm0s_sdn::features::pubsub::{self, ChannelControl, ChannelId}; -use derivative::Derivative; use media_server_protocol::datachannel::DataChannelPacket; use sans_io_runtime::{return_if_none, TaskSwitcherChild}; -use crate::cluster::{id_generator, ClusterEndpointEvent, ClusterRoomHash}; - use super::Output; - -#[derive(Derivative)] -#[derivative(Default(bound = ""))] -struct ChannelContainer { - subscribers: HashMap, - key: String, -} +use crate::cluster::{ClusterEndpointEvent, ClusterRoomHash}; pub struct DataChannelSubscriber { room: ClusterRoomHash, - channels: HashMap>, - subscribers: HashMap>, + subscriptions: HashMap>, queue: VecDeque>, } @@ -32,105 +22,44 @@ impl DataChannelSubscriber { Self { room, queue: VecDeque::new(), - channels: HashMap::new(), - subscribers: HashMap::new(), + subscriptions: HashMap::new(), } } - pub fn is_empty(&self) -> bool { - self.queue.is_empty() && self.channels.is_empty() + pub fn get_subscriptions(&self, endpoint: Endpoint) -> Vec { + self.subscriptions.get(&endpoint).map_or_else(Vec::new, |s| s.iter().copied().collect()) } - pub fn on_channel_subscribe(&mut self, endpoint: Endpoint, key: &str) { - let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, key.to_string()); - log::info!("[ClusterRoomDataChannel {}/Subscribers] peer {:?} subscribe channel: {channel_id}", self.room, endpoint); - let channel_container = self.channels.entry(channel_id).or_insert_with(|| { - let channel = ChannelContainer { - subscribers: HashMap::new(), - key: key.to_string(), - }; - self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart))); - channel - }); - - if let Some(subscriber) = self.subscribers.get_mut(&endpoint) { - if !channel_container.subscribers.contains_key(&endpoint) { - subscriber.push(channel_id); - } - } else { - self.subscribers.insert(endpoint, vec![channel_id]); - } - channel_container.subscribers.insert(endpoint, ()); + pub fn is_empty(&self) -> bool { + self.queue.is_empty() && self.subscriptions.is_empty() + } - if channel_container.subscribers.len() == 1 { - log::info!("[ClusterRoomDataChannel {}/Subscribers] first subscriber => Sub channel {channel_id}", self.room); - self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto))); - } + pub fn on_channel_create(&mut self, endpoint: Endpoint, channel_id: ChannelId) { + self.subscriptions.entry(endpoint).or_insert(HashSet::new()).insert(channel_id); + self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto))); } - pub fn on_channel_unsubscribe(&mut self, endpoint: Endpoint, key: &str) { - let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, key.to_string()); - log::info!("[ClusterRoomDataChannel {}/Subscribers] peer {:?} unsubscribe channel: {channel_id}", self.room, endpoint); - let channel_container = return_if_none!(self.channels.get_mut(&channel_id)); - if channel_container.subscribers.contains_key(&endpoint) { - channel_container.subscribers.remove(&endpoint); - if let Some(endpoint_subscriptions) = self.subscribers.get_mut(&endpoint) { - if let Some(index) = endpoint_subscriptions.iter().position(|x| *x == channel_id) { - endpoint_subscriptions.swap_remove(index); - } + pub fn on_channel_close(&mut self, channel_id: ChannelId) { + self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto))); + } - if endpoint_subscriptions.is_empty() { - self.subscribers.remove(&endpoint); - } - } - if channel_container.subscribers.is_empty() { - log::info!("[ClusterRoomDataChannel {}/Subscribers] last subscriber => Unsub channel {channel_id}", self.room); - self.channels.remove(&channel_id); - self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto))); - self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop))); - if self.channels.is_empty() { - log::info!("[ClusterRoomDataChannel {}/Subscribers] last channel => Stop channel {channel_id}", self.room); - self.queue.push_back(Output::OnResourceEmpty); - } - } - } else { - log::warn!("[ClusterRoomDataChannel {}/Subscribers] peer {:?} not subscribe in channel {channel_id}", self.room, endpoint); - } + pub fn on_channel_subscribe(&mut self, endpoint: Endpoint, channel_id: ChannelId) { + self.subscriptions.entry(endpoint).or_insert(HashSet::new()).insert(channel_id); } - pub fn on_leave(&mut self, endpoint: Endpoint) { - log::info!("[ClusterRoomDataChannel {}/Subscribers] peer {:?} leave", self.room, endpoint); - let subscriber = return_if_none!(self.subscribers.remove(&endpoint)); - for channel_id in subscriber { - if let Some(channel_container) = self.channels.get_mut(&channel_id) { - channel_container.subscribers.remove(&endpoint); - if channel_container.subscribers.is_empty() { - log::info!("[ClusterRoomDataChannel {}/Subscribers] last subscriber => Unsub channel {channel_id}", self.room); - self.channels.remove(&channel_id); - self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto))); - self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop))); - } - if self.channels.is_empty() { - log::info!("[ClusterRoomDataChannel {}/Subscribers] last channel => Stop channel {channel_id}", self.room); - self.queue.push_back(Output::OnResourceEmpty); - } + pub fn on_channel_unsubscribe(&mut self, endpoint: Endpoint, channel_id: ChannelId) { + if let Some(subscription) = self.subscriptions.get_mut(&endpoint) { + subscription.remove(&channel_id); + if subscription.is_empty() { + self.subscriptions.remove(&endpoint); } } } - pub fn on_channel_data(&mut self, channel_id: ChannelId, data: Vec) { - log::info!("[ClusterRoomDataChannel {}/Subscribers] Receive data from channel {channel_id}", self.room); + pub fn on_channel_data(&mut self, key: String, endpoints: Vec, data: Vec) { let pkt = return_if_none!(DataChannelPacket::deserialize(&data)); - if let Some(channel_container) = self.channels.get(&channel_id) { - for endpoint in &channel_container.subscribers { - self.queue.push_back(Output::Endpoint( - vec![*endpoint.0], - ClusterEndpointEvent::ChannelMessage(channel_container.key.clone(), pkt.from.clone(), pkt.data.clone()), - )); - } - } else { - log::warn!("[ClusterRoomDataChannel {}/Subscribers] Receive data from unknown channel {channel_id}", self.room); - } + self.queue + .push_back(Output::Endpoint(endpoints, ClusterEndpointEvent::VirtualChannelMessage(key, pkt.from.clone(), pkt.data.clone()))); } } @@ -145,165 +74,6 @@ impl Drop for DataChannelSubscriber { fn drop(&mut self) { log::info!("[ClusterRoomDataChannel {}/Subscriber] Drop", self.room); assert_eq!(self.queue.len(), 0, "Queue not empty on drop"); - assert_eq!(self.channels.len(), 0, "Channels not empty on drop"); - assert_eq!(self.subscribers.len(), 0, "Subscribers not empty on drop"); - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - - use atm0s_sdn::features::pubsub::{self, ChannelControl}; - use media_server_protocol::{datachannel::DataChannelPacket, endpoint::PeerId}; - use sans_io_runtime::TaskSwitcherChild; - - use crate::cluster::{ - id_generator, - room::datachannel::{subscriber::DataChannelSubscriber, Output}, - ClusterEndpointEvent, - }; - - #[test] - fn sub_unsub() { - let now = (); - let room = 1.into(); - let mut subscriber = DataChannelSubscriber::new(room); - let endpoint1 = 1; - let endpoint2 = 2; - let endpoint3 = 3; - let key = "test"; - let key2 = "test2"; - - // 1 -> test - // 2 -> test - // 3 -> test2 - - subscriber.on_channel_subscribe(endpoint1, key); - let channel_id = id_generator::gen_datachannel_id(room, key.to_string()); - let channel_id2 = id_generator::gen_datachannel_id(room, key2.to_string()); - - assert!(!subscriber.is_empty()); - // First subscriber will start publish and subscribe on pubsub channel - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart)))); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto)))); - assert_eq!(subscriber.pop_output(now), None); - - // Second subscriber will do nothing but register in the subscriber list - subscriber.on_channel_subscribe(endpoint2, key); - assert_eq!(subscriber.pop_output(now), None); - - // First subscriber of a new channel should start publish and subscribe too - subscriber.on_channel_subscribe(endpoint3, key2); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::PubStart)))); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::SubAuto)))); - - subscriber.on_channel_unsubscribe(endpoint1, key); - subscriber.on_channel_unsubscribe(endpoint2, key); - - // Last subscriber that unsubscribes will stop the channel - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto)))); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop)))); - - // Last channel that unsubscribes will stop publish and return empty resource - subscriber.on_channel_unsubscribe(endpoint3, key2); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::UnsubAuto)))); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::PubStop)))); - assert_eq!(subscriber.pop_output(now), Some(Output::OnResourceEmpty)); - - assert!(subscriber.is_empty()); - } - - #[test] - fn receive_data() { - let now = (); - let room = 1.into(); - let mut subscriber = DataChannelSubscriber::new(room); - let endpoint1 = 1; - let endpoint2 = 2; - let endpoint3 = 3; - let key = "test"; - let key_fake = "different_channel"; - - let channel_id = id_generator::gen_datachannel_id(room, key.to_string()); - let channel_id_fake = id_generator::gen_datachannel_id(room, key_fake.to_string()); - - subscriber.on_channel_subscribe(endpoint1, key); - subscriber.on_channel_subscribe(endpoint2, key); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart)))); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto)))); - assert_eq!(subscriber.pop_output(now), None); - - subscriber.on_channel_subscribe(endpoint3, key_fake); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::PubStart)))); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::SubAuto)))); - assert_eq!(subscriber.pop_output(now), None); - - let peer_id = PeerId::from("testid"); - let pkt = DataChannelPacket { - from: peer_id.clone(), - data: vec![1, 2, 3], - }; - subscriber.on_channel_data(channel_id, pkt.clone().serialize()); - let mut receivers = HashMap::new(); - receivers.insert(endpoint1, false); - receivers.insert(endpoint2, false); - receivers.insert(endpoint3, false); - - while let Some(out) = subscriber.pop_output(now) { - match out { - Output::Endpoint(endpoint, ClusterEndpointEvent::ChannelMessage(key, peer, data)) => { - assert_eq!(key, key); - assert_eq!(peer, peer_id); - assert_eq!(data, pkt.data); - assert!(receivers.contains_key(&endpoint[0])); - *receivers.get_mut(&endpoint[0]).unwrap() = true; - } - _ => panic!("Unexpected output: {:?}", out), - } - } - - // Every endpoint 1 and 2 should received the message - // Endpoint 3 should not receive anything - assert!(receivers[&endpoint1]); - assert!(receivers[&endpoint2]); - assert!(!receivers[&endpoint3]); - assert_eq!(subscriber.pop_output(now), None); - - subscriber.on_channel_unsubscribe(endpoint1, key); - subscriber.on_channel_unsubscribe(endpoint2, key); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto)))); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop)))); - subscriber.on_channel_unsubscribe(endpoint3, key_fake); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::UnsubAuto)))); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::PubStop)))); - - assert_eq!(subscriber.pop_output(now), Some(Output::OnResourceEmpty)); - } - - #[test] - fn leave_room() { - let now = (); - let room = 1.into(); - let mut subscriber = DataChannelSubscriber::new(room); - let endpoint1 = 1; - let endpoint2 = 2; - let key = "test"; - - let channel_id = id_generator::gen_datachannel_id(room, key.to_string()); - - subscriber.on_channel_subscribe(endpoint1, key); - subscriber.on_channel_subscribe(endpoint2, key); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart)))); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto)))); - assert_eq!(subscriber.pop_output(now), None); - - subscriber.on_leave(endpoint1); - assert_eq!(subscriber.pop_output(now), None); - subscriber.on_leave(endpoint2); - - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto)))); - assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop)))); - assert_eq!(subscriber.pop_output(now), Some(Output::OnResourceEmpty)); + assert_eq!(self.subscriptions.len(), 0, "Subscribers not empty on drop"); } } diff --git a/packages/media_core/src/endpoint/internal.rs b/packages/media_core/src/endpoint/internal.rs index cfbc7150..352b13ac 100644 --- a/packages/media_core/src/endpoint/internal.rs +++ b/packages/media_core/src/endpoint/internal.rs @@ -188,7 +188,7 @@ impl EndpointInternal { EndpointReq::SubscribeChannel(key) => { if let Some((room, _, _, _)) = &self.joined { self.queue.push_back(InternalOutput::RpcRes(req_id, EndpointRes::SubscribeChannel(Ok(())))); - self.queue.push_back(InternalOutput::Cluster(*room, ClusterEndpointControl::SubscribeChannel(key))); + self.queue.push_back(InternalOutput::Cluster(*room, ClusterEndpointControl::SubscribeVirtualChannel(key))); } else { self.queue .push_back(InternalOutput::RpcRes(req_id, EndpointRes::SubscribeChannel(Err(RpcError::new2(EndpointErrors::EndpointNotInRoom))))); @@ -197,7 +197,7 @@ impl EndpointInternal { EndpointReq::UnsubscribeChannel(key) => { if let Some((room, _, _, _)) = &self.joined { self.queue.push_back(InternalOutput::RpcRes(req_id, EndpointRes::UnsubscribeChannel(Ok(())))); - self.queue.push_back(InternalOutput::Cluster(*room, ClusterEndpointControl::UnsubscribeChannel(key))); + self.queue.push_back(InternalOutput::Cluster(*room, ClusterEndpointControl::UnsubscribeVirtualChannel(key))); } else { self.queue .push_back(InternalOutput::RpcRes(req_id, EndpointRes::UnsubscribeChannel(Err(RpcError::new2(EndpointErrors::EndpointNotInRoom))))); @@ -376,7 +376,7 @@ impl EndpointInternal { }, ClusterEndpointEvent::RemoteTrack(track, event) => self.on_cluster_remote_track(now, track, event), ClusterEndpointEvent::LocalTrack(track, event) => self.on_cluster_local_track(now, track, event), - ClusterEndpointEvent::ChannelMessage(key, from, message) => self.queue.push_back(InternalOutput::Event(EndpointEvent::ChannelMessage(key, from, message))), + ClusterEndpointEvent::VirtualChannelMessage(key, from, message) => self.queue.push_back(InternalOutput::Event(EndpointEvent::ChannelMessage(key, from, message))), } }