Skip to content

Commit

Permalink
refactor: update
Browse files Browse the repository at this point in the history
  • Loading branch information
luongngocminh committed Jul 10, 2024
1 parent 528567d commit 642b9f1
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 283 deletions.
6 changes: 3 additions & 3 deletions packages/media_core/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ pub enum ClusterEndpointControl {
AudioMixer(ClusterAudioMixerControl),
RemoteTrack(RemoteTrackId, ClusterRemoteTrackControl),
LocalTrack(LocalTrackId, ClusterLocalTrackControl),
SubscribeChannel(String),
UnsubscribeChannel(String),
SubscribeVirtualChannel(String),
UnsubscribeVirtualChannel(String),
PublishChannel(String, PeerId, Vec<u8>),
}

Expand All @@ -102,7 +102,7 @@ pub enum ClusterEndpointEvent {
AudioMixer(ClusterAudioMixerEvent),
RemoteTrack(RemoteTrackId, ClusterRemoteTrackEvent),
LocalTrack(LocalTrackId, ClusterLocalTrackEvent),
ChannelMessage(String, PeerId, Vec<u8>),
VirtualChannelMessage(String, PeerId, Vec<u8>),
}

pub enum Input<Endpoint> {
Expand Down
2 changes: 1 addition & 1 deletion packages/media_core/src/cluster/id_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub fn gen_track_channel_id<T: From<u64>>(room: ClusterRoomHash, peer: &PeerId,
h.finish().into()
}

pub fn gen_datachannel_id<T: From<u64>>(room: ClusterRoomHash, key: String) -> T {
pub fn gen_data_channel_id<T: From<u64>>(room: ClusterRoomHash, key: String) -> T {
let mut h = std::hash::DefaultHasher::new();
room.as_ref().hash(&mut h);
key.hash(&mut h);
Expand Down
4 changes: 2 additions & 2 deletions packages/media_core/src/cluster/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,12 @@ impl<Endpoint: Debug + Copy + Clone + Hash + Eq> ClusterRoom<Endpoint> {
}
ClusterEndpointControl::RemoteTrack(track, control) => self.on_control_remote_track(now, endpoint, track, control),
ClusterEndpointControl::LocalTrack(track, control) => self.on_control_local_track(now, endpoint, track, control),
ClusterEndpointControl::SubscribeChannel(key) => self.datachannel.input(&mut self.switcher).on_channel_subscribe(endpoint, &key),
ClusterEndpointControl::SubscribeVirtualChannel(key) => self.datachannel.input(&mut self.switcher).on_channel_subscribe(endpoint, &key),
ClusterEndpointControl::PublishChannel(key, peer, message) => {
let data_packet = DataChannelPacket { from: peer, data: message };
self.datachannel.input(&mut self.switcher).on_channel_data(&key, data_packet);
}
ClusterEndpointControl::UnsubscribeChannel(key) => self.datachannel.input(&mut self.switcher).on_channel_unsubscribe(endpoint, &key),
ClusterEndpointControl::UnsubscribeVirtualChannel(key) => self.datachannel.input(&mut self.switcher).on_channel_unsubscribe(endpoint, &key),
}
}
}
Expand Down
243 changes: 229 additions & 14 deletions packages/media_core/src/cluster/room/datachannel.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::{fmt::Debug, hash::Hash};

use atm0s_sdn::features::pubsub::{self};
use atm0s_sdn::features::pubsub::{self, ChannelId};
use media_server_protocol::datachannel::DataChannelPacket;
use publisher::DataChannelPublisher;
use sans_io_runtime::{TaskSwitcher, TaskSwitcherBranch, TaskSwitcherChild};
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
hash::Hash,
};
use subscriber::DataChannelSubscriber;

use crate::cluster::{ClusterEndpointEvent, ClusterRoomHash};
use crate::cluster::{id_generator, ClusterEndpointEvent, ClusterRoomHash};

mod publisher;
mod subscriber;
Expand All @@ -25,52 +28,104 @@ pub enum Output<Endpoint> {
OnResourceEmpty,
}

struct ChannelContainer<Endpoint> {
subscribers: HashSet<Endpoint>,
key: String,
}

pub struct RoomChannel<Endpoint> {
room: ClusterRoomHash,
channels: HashMap<ChannelId, ChannelContainer<Endpoint>>,
publisher: TaskSwitcherBranch<DataChannelPublisher<Endpoint>, Output<Endpoint>>,
subscriber: TaskSwitcherBranch<DataChannelSubscriber<Endpoint>, Output<Endpoint>>,
switcher: TaskSwitcher,
}

