From 528567d1c8ece0c897314f6c5d22b117dde08ba2 Mon Sep 17 00:00:00 2001 From: Luong Minh Date: Tue, 2 Jul 2024 11:37:19 +0700 Subject: [PATCH] feat: virtual datachannel & test --- packages/media_core/src/cluster.rs | 4 +- .../media_core/src/cluster/id_generator.rs | 3 +- packages/media_core/src/cluster/room.rs | 6 +- .../src/cluster/room/datachannel.rs | 10 +- .../src/cluster/room/datachannel/publisher.rs | 20 +- .../cluster/room/datachannel/subscriber.rs | 248 ++++++++++++++++-- packages/media_core/src/endpoint.rs | 4 +- packages/protocol/proto/sdk/session.proto | 4 +- packages/protocol/src/datachannel.rs | 2 +- packages/protocol/src/protobuf/session.rs | 8 +- .../transport_webrtc/src/transport/webrtc.rs | 4 +- 11 files changed, 257 insertions(+), 56 deletions(-) diff --git a/packages/media_core/src/cluster.rs b/packages/media_core/src/cluster.rs index 303a4fbb..518dcede 100644 --- a/packages/media_core/src/cluster.rs +++ b/packages/media_core/src/cluster.rs @@ -90,7 +90,7 @@ pub enum ClusterEndpointControl { LocalTrack(LocalTrackId, ClusterLocalTrackControl), SubscribeChannel(String), UnsubscribeChannel(String), - PublishChannel(String, PeerId, String), + PublishChannel(String, PeerId, Vec), } #[derive(Clone, Debug, PartialEq, Eq)] @@ -102,7 +102,7 @@ pub enum ClusterEndpointEvent { AudioMixer(ClusterAudioMixerEvent), RemoteTrack(RemoteTrackId, ClusterRemoteTrackEvent), LocalTrack(LocalTrackId, ClusterLocalTrackEvent), - ChannelMessage(String, PeerId, String), + ChannelMessage(String, PeerId, Vec), } pub enum Input { diff --git a/packages/media_core/src/cluster/id_generator.rs b/packages/media_core/src/cluster/id_generator.rs index ffdef6b4..22c3d441 100644 --- a/packages/media_core/src/cluster/id_generator.rs +++ b/packages/media_core/src/cluster/id_generator.rs @@ -41,11 +41,10 @@ pub fn gen_track_channel_id>(room: ClusterRoomHash, peer: &PeerId, h.finish().into() } -pub fn gen_datachannel_id, Endpoint: Hash>(room: ClusterRoomHash, endpoint: Endpoint, key: String) -> T { +pub fn gen_datachannel_id>(room: ClusterRoomHash, key: String) -> T { let mut h = std::hash::DefaultHasher::new(); room.as_ref().hash(&mut h); key.hash(&mut h); - endpoint.hash(&mut h); "datachannel".hash(&mut h); h.finish().into() } diff --git a/packages/media_core/src/cluster/room.rs b/packages/media_core/src/cluster/room.rs index 4454d7d8..f7395759 100644 --- a/packages/media_core/src/cluster/room.rs +++ b/packages/media_core/src/cluster/room.rs @@ -160,7 +160,7 @@ impl ClusterRoom { } pub fn is_empty(&self) -> bool { - self.metadata.is_empty() && self.media_track.is_empty() && self.audio_mixer.is_empty() + self.metadata.is_empty() && self.media_track.is_empty() && self.audio_mixer.is_empty() && self.datachannel.is_empty() } fn on_sdn_event(&mut self, now: Instant, userdata: RoomUserData, event: FeaturesEvent) { @@ -191,6 +191,7 @@ impl ClusterRoom { ClusterEndpointControl::Leave => { self.audio_mixer.input(&mut self.switcher).on_leave(now, endpoint); self.metadata.input(&mut self.switcher).on_leave(endpoint); + self.datachannel.input(&mut self.switcher).on_leave(endpoint); } ClusterEndpointControl::SubscribePeer(target) => { self.metadata.input(&mut self.switcher).on_subscribe_peer(endpoint, target); @@ -206,7 +207,7 @@ impl ClusterRoom { ClusterEndpointControl::SubscribeChannel(key) => self.datachannel.input(&mut self.switcher).on_channel_subscribe(endpoint, &key), ClusterEndpointControl::PublishChannel(key, peer, message) => { let data_packet = DataChannelPacket { from: peer, data: message }; - self.datachannel.input(&mut self.switcher).on_channel_data(endpoint, &key, data_packet); + self.datachannel.input(&mut self.switcher).on_channel_data(&key, data_packet); } ClusterEndpointControl::UnsubscribeChannel(key) => self.datachannel.input(&mut self.switcher).on_channel_unsubscribe(endpoint, &key), } @@ -260,6 +261,7 @@ impl Drop for ClusterRoom assert!(self.audio_mixer.is_empty(), "Audio mixer not empty"); assert!(self.media_track.is_empty(), "Media track not empty"); assert!(self.metadata.is_empty(), "Metadata not empty"); + assert!(self.datachannel.is_empty(), "Data channel not empty"); } } diff --git a/packages/media_core/src/cluster/room/datachannel.rs b/packages/media_core/src/cluster/room/datachannel.rs index e50f7a65..eec334fd 100644 --- a/packages/media_core/src/cluster/room/datachannel.rs +++ b/packages/media_core/src/cluster/room/datachannel.rs @@ -57,8 +57,8 @@ impl RoomChannel { } } - pub fn on_channel_data(&mut self, endpoint: Endpoint, key: &str, data: DataChannelPacket) { - self.publisher.input(&mut self.switcher).on_channel_data(endpoint, key, data); + pub fn on_channel_data(&mut self, key: &str, data: DataChannelPacket) { + self.publisher.input(&mut self.switcher).on_channel_data(key, data); } pub fn on_channel_subscribe(&mut self, endpoint: Endpoint, key: &str) { @@ -68,6 +68,10 @@ impl RoomChannel { pub fn on_channel_unsubscribe(&mut self, endpoint: Endpoint, key: &str) { self.subscriber.input(&mut self.switcher).on_channel_unsubscribe(endpoint, key); } + + pub fn on_leave(&mut self, endpoint: Endpoint) { + self.subscriber.input(&mut self.switcher).on_leave(endpoint); + } } impl TaskSwitcherChild> for RoomChannel { @@ -78,6 +82,7 @@ impl TaskSwitcherChild> for match self.switcher.current()?.try_into().ok()? { TaskType::Publisher => { if let Some(out) = self.publisher.pop_output((), &mut self.switcher) { + log::info!("[ClusterRoomDataChannel] poped Output publisher {:?}", out); if let Output::OnResourceEmpty = out { if self.is_empty() { return Some(Output::OnResourceEmpty); @@ -89,6 +94,7 @@ impl TaskSwitcherChild> for } TaskType::Subscriber => { if let Some(out) = self.subscriber.pop_output((), &mut self.switcher) { + log::info!("[ClusterRoomDataChannel] poped Output Subscriber {:?}", out); if let Output::OnResourceEmpty = out { if self.is_empty() { return Some(Output::OnResourceEmpty); diff --git a/packages/media_core/src/cluster/room/datachannel/publisher.rs b/packages/media_core/src/cluster/room/datachannel/publisher.rs index b7a12463..e77e3be7 100644 --- a/packages/media_core/src/cluster/room/datachannel/publisher.rs +++ b/packages/media_core/src/cluster/room/datachannel/publisher.rs @@ -22,24 +22,12 @@ impl DataChannelPublisher { self.queue.is_empty() } - pub fn on_channel_publish(&mut self, endpoint: Endpoint, key: &str) { - log::trace!("[ClusterRoom {}/Publishers] peer {:?} publish datachannel", self.room, endpoint); - let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, endpoint, key.to_string()); - self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart))) - } - - pub fn on_channel_data(&mut self, endpoint: Endpoint, key: &str, data: DataChannelPacket) { - log::trace!("[ClusterRoom {}/Publishers] peer {:?} publish datachannel", self.room, endpoint); + pub fn on_channel_data(&mut self, key: &str, data: DataChannelPacket) { + log::trace!("[ClusterRoomDataChannel {}/Publishers] publish virtual datachannel", self.room); let data = data.serialize(); - let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, endpoint, key.to_string()); + let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, key.to_string()); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubData(data)))) } - - pub fn on_channel_unpublish(&mut self, endpoint: Endpoint, key: &str) { - let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, endpoint, key.to_string()); - log::info!("[ClusterRoom {}/Publishers] peer {:?} stopped datachannel", self.room, endpoint); - self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop))); - } } impl TaskSwitcherChild> for DataChannelPublisher { @@ -51,7 +39,7 @@ impl TaskSwitcherChild> for impl Drop for DataChannelPublisher { fn drop(&mut self) { - log::info!("[ClusterRoom {}/Publishers] Drop", self.room); + log::info!("[ClusterRoomDataChannel {}/Publishers] Drop", self.room); assert_eq!(self.queue.len(), 0, "Queue not empty on drop"); } } diff --git a/packages/media_core/src/cluster/room/datachannel/subscriber.rs b/packages/media_core/src/cluster/room/datachannel/subscriber.rs index b3402f1d..86dcc6ed 100644 --- a/packages/media_core/src/cluster/room/datachannel/subscriber.rs +++ b/packages/media_core/src/cluster/room/datachannel/subscriber.rs @@ -16,14 +16,14 @@ use super::Output; #[derive(Derivative)] #[derivative(Default(bound = ""))] struct ChannelContainer { - subscribers: Vec, + subscribers: HashMap, key: String, } pub struct DataChannelSubscriber { room: ClusterRoomHash, channels: HashMap>, - // subscribers: HashMap, + subscribers: HashMap>, queue: VecDeque>, } @@ -33,6 +33,7 @@ impl DataChannelSubscriber { room, queue: VecDeque::new(), channels: HashMap::new(), + subscribers: HashMap::new(), } } @@ -41,48 +42,94 @@ impl DataChannelSubscriber { } pub fn on_channel_subscribe(&mut self, endpoint: Endpoint, key: &str) { - let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, endpoint, key.to_string()); - log::info!("[ClusterRoom {}/Subscribers] peer {:?} subscribe channel: {channel_id}", self.room, endpoint); - let channel_container = self.channels.entry(channel_id).or_insert(ChannelContainer { - subscribers: vec![], - key: key.to_string(), + let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, key.to_string()); + log::info!("[ClusterRoomDataChannel {}/Subscribers] peer {:?} subscribe channel: {channel_id}", self.room, endpoint); + let channel_container = self.channels.entry(channel_id).or_insert_with(|| { + let channel = ChannelContainer { + subscribers: HashMap::new(), + key: key.to_string(), + }; + self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart))); + channel }); - channel_container.subscribers.push(endpoint); + + if let Some(subscriber) = self.subscribers.get_mut(&endpoint) { + if !channel_container.subscribers.contains_key(&endpoint) { + subscriber.push(channel_id); + } + } else { + self.subscribers.insert(endpoint, vec![channel_id]); + } + channel_container.subscribers.insert(endpoint, ()); + if channel_container.subscribers.len() == 1 { - log::info!("[ClusterRoom {}/Subscribers] first subscriber => Sub channel {channel_id}", self.room); + log::info!("[ClusterRoomDataChannel {}/Subscribers] first subscriber => Sub channel {channel_id}", self.room); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto))); } } pub fn on_channel_unsubscribe(&mut self, endpoint: Endpoint, key: &str) { - let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, endpoint, key.to_string()); - log::info!("[ClusterRoom {}/Subscribers] peer {:?} unsubscribe channel: {channel_id}", self.room, endpoint); - if let Some(channel_container) = self.channels.get_mut(&channel_id) { - if let Some(index) = channel_container.subscribers.iter().position(|x| *x == endpoint) { - channel_container.subscribers.swap_remove(index); + let channel_id: ChannelId = id_generator::gen_datachannel_id(self.room, key.to_string()); + log::info!("[ClusterRoomDataChannel {}/Subscribers] peer {:?} unsubscribe channel: {channel_id}", self.room, endpoint); + let channel_container = return_if_none!(self.channels.get_mut(&channel_id)); + if channel_container.subscribers.contains_key(&endpoint) { + channel_container.subscribers.remove(&endpoint); + if let Some(endpoint_subscriptions) = self.subscribers.get_mut(&endpoint) { + if let Some(index) = endpoint_subscriptions.iter().position(|x| *x == channel_id) { + endpoint_subscriptions.swap_remove(index); + } + + if endpoint_subscriptions.is_empty() { + self.subscribers.remove(&endpoint); + } + } + if channel_container.subscribers.is_empty() { + log::info!("[ClusterRoomDataChannel {}/Subscribers] last subscriber => Unsub channel {channel_id}", self.room); + self.channels.remove(&channel_id); + self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto))); + self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop))); + if self.channels.is_empty() { + log::info!("[ClusterRoomDataChannel {}/Subscribers] last channel => Stop channel {channel_id}", self.room); + self.queue.push_back(Output::OnResourceEmpty); + } + } + } else { + log::warn!("[ClusterRoomDataChannel {}/Subscribers] peer {:?} not subscribe in channel {channel_id}", self.room, endpoint); + } + } + + pub fn on_leave(&mut self, endpoint: Endpoint) { + log::info!("[ClusterRoomDataChannel {}/Subscribers] peer {:?} leave", self.room, endpoint); + let subscriber = return_if_none!(self.subscribers.remove(&endpoint)); + for channel_id in subscriber { + if let Some(channel_container) = self.channels.get_mut(&channel_id) { + channel_container.subscribers.remove(&endpoint); if channel_container.subscribers.is_empty() { - log::info!("[ClusterRoom {}/Subscribers] last subscriber => Unsub channel {channel_id}", self.room); + log::info!("[ClusterRoomDataChannel {}/Subscribers] last subscriber => Unsub channel {channel_id}", self.room); self.channels.remove(&channel_id); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto))); + self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop))); + } + if self.channels.is_empty() { + log::info!("[ClusterRoomDataChannel {}/Subscribers] last channel => Stop channel {channel_id}", self.room); + self.queue.push_back(Output::OnResourceEmpty); } - } else { - log::warn!("[ClusterRoom {}/Subscribers] peer {:?} not subscribe in channel {channel_id}", self.room, endpoint); } } } pub fn on_channel_data(&mut self, channel_id: ChannelId, data: Vec) { - log::info!("[ClusterRoom {}/Subscribers] Receive data from channel {channel_id}", self.room); + log::info!("[ClusterRoomDataChannel {}/Subscribers] Receive data from channel {channel_id}", self.room); let pkt = return_if_none!(DataChannelPacket::deserialize(&data)); if let Some(channel_container) = self.channels.get(&channel_id) { for endpoint in &channel_container.subscribers { self.queue.push_back(Output::Endpoint( - vec![*endpoint], + vec![*endpoint.0], ClusterEndpointEvent::ChannelMessage(channel_container.key.clone(), pkt.from.clone(), pkt.data.clone()), )); } } else { - log::warn!("[ClusterRoom {}/Subscribers] Receive data from unknown channel {channel_id}", self.room); + log::warn!("[ClusterRoomDataChannel {}/Subscribers] Receive data from unknown channel {channel_id}", self.room); } } } @@ -96,8 +143,167 @@ impl TaskSwitcherChild> for impl Drop for DataChannelSubscriber { fn drop(&mut self) { - log::info!("[ClusterRoom {}/Subscriber] Drop", self.room); + log::info!("[ClusterRoomDataChannel {}/Subscriber] Drop", self.room); assert_eq!(self.queue.len(), 0, "Queue not empty on drop"); assert_eq!(self.channels.len(), 0, "Channels not empty on drop"); + assert_eq!(self.subscribers.len(), 0, "Subscribers not empty on drop"); + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use atm0s_sdn::features::pubsub::{self, ChannelControl}; + use media_server_protocol::{datachannel::DataChannelPacket, endpoint::PeerId}; + use sans_io_runtime::TaskSwitcherChild; + + use crate::cluster::{ + id_generator, + room::datachannel::{subscriber::DataChannelSubscriber, Output}, + ClusterEndpointEvent, + }; + + #[test] + fn sub_unsub() { + let now = (); + let room = 1.into(); + let mut subscriber = DataChannelSubscriber::new(room); + let endpoint1 = 1; + let endpoint2 = 2; + let endpoint3 = 3; + let key = "test"; + let key2 = "test2"; + + // 1 -> test + // 2 -> test + // 3 -> test2 + + subscriber.on_channel_subscribe(endpoint1, key); + let channel_id = id_generator::gen_datachannel_id(room, key.to_string()); + let channel_id2 = id_generator::gen_datachannel_id(room, key2.to_string()); + + assert!(!subscriber.is_empty()); + // First subscriber will start publish and subscribe on pubsub channel + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart)))); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto)))); + assert_eq!(subscriber.pop_output(now), None); + + // Second subscriber will do nothing but register in the subscriber list + subscriber.on_channel_subscribe(endpoint2, key); + assert_eq!(subscriber.pop_output(now), None); + + // First subscriber of a new channel should start publish and subscribe too + subscriber.on_channel_subscribe(endpoint3, key2); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::PubStart)))); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::SubAuto)))); + + subscriber.on_channel_unsubscribe(endpoint1, key); + subscriber.on_channel_unsubscribe(endpoint2, key); + + // Last subscriber that unsubscribes will stop the channel + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto)))); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop)))); + + // Last channel that unsubscribes will stop publish and return empty resource + subscriber.on_channel_unsubscribe(endpoint3, key2); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::UnsubAuto)))); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id2, ChannelControl::PubStop)))); + assert_eq!(subscriber.pop_output(now), Some(Output::OnResourceEmpty)); + + assert!(subscriber.is_empty()); + } + + #[test] + fn receive_data() { + let now = (); + let room = 1.into(); + let mut subscriber = DataChannelSubscriber::new(room); + let endpoint1 = 1; + let endpoint2 = 2; + let endpoint3 = 3; + let key = "test"; + let key_fake = "different_channel"; + + let channel_id = id_generator::gen_datachannel_id(room, key.to_string()); + let channel_id_fake = id_generator::gen_datachannel_id(room, key_fake.to_string()); + + subscriber.on_channel_subscribe(endpoint1, key); + subscriber.on_channel_subscribe(endpoint2, key); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart)))); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto)))); + assert_eq!(subscriber.pop_output(now), None); + + subscriber.on_channel_subscribe(endpoint3, key_fake); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::PubStart)))); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::SubAuto)))); + assert_eq!(subscriber.pop_output(now), None); + + let peer_id = PeerId::from("testid"); + let pkt = DataChannelPacket { + from: peer_id.clone(), + data: vec![1, 2, 3], + }; + subscriber.on_channel_data(channel_id, pkt.clone().serialize()); + let mut receivers = HashMap::new(); + receivers.insert(endpoint1, false); + receivers.insert(endpoint2, false); + receivers.insert(endpoint3, false); + + while let Some(out) = subscriber.pop_output(now) { + match out { + Output::Endpoint(endpoint, ClusterEndpointEvent::ChannelMessage(key, peer, data)) => { + assert_eq!(key, key); + assert_eq!(peer, peer_id); + assert_eq!(data, pkt.data); + assert!(receivers.contains_key(&endpoint[0])); + *receivers.get_mut(&endpoint[0]).unwrap() = true; + } + _ => panic!("Unexpected output: {:?}", out), + } + } + + // Every endpoint 1 and 2 should received the message + // Endpoint 3 should not receive anything + assert!(receivers[&endpoint1]); + assert!(receivers[&endpoint2]); + assert!(!receivers[&endpoint3]); + assert_eq!(subscriber.pop_output(now), None); + + subscriber.on_channel_unsubscribe(endpoint1, key); + subscriber.on_channel_unsubscribe(endpoint2, key); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto)))); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop)))); + subscriber.on_channel_unsubscribe(endpoint3, key_fake); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::UnsubAuto)))); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id_fake, ChannelControl::PubStop)))); + + assert_eq!(subscriber.pop_output(now), Some(Output::OnResourceEmpty)); + } + + #[test] + fn leave_room() { + let now = (); + let room = 1.into(); + let mut subscriber = DataChannelSubscriber::new(room); + let endpoint1 = 1; + let endpoint2 = 2; + let key = "test"; + + let channel_id = id_generator::gen_datachannel_id(room, key.to_string()); + + subscriber.on_channel_subscribe(endpoint1, key); + subscriber.on_channel_subscribe(endpoint2, key); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart)))); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto)))); + assert_eq!(subscriber.pop_output(now), None); + + subscriber.on_leave(endpoint1); + assert_eq!(subscriber.pop_output(now), None); + subscriber.on_leave(endpoint2); + + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto)))); + assert_eq!(subscriber.pop_output(now), Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop)))); + assert_eq!(subscriber.pop_output(now), Some(Output::OnResourceEmpty)); } } diff --git a/packages/media_core/src/endpoint.rs b/packages/media_core/src/endpoint.rs index c3ca9d12..e9278067 100644 --- a/packages/media_core/src/endpoint.rs +++ b/packages/media_core/src/endpoint.rs @@ -114,7 +114,7 @@ pub enum EndpointReq { LocalTrack(LocalTrackId, EndpointLocalTrackReq), SubscribeChannel(String), UnsubscribeChannel(String), - PublishChannel(String, String), + PublishChannel(String, Vec), } /// This is response, which is used to send response back to Endpoint SDK @@ -172,7 +172,7 @@ pub enum EndpointEvent { GoAway(u8, Option), /// DataChannel events - ChannelMessage(String, PeerId, String), + ChannelMessage(String, PeerId, Vec), } pub enum EndpointInput { diff --git a/packages/protocol/proto/sdk/session.proto b/packages/protocol/proto/sdk/session.proto index ac26dca6..16cc20dc 100644 --- a/packages/protocol/proto/sdk/session.proto +++ b/packages/protocol/proto/sdk/session.proto @@ -61,7 +61,7 @@ message Request { message PublishChannel { string key = 1; - string message = 2; + bytes message = 2; } oneof request { @@ -298,7 +298,7 @@ message ServerEvent { message ChannelMessage { string key = 1; string peer = 2; - string message = 3; + bytes message = 3; } oneof event { diff --git a/packages/protocol/src/datachannel.rs b/packages/protocol/src/datachannel.rs index a0e403a7..9499998b 100644 --- a/packages/protocol/src/datachannel.rs +++ b/packages/protocol/src/datachannel.rs @@ -8,7 +8,7 @@ use crate::endpoint::PeerId; pub struct DataChannelPacket { pub from: PeerId, #[derivative(Debug = "ignore")] - pub data: String, + pub data: Vec, } impl DataChannelPacket { pub fn serialize(&self) -> Vec { diff --git a/packages/protocol/src/protobuf/session.rs b/packages/protocol/src/protobuf/session.rs index 239ddc84..8d7309b6 100644 --- a/packages/protocol/src/protobuf/session.rs +++ b/packages/protocol/src/protobuf/session.rs @@ -105,8 +105,8 @@ pub mod request { pub struct PublishChannel { #[prost(string, tag = "1")] pub key: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub message: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "2")] + pub message: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] @@ -504,8 +504,8 @@ pub mod server_event { pub key: ::prost::alloc::string::String, #[prost(string, tag = "2")] pub peer: ::prost::alloc::string::String, - #[prost(string, tag = "3")] - pub message: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "3")] + pub message: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] diff --git a/packages/transport_webrtc/src/transport/webrtc.rs b/packages/transport_webrtc/src/transport/webrtc.rs index c3fed48d..028f90ae 100644 --- a/packages/transport_webrtc/src/transport/webrtc.rs +++ b/packages/transport_webrtc/src/transport/webrtc.rs @@ -349,7 +349,7 @@ impl TransportWebrtcInternal for TransportWebrtcSdk { EndpointEvent::ChannelMessage(key, from, message) => { log::info!("[TransportWebrtcSdk] datachannel message {key}"); self.send_event(ProtoServerEvent::Room(ProtoRoomEvent { - event: Some(ProtoRoomEvent2::ChannelMessage(ChannelMessage { key, peer: from.0, message: message })), + event: Some(ProtoRoomEvent2::ChannelMessage(ChannelMessage { key, peer: from.0, message })), })) } EndpointEvent::GoAway(_, _) => {} @@ -833,7 +833,7 @@ impl TransportWebrtcSdk { protobuf::session::request::room::Request::PublishChannel(req) => { self.queue.push_back(InternalOutput::TransportOutput(TransportOutput::RpcReq( req_id.into(), - EndpointReq::PublishChannel(req.key, req.message.into()), + EndpointReq::PublishChannel(req.key, req.message), ))); } _ => {