Skip to content

Commit

Permalink
feat: virtual datachannel & test
Browse files Browse the repository at this point in the history
  • Loading branch information
luongngocminh committed Jul 2, 2024
1 parent 46d068e commit 528567d
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 56 deletions.
4 changes: 2 additions & 2 deletions packages/media_core/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub enum ClusterEndpointControl {
LocalTrack(LocalTrackId, ClusterLocalTrackControl),
SubscribeChannel(String),
UnsubscribeChannel(String),
PublishChannel(String, PeerId, String),
PublishChannel(String, PeerId, Vec<u8>),
}

#[derive(Clone, Debug, PartialEq, Eq)]
Expand All @@ -102,7 +102,7 @@ pub enum ClusterEndpointEvent {
AudioMixer(ClusterAudioMixerEvent),
RemoteTrack(RemoteTrackId, ClusterRemoteTrackEvent),
LocalTrack(LocalTrackId, ClusterLocalTrackEvent),
ChannelMessage(String, PeerId, String),
ChannelMessage(String, PeerId, Vec<u8>),
}

pub enum Input<Endpoint> {
Expand Down
3 changes: 1 addition & 2 deletions packages/media_core/src/cluster/id_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,10 @@ pub fn gen_track_channel_id<T: From<u64>>(room: ClusterRoomHash, peer: &PeerId,
h.finish().into()
}

pub fn gen_datachannel_id<T: From<u64>, Endpoint: Hash>(room: ClusterRoomHash, endpoint: Endpoint, key: String) -> T {
pub fn gen_datachannel_id<T: From<u64>>(room: ClusterRoomHash, key: String) -> T {
let mut h = std::hash::DefaultHasher::new();
room.as_ref().hash(&mut h);
key.hash(&mut h);
endpoint.hash(&mut h);
"datachannel".hash(&mut h);
h.finish().into()
}
Expand Down
6 changes: 4 additions & 2 deletions packages/media_core/src/cluster/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl<Endpoint: Debug + Copy + Clone + Hash + Eq> ClusterRoom<Endpoint> {
}

pub fn is_empty(&self) -> bool {
self.metadata.is_empty() && self.media_track.is_empty() && self.audio_mixer.is_empty()
self.metadata.is_empty() && self.media_track.is_empty() && self.audio_mixer.is_empty() && self.datachannel.is_empty()
}

fn on_sdn_event(&mut self, now: Instant, userdata: RoomUserData, event: FeaturesEvent) {
Expand Down Expand Up @@ -191,6 +191,7 @@ impl<Endpoint: Debug + Copy + Clone + Hash + Eq> ClusterRoom<Endpoint> {
ClusterEndpointControl::Leave => {
self.audio_mixer.input(&mut self.switcher).on_leave(now, endpoint);
self.metadata.input(&mut self.switcher).on_leave(endpoint);
self.datachannel.input(&mut self.switcher).on_leave(endpoint);
}
ClusterEndpointControl::SubscribePeer(target) => {
self.metadata.input(&mut self.switcher).on_subscribe_peer(endpoint, target);
Expand All @@ -206,7 +207,7 @@ impl<Endpoint: Debug + Copy + Clone + Hash + Eq> ClusterRoom<Endpoint> {
ClusterEndpointControl::SubscribeChannel(key) => self.datachannel.input(&mut self.switcher).on_channel_subscribe(endpoint, &key),
ClusterEndpointControl::PublishChannel(key, peer, message) => {
let data_packet = DataChannelPacket { from: peer, data: message };
self.datachannel.input(&mut self.switcher).on_channel_data(endpoint, &key, data_packet);
self.datachannel.input(&mut self.switcher).on_channel_data(&key, data_packet);
}
ClusterEndpointControl::UnsubscribeChannel(key) => self.datachannel.input(&mut self.switcher).on_channel_unsubscribe(endpoint, &key),
}
Expand Down Expand Up @@ -260,6 +261,7 @@ impl<Endpoint: Debug + Copy + Clone + Hash + Eq> Drop for ClusterRoom<Endpoint>
assert!(self.audio_mixer.is_empty(), "Audio mixer not empty");
assert!(self.media_track.is_empty(), "Media track not empty");
assert!(self.metadata.is_empty(), "Metadata not empty");
assert!(self.datachannel.is_empty(), "Data channel not empty");
}
}

Expand Down
10 changes: 8 additions & 2 deletions packages/media_core/src/cluster/room/datachannel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ impl<Endpoint: Hash + Eq + Copy + Debug> RoomChannel<Endpoint> {
}
}

pub fn on_channel_data(&mut self, endpoint: Endpoint, key: &str, data: DataChannelPacket) {
self.publisher.input(&mut self.switcher).on_channel_data(endpoint, key, data);
pub fn on_channel_data(&mut self, key: &str, data: DataChannelPacket) {
self.publisher.input(&mut self.switcher).on_channel_data(key, data);
}

pub fn on_channel_subscribe(&mut self, endpoint: Endpoint, key: &str) {
Expand All @@ -68,6 +68,10 @@ impl<Endpoint: Hash + Eq + Copy + Debug> RoomChannel<Endpoint> {
pub fn on_channel_unsubscribe(&mut self, endpoint: Endpoint, key: &str) {
self.subscriber.input(&mut self.switcher).on_channel_unsubscribe(endpoint, key);
}

pub fn on_leave(&mut self, endpoint: Endpoint) {
self.subscriber.input(&mut self.switcher).on_leave(endpoint);
}
}

impl<Endpoint: Debug + Hash + Eq + Copy> TaskSwitcherChild<Output<Endpoint>> for RoomChannel<Endpoint> {
Expand All @@ -78,6 +82,7 @@ impl<Endpoint: Debug + Hash + Eq + Copy> TaskSwitcherChild<Output<Endpoint>> for
match self.switcher.current()?.try_into().ok()? {
TaskType::Publisher => {
if let Some(out) = self.publisher.pop_output((), &mut self.switcher) {
log::info!("[ClusterRoomDataChannel] poped Output publisher {:?}", out);

Check warning on line 85 in packages/media_core/src/cluster/room/datachannel.rs

View workflow job for this annotation

GitHub Actions / typos

"poped" should be "popped" or "pooped".
if let Output::OnResourceEmpty = out {
if self.is_empty() {
return Some(Output::OnResourceEmpty);
Expand All @@ -89,6 +94,7 @@ impl<Endpoint: Debug + Hash + Eq + Copy> TaskSwitcherChild<Output<Endpoint>> for
}
TaskType::Subscriber => {
if let Some(out) = self.subscriber.pop_output((), &mut self.switcher) {
log::info!("[ClusterRoomDataChannel] poped Output Subscriber {:?}", out);

Check warning on line 97 in packages/media_core/src/cluster/room/datachannel.rs

View workflow job for this annotation

GitHub Actions / typos

"poped" should be "popped" or "pooped".
if let Output::OnResourceEmpty = out {
if self.is_empty() {
return Some(Output::OnResourceEmpty);
Expand Down
20 changes: 4 additions & 16 deletions packages/media_core/src/cluster/room/datachannel/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,12 @@ impl<Endpoint: Hash + Eq + Copy + Debug> DataChannelPublisher<Endpoint> {
self.queue.is_empty()
}

pub fn on_channel_publish(&mut self, endpoint: Endpoint, key: &str) {
log::trace!("[ClusterRoom {}/Publishers] peer {:?} publish datachannel", self.room, endpoint);
let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, endpoint, key.to_string());
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart)))
}

pub fn on_channel_data(&mut self, endpoint: Endpoint, key: &str, data: DataChannelPacket) {
log::trace!("[ClusterRoom {}/Publishers] peer {:?} publish datachannel", self.room, endpoint);
pub fn on_channel_data(&mut self, key: &str, data: DataChannelPacket) {
log::trace!("[ClusterRoomDataChannel {}/Publishers] publish virtual datachannel", self.room);
let data = data.serialize();
let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, endpoint, key.to_string());
let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, key.to_string());
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubData(data))))
}

pub fn on_channel_unpublish(&mut self, endpoint: Endpoint, key: &str) {
let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, endpoint, key.to_string());
log::info!("[ClusterRoom {}/Publishers] peer {:?} stopped datachannel", self.room, endpoint);
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop)));
}
}

impl<Endpoint: Debug + Hash + Eq + Copy> TaskSwitcherChild<Output<Endpoint>> for DataChannelPublisher<Endpoint> {
Expand All @@ -51,7 +39,7 @@ impl<Endpoint: Debug + Hash + Eq + Copy> TaskSwitcherChild<Output<Endpoint>> for

impl<Endpoint> Drop for DataChannelPublisher<Endpoint> {
fn drop(&mut self) {
log::info!("[ClusterRoom {}/Publishers] Drop", self.room);
log::info!("[ClusterRoomDataChannel {}/Publishers] Drop", self.room);
assert_eq!(self.queue.len(), 0, "Queue not empty on drop");
}
}
Loading

0 comments on commit 528567d

Please sign in to comment.