impl<Endpoint: Hash + Eq + Copy + Debug> RoomChannel<Endpoint> {
pub fn new(room: ClusterRoomHash) -> Self {
log::info!("[ClusterRoomMetadata] Create {}", room);
log::info!("[ClusterRoomDataChannel {}] Create virtual datachannel", room);
Self {
room,
channels: HashMap::new(),
publisher: TaskSwitcherBranch::new(DataChannelPublisher::new(room), TaskType::Publisher),
subscriber: TaskSwitcherBranch::new(DataChannelSubscriber::new(room), TaskType::Subscriber),
switcher: TaskSwitcher::new(2),
}
}

pub fn is_empty(&self) -> bool {
self.publisher.is_empty() && self.subscriber.is_empty()
self.publisher.is_empty() && self.subscriber.is_empty() && self.channels.is_empty()
}

pub fn on_pubsub_event(&mut self, event: pubsub::Event) {
let channel = event.0;
let channel_id = event.0;
match event.1 {
pubsub::ChannelEvent::SourceData(_, data) => {
self.subscriber.input(&mut self.switcher).on_channel_data(channel, data);
if let Some(channel) = self.channels.get(&channel_id) {
log::info!("[ClusterRoomDataChannel {}] Got message from channel {channel_id}, try publish to subscribers", self.room);
let endpoints: Vec<Endpoint> = channel.subscribers.iter().copied().collect();
self.subscriber.input(&mut self.switcher).on_channel_data(channel.key.clone(), endpoints, data);
} else {
log::warn!("[ClusterRoomDataChannel {}] Unexpected Channel {}", self.room, channel_id);
}
}
_ => {}
}
}

pub fn on_channel_data(&mut self, key: &str, data: DataChannelPacket) {
self.publisher.input(&mut self.switcher).on_channel_data(key, data);
let channel_id: ChannelId = id_generator::gen_data_channel_id(self.room, key.to_string());
self.publisher.input(&mut self.switcher).on_channel_data(channel_id, data);
}

pub fn on_channel_subscribe(&mut self, endpoint: Endpoint, key: &str) {
self.subscriber.input(&mut self.switcher).on_channel_subscribe(endpoint, key);
log::info!("[ClusterRoomDataChannel {}] Endpoint {:?} Subscribe Channel {key}", self.room, endpoint);
let channel_id: ChannelId = id_generator::gen_data_channel_id(self.room, key.to_string());
if let Some(channel) = self.channels.get_mut(&channel_id) {
if !channel.subscribers.insert(endpoint) {
log::warn!("[ClusterRoomDataChannel {}] Endpoint {:?} already subscribed to Channel {}", self.room, endpoint, channel_id);
return;
}
self.subscriber.input(&mut self.switcher).on_channel_subscribe(endpoint, channel_id);
} else {
log::info!("[ClusterRoomDataChannel {}] Create new Channel {}", self.room, channel_id);
let mut channel = ChannelContainer {
subscribers: HashSet::new(),
key: key.to_string(),
};
channel.subscribers.insert(endpoint);
self.channels.insert(channel_id, channel);
self.publisher.input(&mut self.switcher).on_channel_create(channel_id);
self.subscriber.input(&mut self.switcher).on_channel_create(endpoint, channel_id);
}
}

pub fn on_channel_unsubscribe(&mut self, endpoint: Endpoint, key: &str) {
self.subscriber.input(&mut self.switcher).on_channel_unsubscribe(endpoint, key);
log::info!("[ClusterRoomDataChannel {}] Endpoint {:?} Unsubscribe Channel {key}", self.room, endpoint);
let channel_id: ChannelId = id_generator::gen_data_channel_id(self.room, key.to_string());
self.unsub_channel(endpoint, channel_id);
}

fn unsub_channel(&mut self, endpoint: Endpoint, channel_id: ChannelId) {
if let Some(channel) = self.channels.get_mut(&channel_id) {
if !channel.subscribers.remove(&endpoint) {
log::warn!("[ClusterRoomDataChannel {}] Endpoint {:?} not subscribed to Channel {}", self.room, endpoint, channel_id);
return;
}
self.subscriber.input(&mut self.switcher).on_channel_unsubscribe(endpoint, channel_id);
if channel.subscribers.is_empty() {
log::info!("[ClusterRoomDataChannel {}] Channel have no subscribers, remove Channel {}", self.room, channel_id);
self.publisher.input(&mut self.switcher).on_channel_close(channel_id);
self.subscriber.input(&mut self.switcher).on_channel_close(channel_id);
self.channels.remove(&channel_id);
}
}
}

pub fn on_leave(&mut self, endpoint: Endpoint) {
self.subscriber.input(&mut self.switcher).on_leave(endpoint);
for channel_id in self.subscriber.get_subscriptions(endpoint) {
self.unsub_channel(endpoint, channel_id);
}
}
}

Expand All @@ -82,7 +137,6 @@ impl<Endpoint: Debug + Hash + Eq + Copy> TaskSwitcherChild<Output<Endpoint>> for
match self.switcher.current()?.try_into().ok()? {
TaskType::Publisher => {
if let Some(out) = self.publisher.pop_output((), &mut self.switcher) {
log::info!("[ClusterRoomDataChannel] poped Output publisher {:?}", out);
if let Output::OnResourceEmpty = out {
if self.is_empty() {
return Some(Output::OnResourceEmpty);
Expand All @@ -94,7 +148,6 @@ impl<Endpoint: Debug + Hash + Eq + Copy> TaskSwitcherChild<Output<Endpoint>> for
}
TaskType::Subscriber => {
if let Some(out) = self.subscriber.pop_output((), &mut self.switcher) {
log::info!("[ClusterRoomDataChannel] poped Output Subscriber {:?}", out);
if let Output::OnResourceEmpty = out {
if self.is_empty() {
return Some(Output::OnResourceEmpty);
Expand All @@ -112,5 +165,167 @@ impl<Endpoint: Debug + Hash + Eq + Copy> TaskSwitcherChild<Output<Endpoint>> for
impl<Endpoint> Drop for RoomChannel<Endpoint> {
fn drop(&mut self) {
log::info!("[ClusterRoomDataChannel] Drop {}", self.room);
assert!(self.channels.is_empty(), "channels should be empty on drop");
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use atm0s_sdn::features::pubsub::{self, ChannelControl};
use media_server_protocol::{datachannel::DataChannelPacket, endpoint::PeerId};
use sans_io_runtime::TaskSwitcherChild;

use crate::cluster::{
id_generator,
room::datachannel::{Output, RoomChannel},
ClusterEndpointEvent,
};

#[test]
fn sub_unsub() {
let now = ();
let room_id = 1.into();
let mut room = RoomChannel::new(room_id);
let endpoint1 = 1;
let endpoint2 = 2;
let endpoint3 = 3;
let key = "test";
let key2 = "test2";

// 1 -> test
// 2 -> test
// 3 -> test2
let channel_id = id_generator::gen_data_channel_id(room_id, key.to_string());
let channel_id2 = id_generator::gen_data_channel_id(room_id, key2.to_string());

// First subscriber will start publish and subscribe on pubsub channel
room.on_channel_subscribe(endpoint1, key);
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart))));
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto))));
assert_eq!(room.pop_output(now), None);

// Second subscriber will do nothing but register in the subscriber list
room.on_channel_subscribe(endpoint2, key);
assert_eq!(room.pop_output(now), None);
assert_eq!(room.subscriber.get_subscriptions(endpoint1), vec![channel_id]);
assert_eq!(room.subscriber.get_subscriptions(endpoint2), vec![channel_id]);

// First subscriber of a new channel should start publish and subscribe too
room.on_channel_subscribe(endpoint3, key2);
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::PubStart))));
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::SubAuto))));
assert_eq!(room.subscriber.get_subscriptions(endpoint3), vec![channel_id2]);

// Last subscriber that unsubscribes will stop the channel
room.on_channel_unsubscribe(endpoint1, key);
assert_eq!(room.subscriber.get_subscriptions(endpoint1), vec![]);
room.on_channel_unsubscribe(endpoint2, key);
assert_eq!(room.subscriber.get_subscriptions(endpoint2), vec![]);
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop))));
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto))));

room.on_channel_unsubscribe(endpoint3, key2);
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::PubStop))));
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::UnsubAuto))));
assert_eq!(room.pop_output(now), None);

assert!(room.subscriber.is_empty());
}

#[test]
fn receive_data() {
let now = ();
let room_id = 1.into();
let mut room = RoomChannel::new(room_id);
let endpoint1 = 1;
let endpoint2 = 2;
let endpoint3 = 3;
let key = "test";
let key_fake = "different_channel";

let channel_id = id_generator::gen_data_channel_id(room_id, key.to_string());
let channel_id_fake = id_generator::gen_data_channel_id(room_id, key_fake.to_string());

room.on_channel_subscribe(endpoint1, key);
room.on_channel_subscribe(endpoint2, key);
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart))));
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto))));
assert_eq!(room.pop_output(now), None);

room.on_channel_subscribe(endpoint3, key_fake);
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::PubStart))));
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::SubAuto))));
assert_eq!(room.pop_output(now), None);

let peer_id = PeerId::from("testid");
let pkt = DataChannelPacket {
from: peer_id.clone(),
data: vec![1, 2, 3],
};
room.on_pubsub_event(pubsub::Event(channel_id, pubsub::ChannelEvent::SourceData(1, pkt.serialize())));
let mut receivers = HashMap::new();
receivers.insert(endpoint1, false);
receivers.insert(endpoint2, false);
receivers.insert(endpoint3, false);

if let Some(out) = room.pop_output(now) {
match out {
Output::Endpoint(endpoints, ClusterEndpointEvent::VirtualChannelMessage(key, peer, data)) => {
assert_eq!(key, key);
assert_eq!(peer, peer_id);
assert_eq!(data, pkt.data);
for endpoint in endpoints {
*receivers.get_mut(&endpoint).unwrap() = true;
}
}
_ => panic!("Unexpected output: {:?}", out),
}
}

// Every endpoint 1 and 2 should received the message
// Endpoint 3 should not receive anything
assert!(receivers[&endpoint1]);
assert!(receivers[&endpoint2]);
assert!(!receivers[&endpoint3]);
assert_eq!(room.pop_output(now), None);

room.on_channel_unsubscribe(endpoint1, key);
room.on_channel_unsubscribe(endpoint2, key);
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop))));
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto))));

room.on_channel_unsubscribe(endpoint3, key_fake);
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::PubStop))));
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::UnsubAuto))));

assert_eq!(room.pop_output(now), None);
}

#[test]
fn leave_room() {
let now = ();
let room_id = 1.into();
let mut room = RoomChannel::new(room_id);
let endpoint1 = 1;
let endpoint2 = 2;
let key = "test";

let channel_id = id_generator::gen_data_channel_id(room_id, key.to_string());

room.on_channel_subscribe(endpoint1, key);
room.on_channel_subscribe(endpoint2, key);
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart))));
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto))));
assert_eq!(room.pop_output(now), None);

room.on_leave(endpoint1);
assert_eq!(room.pop_output(now), None);
room.on_leave(endpoint2);

assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop))));
assert_eq!(room.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto))));
assert_eq!(room.pop_output(now), None);
}
}
17 changes: 13 additions & 4 deletions packages/media_core/src/cluster/room/datachannel/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use atm0s_sdn::features::pubsub::{self, ChannelControl, ChannelId};
use media_server_protocol::datachannel::DataChannelPacket;
use sans_io_runtime::TaskSwitcherChild;

use crate::cluster::{id_generator, ClusterRoomHash};
use crate::cluster::ClusterRoomHash;

use super::Output;

Expand All @@ -22,12 +22,21 @@ impl<Endpoint: Hash + Eq + Copy + Debug> DataChannelPublisher<Endpoint> {
self.queue.is_empty()
}

pub fn on_channel_data(&mut self, key: &str, data: DataChannelPacket) {
log::trace!("[ClusterRoomDataChannel {}/Publishers] publish virtual datachannel", self.room);
pub fn on_channel_data(&mut self, channel_id: ChannelId, data: DataChannelPacket) {
log::info!("[ClusterRoomDataChannel {}/Publishers] publish virtual datachannel", self.room);
let data = data.serialize();
let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, key.to_string());
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubData(data))))
}

pub fn on_channel_create(&mut self, channel_id: ChannelId) {
log::info!("[ClusterRoomDataChannel {}/Publishers] publish start virtual datachannel", self.room);
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart)))
}

pub fn on_channel_close(&mut self, channel_id: ChannelId) {
log::info!("[ClusterRoomDataChannel {}/Publishers] publish stop virtual datachannel", self.room);
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop)))
}
}

impl<Endpoint: Debug + Hash + Eq + Copy> TaskSwitcherChild<Output<Endpoint>> for DataChannelPublisher<Endpoint> {
Expand Down
Loading

0 comments on commit 642b9f1

Please sign in to comment.