diff --git a/bin/src/http/api_media.rs b/bin/src/http/api_media.rs index 66a70c41..942ff1a4 100644 --- a/bin/src/http/api_media.rs +++ b/bin/src/http/api_media.rs @@ -71,6 +71,7 @@ impl MediaApis { peer: token.peer.into(), user_agent, record: token.record, + userdata: token.userdata, }))); ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; @@ -167,6 +168,7 @@ impl MediaApis { room: token.room.into(), peer: token.peer.unwrap_or_else(|| format!("whep-{}", (random::()))).into(), user_agent, + userdata: token.userdata, }))); ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; @@ -265,7 +267,7 @@ impl MediaApis { return Err(poem::Error::from_string("Wrong peer".to_string(), StatusCode::FORBIDDEN)); } } - let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(session_id, ip_addr, user_agent, connect.0, token.record))); + let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(session_id, ip_addr, user_agent, connect.0, token.userdata, token.record))); ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; match res { @@ -334,7 +336,7 @@ impl MediaApis { } } log::info!("[MediaAPIs] restart_ice webrtc, ip {}, user_agent {}, conn {}, request {:?}", ip_addr, user_agent, conn_id.0, connect); - let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::RestartIce(conn_id2, ip_addr, user_agent, connect.0, token.record))); + let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::RestartIce(conn_id2, ip_addr, user_agent, connect.0, token.userdata, token.record))); ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; match res { diff --git a/bin/src/http/api_token.rs b/bin/src/http/api_token.rs index 3a8e8155..a1ea83e6 100644 --- a/bin/src/http/api_token.rs +++ b/bin/src/http/api_token.rs @@ -25,6 +25,7 @@ struct WhipTokenReq { peer: String, ttl: u64, record: Option, + userdata: Option, } #[derive(poem_openapi::Object)] @@ -37,6 +38,7 @@ struct WhepTokenReq { room: String, peer: Option, ttl: u64, + userdata: Option, } #[derive(poem_openapi::Object)] @@ -50,6 +52,7 @@ struct WebrtcTokenReq { peer: Option, ttl: u64, record: Option, + userdata: Option, } #[derive(poem_openapi::Object)] @@ -81,6 +84,7 @@ impl TokenApis { room: body.room, peer: body.peer, record: body.record.unwrap_or(false), + userdata: body.userdata, }, body.ttl, ), @@ -104,7 +108,15 @@ impl TokenApis { Json(Response { status: true, data: Some(WhepTokenRes { - token: ctx.secure.encode_obj("whep", WhepToken { room: body.room, peer: body.peer }, body.ttl), + token: ctx.secure.encode_obj( + "whep", + WhepToken { + room: body.room, + peer: body.peer, + userdata: body.userdata, + }, + body.ttl, + ), }), ..Default::default() }) @@ -130,6 +142,7 @@ impl TokenApis { room: body.room, peer: body.peer, record: body.record.unwrap_or(false), + userdata: body.userdata, }, body.ttl, ), diff --git a/bin/src/server/gateway/local_rpc_handler.rs b/bin/src/server/gateway/local_rpc_handler.rs index 090808da..71eb4549 100644 --- a/bin/src/server/gateway/local_rpc_handler.rs +++ b/bin/src/server/gateway/local_rpc_handler.rs @@ -123,10 +123,12 @@ impl MediaLocalRpcHandler { whep::RpcReq::Delete(param) => RpcRes::Whep(whep::RpcRes::Delete(self.whep_delete(conn_part, param).await)), }, RpcReq::Webrtc(param) => match param { - webrtc::RpcReq::Connect(session_id, ip, user_agent, param, record) => RpcRes::Webrtc(webrtc::RpcRes::Connect(self.webrtc_connect(session_id, ip, user_agent, param, record).await)), + webrtc::RpcReq::Connect(session_id, ip, user_agent, param, userdata, record) => { + RpcRes::Webrtc(webrtc::RpcRes::Connect(self.webrtc_connect(session_id, ip, user_agent, param, userdata, record).await)) + } webrtc::RpcReq::RemoteIce(conn, param) => RpcRes::Webrtc(webrtc::RpcRes::RemoteIce(self.webrtc_remote_ice(conn_part, conn, param).await)), - webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, record) => { - RpcRes::Webrtc(webrtc::RpcRes::RestartIce(self.webrtc_restart_ice(conn_part, conn, ip, user_agent, req, record).await)) + webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, userdata, record) => { + RpcRes::Webrtc(webrtc::RpcRes::RestartIce(self.webrtc_restart_ice(conn_part, conn, ip, user_agent, req, userdata, record).await)) } webrtc::RpcReq::Delete(_) => { //TODO implement delete webrtc conn @@ -274,7 +276,7 @@ impl MediaLocalRpcHandler { Webrtc part */ - async fn webrtc_connect(&self, session_id: u64, ip: IpAddr, user_agent: String, req: ConnectRequest, record: bool) -> RpcResult<(ClusterConnId, ConnectResponse)> { + async fn webrtc_connect(&self, session_id: u64, ip: IpAddr, user_agent: String, req: ConnectRequest, userdata: Option, record: bool) -> RpcResult<(ClusterConnId, ConnectResponse)> { let started_at = now_ms(); self.feedback_route_begin(session_id, ip).await; @@ -287,6 +289,7 @@ impl MediaLocalRpcHandler { ip: ip.to_string(), req: Some(req), record, + userdata, }; let res = self.client.webrtc_connect(sock_addr, rpc_req).await; log::info!("[Gateway] response from node {node_id} => {:?}", res); @@ -332,6 +335,7 @@ impl MediaLocalRpcHandler { } } + #[allow(clippy::too_many_arguments)] async fn webrtc_restart_ice( &self, conn_part: Option<(NodeId, u64)>, @@ -339,6 +343,7 @@ impl MediaLocalRpcHandler { ip: IpAddr, user_agent: String, req: ConnectRequest, + userdata: Option, record: bool, ) -> RpcResult<(ClusterConnId, ConnectResponse)> { //TODO how to handle media-node down? @@ -349,6 +354,7 @@ impl MediaLocalRpcHandler { user_agent, req: Some(req), record, + userdata, }; log::info!("[Gateway] selected node {node}"); let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); diff --git a/bin/src/server/media/rpc_handler.rs b/bin/src/server/media/rpc_handler.rs index f4b110ac..735d2109 100644 --- a/bin/src/server/media/rpc_handler.rs +++ b/bin/src/server/media/rpc_handler.rs @@ -122,7 +122,14 @@ impl MediaEdgeServiceHandler for MediaRpcHandlerImpl { /* Start of sdk */ async fn webrtc_connect(&self, ctx: &Ctx, req: WebrtcConnectRequest) -> Option { log::info!("On webrtc_connect from gateway"); - let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(req.session_id, req.ip.parse().ok()?, req.user_agent, req.req?, req.record))); + let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect( + req.session_id, + req.ip.parse().ok()?, + req.user_agent, + req.req?, + req.userdata, + req.record, + ))); ctx.req_tx.send(req).await.ok()?; let res = rx.await.ok()?; match res { @@ -152,6 +159,7 @@ impl MediaEdgeServiceHandler for MediaRpcHandlerImpl { req.ip.parse().ok()?, req.user_agent, req.req?, + req.userdata, req.record, ))); ctx.req_tx.send(req).await.ok()?; diff --git a/bin/src/server/media/runtime_worker.rs b/bin/src/server/media/runtime_worker.rs index 525fca0e..428e71a4 100644 --- a/bin/src/server/media/runtime_worker.rs +++ b/bin/src/server/media/runtime_worker.rs @@ -13,6 +13,7 @@ use sans_io_runtime::{BusChannelControl, BusControl, BusEvent, WorkerInner, Work use crate::NodeConfig; +#[allow(clippy::large_enum_variant)] #[derive(Debug, Clone)] pub enum ExtIn { /// ext, send controller or worker, true is controller diff --git a/docs/user-guide/features/README.md b/docs/user-guide/features/README.md index 19230904..d5327669 100644 --- a/docs/user-guide/features/README.md +++ b/docs/user-guide/features/README.md @@ -9,3 +9,4 @@ In this document, we will explore the key features of atm0s-media-server. Curren | [Simulcast/SVC](./simulcast-svc.md) | Alpha | | [Recording](./recording.md) | TODO | | [Cluster](./cluster.md) | Alpha | +| [Userdata-metadata](./userdata-metadata.md) | Alpha | diff --git a/docs/user-guide/features/userdata-metadata.md b/docs/user-guide/features/userdata-metadata.md new file mode 100644 index 00000000..dd8cac8b --- /dev/null +++ b/docs/user-guide/features/userdata-metadata.md @@ -0,0 +1,6 @@ +# Userdata and metadata + +Userdata and metadata is used with same goal is providing addition data beside of only room and peer. We use 2 terms userdata and metadata with some difference: + +- Userdata: a string which is embedded to peer token, which can not be changed by client +- Metadata: a string which is embedded to peer or track by client. In there peer metadata is set at join-room step, track metadata is set at publish track step. diff --git a/packages/media_core/src/cluster.rs b/packages/media_core/src/cluster.rs index f38283dc..2013a72f 100644 --- a/packages/media_core/src/cluster.rs +++ b/packages/media_core/src/cluster.rs @@ -201,7 +201,7 @@ mod tests { let room_peers_map = id_generator::peers_map(userdata.0); let peer = PeerId("peer1".to_string()); let peer_key = id_generator::peers_key(&peer); - let peer_info = PeerInfo::new(peer.clone(), PeerMeta { metadata: None }); + let peer_info = PeerInfo::new(peer.clone(), PeerMeta { metadata: None, userdata: None }); let now = Instant::now(); // Not join room with scope (peer true, track false) should Set and Sub diff --git a/packages/media_core/src/cluster/room.rs b/packages/media_core/src/cluster/room.rs index 644c23ca..12cff3d7 100644 --- a/packages/media_core/src/cluster/room.rs +++ b/packages/media_core/src/cluster/room.rs @@ -270,7 +270,7 @@ mod tests { endpoint, ClusterEndpointControl::Join( peer.clone(), - PeerMeta { metadata: None }, + PeerMeta { metadata: None, userdata: None }, RoomInfoPublish { peer: false, tracks: false }, RoomInfoSubscribe { peers: true, tracks: true }, Some(AudioMixerConfig { diff --git a/packages/media_core/src/cluster/room/metadata.rs b/packages/media_core/src/cluster/room/metadata.rs index dd7a4895..7ed596f2 100644 --- a/packages/media_core/src/cluster/room/metadata.rs +++ b/packages/media_core/src/cluster/room/metadata.rs @@ -381,7 +381,7 @@ mod tests { let room: ClusterRoomHash = 1.into(); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None }; + let peer_meta = PeerMeta { metadata: None, userdata: None }; let endpoint = 1; room_meta.on_join( endpoint, @@ -408,7 +408,7 @@ mod tests { let tracks_map = id_generator::tracks_map(room); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None }; + let peer_meta = PeerMeta { metadata: None, userdata: None }; let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone()); let peer_key = id_generator::peers_key(&peer_id); let endpoint = 1; @@ -464,14 +464,14 @@ mod tests { let peer2: PeerId = "peer2".to_string().into(); let peer2_key = id_generator::peers_key(&peer2); - let peer2_info = PeerInfo::new(peer2, PeerMeta { metadata: None }); + let peer2_info = PeerInfo::new(peer2, PeerMeta { metadata: None, userdata: None }); room_meta.on_kv_event(peers_map, MapEvent::OnSet(peer2_key, 0, peer2_info.serialize())); assert_eq!(room_meta.pop_output(()), None); let endpoint = 1; let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None }; + let peer_meta = PeerMeta { metadata: None, userdata: None }; room_meta.on_join( endpoint, peer_id.clone(), @@ -500,7 +500,7 @@ mod tests { let tracks_map = id_generator::tracks_map(room); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None }; + let peer_meta = PeerMeta { metadata: None, userdata: None }; let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone()); let peer_key = id_generator::peers_key(&peer_id); let endpoint = 1; @@ -569,7 +569,7 @@ mod tests { let endpoint = 1; let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None }; + let peer_meta = PeerMeta { metadata: None, userdata: None }; room_meta.on_join( endpoint, peer_id.clone(), @@ -601,7 +601,7 @@ mod tests { let tracks_map = id_generator::tracks_map(room); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None }; + let peer_meta = PeerMeta { metadata: None, userdata: None }; let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone()); let peer_key = id_generator::peers_key(&peer_id); let endpoint = 1; @@ -643,7 +643,7 @@ mod tests { let room: ClusterRoomHash = 1.into(); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None }; + let peer_meta = PeerMeta { metadata: None, userdata: None }; let endpoint = 1; room_meta.on_join( endpoint, @@ -702,7 +702,7 @@ mod tests { let endpoint = 1; let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None }; + let peer_meta = PeerMeta { metadata: None, userdata: None }; room_meta.on_join( endpoint, peer_id.clone(), @@ -748,7 +748,7 @@ mod tests { let endpoint = 1; let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None }; + let peer_meta = PeerMeta { metadata: None, userdata: None }; room_meta.on_join( endpoint, peer_id.clone(), @@ -783,7 +783,7 @@ mod tests { let endpoint = 1; let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None }; + let peer_meta = PeerMeta { metadata: None, userdata: None }; room_meta.on_join( endpoint, peer_id.clone(), @@ -823,7 +823,7 @@ mod tests { let room: ClusterRoomHash = 1.into(); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None }; + let peer_meta = PeerMeta { metadata: None, userdata: None }; let endpoint = 1; room_meta.on_join( endpoint, diff --git a/packages/media_core/src/endpoint/internal.rs b/packages/media_core/src/endpoint/internal.rs index ffdcf639..91fc87d8 100644 --- a/packages/media_core/src/endpoint/internal.rs +++ b/packages/media_core/src/endpoint/internal.rs @@ -524,7 +524,7 @@ mod tests { let room: RoomId = "room".into(); let peer: PeerId = "peer".into(); - let meta = PeerMeta { metadata: None }; + let meta = PeerMeta { metadata: None, userdata: None }; let publish = RoomInfoPublish { peer: true, tracks: true }; let subscribe = RoomInfoSubscribe { peers: true, tracks: true }; internal.on_transport_rpc(now, 0.into(), EndpointReq::JoinRoom(room.clone(), peer.clone(), meta.clone(), publish.clone(), subscribe.clone(), None)); @@ -595,7 +595,7 @@ mod tests { let room1: RoomId = "room1".into(); let room1_hash = ClusterRoomHash::from(&room1); let peer: PeerId = "peer".into(); - let meta = PeerMeta { metadata: None }; + let meta = PeerMeta { metadata: None, userdata: None }; let publish = RoomInfoPublish { peer: true, tracks: true }; let subscribe = RoomInfoSubscribe { peers: true, tracks: true }; internal.on_transport_rpc( diff --git a/packages/media_runner/src/worker.rs b/packages/media_runner/src/worker.rs index 81e571a3..20bddfef 100644 --- a/packages/media_runner/src/worker.rs +++ b/packages/media_runner/src/worker.rs @@ -457,7 +457,7 @@ impl MediaServerWorker { match self .media_webrtc .input(&mut self.switcher) - .spawn(req.ip, req.session_id, transport_webrtc::VariantParams::Whip(req.room, req.peer, req.record), &req.sdp) + .spawn(req.ip, req.session_id, transport_webrtc::VariantParams::Whip(req.room, req.peer, req.userdata, req.record), &req.sdp) { Ok((_ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Ok(WhipConnectRes { conn_id, sdp }))))), Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Err(e))))), @@ -482,7 +482,7 @@ impl MediaServerWorker { match self .media_webrtc .input(&mut self.switcher) - .spawn(req.ip, req.session_id, transport_webrtc::VariantParams::Whep(req.room, peer_id.into()), &req.sdp) + .spawn(req.ip, req.session_id, transport_webrtc::VariantParams::Whep(req.room, peer_id.into(), req.userdata), &req.sdp) { Ok((_ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Ok(WhepConnectRes { conn_id, sdp }))))), Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Err(e))))), @@ -502,11 +502,11 @@ impl MediaServerWorker { } }, RpcReq::Webrtc(req) => match req { - webrtc::RpcReq::Connect(session_id, ip, user_agent, req, record) => { + webrtc::RpcReq::Connect(session_id, ip, user_agent, req, userdata, record) => { match self .media_webrtc .input(&mut self.switcher) - .spawn(ip, session_id, VariantParams::Webrtc(user_agent, req.clone(), record, self.secure.clone()), &req.sdp) + .spawn(ip, session_id, VariantParams::Webrtc(user_agent, req.clone(), userdata, record, self.secure.clone()), &req.sdp) { Ok((ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc( req_id, @@ -529,11 +529,14 @@ impl MediaServerWorker { GroupInput::Ext(conn.into(), transport_webrtc::ExtIn::RemoteIce(req_id, transport_webrtc::Variant::Webrtc, ice.candidates)), ); } - webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, record) => { + webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, userdata, record) => { log::info!("on rpc request {req_id}, webrtc::RpcReq::RestartIce"); self.media_webrtc.input(&mut self.switcher).on_event( now, - GroupInput::Ext(conn.into(), transport_webrtc::ExtIn::RestartIce(req_id, transport_webrtc::Variant::Webrtc, ip, user_agent, req, record)), + GroupInput::Ext( + conn.into(), + transport_webrtc::ExtIn::RestartIce(req_id, transport_webrtc::Variant::Webrtc, ip, user_agent, req, userdata, record), + ), ); } webrtc::RpcReq::Delete(_) => todo!(), diff --git a/packages/protocol/proto/cluster/gateway.proto b/packages/protocol/proto/cluster/gateway.proto index ee2437c0..22648645 100644 --- a/packages/protocol/proto/cluster/gateway.proto +++ b/packages/protocol/proto/cluster/gateway.proto @@ -68,6 +68,7 @@ message WhipConnectRequest { string peer = 5; uint64 session_id = 6; bool record = 7; + optional string userdata = 8; } message WhipConnectResponse { @@ -100,6 +101,7 @@ message WhepConnectRequest { string room = 4; string peer = 5; uint64 session_id = 6; + optional string userdata = 8; } message WhepConnectResponse { @@ -131,6 +133,7 @@ message WebrtcConnectRequest { gateway.ConnectRequest req = 3; uint64 session_id = 4; bool record = 5; + optional string userdata = 8; } message WebrtcConnectResponse { @@ -152,6 +155,7 @@ message WebrtcRestartIceRequest { string ip = 3; gateway.ConnectRequest req = 4; bool record = 5; + optional string userdata = 8; } message WebrtcRestartIceResponse { diff --git a/packages/protocol/proto/sdk/session.proto b/packages/protocol/proto/sdk/session.proto index 79ef2942..ce54c774 100644 --- a/packages/protocol/proto/sdk/session.proto +++ b/packages/protocol/proto/sdk/session.proto @@ -233,6 +233,7 @@ message ServerEvent { message PeerJoined { string peer = 1; optional string metadata = 2; + optional string userdata = 3; } message PeerUpdated { diff --git a/packages/protocol/src/endpoint.rs b/packages/protocol/src/endpoint.rs index 69767435..d8c6b748 100644 --- a/packages/protocol/src/endpoint.rs +++ b/packages/protocol/src/endpoint.rs @@ -222,6 +222,7 @@ pub struct PeerHashCode(pub u64); #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PeerMeta { pub metadata: Option, + pub userdata: Option, //userdata is fixed data extracted from token } /// diff --git a/packages/protocol/src/protobuf/cluster_gateway.rs b/packages/protocol/src/protobuf/cluster_gateway.rs index 544a7349..6ba3b7eb 100644 --- a/packages/protocol/src/protobuf/cluster_gateway.rs +++ b/packages/protocol/src/protobuf/cluster_gateway.rs @@ -92,6 +92,8 @@ pub struct WhipConnectRequest { pub session_id: u64, #[prost(bool, tag = "7")] pub record: bool, + #[prost(string, optional, tag = "8")] + pub userdata: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -143,6 +145,8 @@ pub struct WhepConnectRequest { pub peer: ::prost::alloc::string::String, #[prost(uint64, tag = "6")] pub session_id: u64, + #[prost(string, optional, tag = "8")] + pub userdata: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -192,6 +196,8 @@ pub struct WebrtcConnectRequest { pub session_id: u64, #[prost(bool, tag = "5")] pub record: bool, + #[prost(string, optional, tag = "8")] + pub userdata: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -226,6 +232,8 @@ pub struct WebrtcRestartIceRequest { pub req: ::core::option::Option, #[prost(bool, tag = "5")] pub record: bool, + #[prost(string, optional, tag = "8")] + pub userdata: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/packages/protocol/src/protobuf/session.rs b/packages/protocol/src/protobuf/session.rs index 67de64ad..829285f7 100644 --- a/packages/protocol/src/protobuf/session.rs +++ b/packages/protocol/src/protobuf/session.rs @@ -407,6 +407,8 @@ pub mod server_event { pub peer: ::prost::alloc::string::String, #[prost(string, optional, tag = "2")] pub metadata: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, optional, tag = "3")] + pub userdata: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/packages/protocol/src/tokens.rs b/packages/protocol/src/tokens.rs index afd5a6e2..c658a5ce 100644 --- a/packages/protocol/src/tokens.rs +++ b/packages/protocol/src/tokens.rs @@ -5,12 +5,14 @@ pub struct WhipToken { pub room: String, pub peer: String, pub record: bool, + pub userdata: Option, } #[derive(Serialize, Deserialize, Debug)] pub struct WhepToken { pub room: String, pub peer: Option, + pub userdata: Option, } #[derive(Serialize, Deserialize, Debug)] @@ -18,4 +20,5 @@ pub struct WebrtcToken { pub room: Option, pub peer: Option, pub record: bool, + pub userdata: Option, } diff --git a/packages/protocol/src/transport/webrtc.rs b/packages/protocol/src/transport/webrtc.rs index d7f8b75a..f21c4bdb 100644 --- a/packages/protocol/src/transport/webrtc.rs +++ b/packages/protocol/src/transport/webrtc.rs @@ -5,25 +5,25 @@ use crate::protobuf::gateway::{ConnectRequest, ConnectResponse, RemoteIceRequest #[derive(Debug, Clone)] pub enum RpcReq { - /// Ip, Agent, Req, Record - Connect(u64, IpAddr, String, ConnectRequest, bool), + /// Ip, Agent, Req, Userdata, Record + Connect(u64, IpAddr, String, ConnectRequest, Option, bool), RemoteIce(Conn, RemoteIceRequest), - /// ConnId, Ip, Agent, Req, Record - RestartIce(Conn, IpAddr, String, ConnectRequest, bool), + /// ConnId, Ip, Agent, Req, Userdata, Record + RestartIce(Conn, IpAddr, String, ConnectRequest, Option, bool), Delete(Conn), } impl RpcReq { pub fn down(self) -> (RpcReq, Option) { match self { - RpcReq::Connect(session_id, ip_addr, user_agent, req, record) => (RpcReq::Connect(session_id, ip_addr, user_agent, req, record), None), + RpcReq::Connect(session_id, ip_addr, user_agent, req, userdata, record) => (RpcReq::Connect(session_id, ip_addr, user_agent, req, userdata, record), None), RpcReq::RemoteIce(conn, req) => { let (down, layer) = conn.down(); (RpcReq::RemoteIce(down, req), Some(layer)) } - RpcReq::RestartIce(conn, ip_addr, user_agent, req, record) => { + RpcReq::RestartIce(conn, ip_addr, user_agent, req, userdata, record) => { let (down, layer) = conn.down(); - (RpcReq::RestartIce(down, ip_addr, user_agent, req, record), Some(layer)) + (RpcReq::RestartIce(down, ip_addr, user_agent, req, userdata, record), Some(layer)) } RpcReq::Delete(conn) => { let (down, layer) = conn.down(); diff --git a/packages/protocol/src/transport/whep.rs b/packages/protocol/src/transport/whep.rs index 3660e2c9..c81ac181 100644 --- a/packages/protocol/src/transport/whep.rs +++ b/packages/protocol/src/transport/whep.rs @@ -15,6 +15,7 @@ pub struct WhepConnectReq { pub peer: PeerId, pub ip: IpAddr, pub user_agent: String, + pub userdata: Option, } #[derive(Debug, Clone)] @@ -102,6 +103,7 @@ impl TryFrom for WhepConnectReq { peer: value.peer.into(), ip: value.ip.parse().map_err(|_| ())?, user_agent: value.user_agent, + userdata: value.userdata, }) } } @@ -115,6 +117,7 @@ impl From for protobuf::cluster_gateway::WhepConnectRequest { sdp: val.sdp, room: val.room.0, peer: val.peer.0, + userdata: val.userdata, } } } diff --git a/packages/protocol/src/transport/whip.rs b/packages/protocol/src/transport/whip.rs index a7f3a12a..643de3e2 100644 --- a/packages/protocol/src/transport/whip.rs +++ b/packages/protocol/src/transport/whip.rs @@ -16,6 +16,7 @@ pub struct WhipConnectReq { pub record: bool, pub ip: IpAddr, pub user_agent: String, + pub userdata: Option, } #[derive(Debug, Clone)] @@ -104,6 +105,7 @@ impl TryFrom for WhipConnectReq { record: value.record, ip: value.ip.parse().map_err(|_| ())?, user_agent: value.user_agent, + userdata: value.userdata, }) } } @@ -118,6 +120,7 @@ impl From for protobuf::cluster_gateway::WhipConnectRequest { room: val.room.0, peer: val.peer.0, record: val.record, + userdata: val.userdata, } } } diff --git a/packages/transport_webrtc/src/transport.rs b/packages/transport_webrtc/src/transport.rs index a415b03a..794e2ee7 100644 --- a/packages/transport_webrtc/src/transport.rs +++ b/packages/transport_webrtc/src/transport.rs @@ -44,9 +44,9 @@ mod whip; #[allow(clippy::large_enum_variant)] pub enum VariantParams { - Whip(RoomId, PeerId, bool), - Whep(RoomId, PeerId), - Webrtc(String, ConnectRequest, bool, Arc), + Whip(RoomId, PeerId, Option, bool), + Whep(RoomId, PeerId, Option), + Webrtc(String, ConnectRequest, Option, bool, Arc), } #[derive(Debug, PartialEq, Eq)] @@ -59,8 +59,8 @@ pub enum Variant { #[allow(clippy::large_enum_variant)] pub enum ExtIn { RemoteIce(u64, Variant, Vec), - ///Last bool is record flag - RestartIce(u64, Variant, IpAddr, String, ConnectRequest, bool), + /// Last option, bool is userdata and record flag + RestartIce(u64, Variant, IpAddr, String, ConnectRequest, Option, bool), } #[derive(Debug, PartialEq, Eq)] @@ -144,9 +144,9 @@ impl TransportWebrtc { let mut rtc = rtc_config.build(); let mut internal: Box = match variant { - VariantParams::Whip(room, peer, _record) => Box::new(whip::TransportWebrtcWhip::new(room, peer, remote)), - VariantParams::Whep(room, peer) => Box::new(whep::TransportWebrtcWhep::new(room, peer, remote)), - VariantParams::Webrtc(_user_agent, req, _record, secure) => { + VariantParams::Whip(room, peer, userdata, _record) => Box::new(whip::TransportWebrtcWhip::new(room, peer, userdata, remote)), + VariantParams::Whep(room, peer, userdata) => Box::new(whep::TransportWebrtcWhep::new(room, peer, userdata, remote)), + VariantParams::Webrtc(_user_agent, req, userdata, _record, secure) => { rtc.direct_api().create_data_channel(ChannelConfig { label: "data".to_string(), negotiated: Some(1000), @@ -155,7 +155,7 @@ impl TransportWebrtc { //we need to start sctp as client side for handling restart-ice in new server //if not, datachannel will not connect successful after reconnect to new server rtc.direct_api().start_sctp(true); - Box::new(webrtc::TransportWebrtcSdk::new(req, secure, remote)) + Box::new(webrtc::TransportWebrtcSdk::new(req, userdata, secure, remote)) } }; @@ -308,7 +308,7 @@ impl Transport for TransportWebrtc } self.queue.push_back(TransportOutput::Ext(ExtOut::RemoteIce(req_id, variant, Ok(success_count)))); } - ExtIn::RestartIce(req_id, variant, _ip, _useragent, req, _record) => { + ExtIn::RestartIce(req_id, variant, _ip, _useragent, req, _userdata, _record) => { if let Ok(offer) = SdpOffer::from_sdp_string(&req.sdp) { if let Ok(answer) = self.rtc.sdp_api().accept_offer(offer) { self.internal.on_codec_config(self.rtc.codec_config()); diff --git a/packages/transport_webrtc/src/transport/webrtc.rs b/packages/transport_webrtc/src/transport/webrtc.rs index c3876ebc..b91ab53d 100644 --- a/packages/transport_webrtc/src/transport/webrtc.rs +++ b/packages/transport_webrtc/src/transport/webrtc.rs @@ -74,6 +74,7 @@ enum TransportWebrtcError { pub struct TransportWebrtcSdk { remote: IpAddr, + userdata: Option, join: Option<(RoomId, PeerId, Option, RoomInfoPublish, RoomInfoSubscribe)>, state: State, queue: DynamicDeque, @@ -88,13 +89,14 @@ pub struct TransportWebrtcSdk { } impl TransportWebrtcSdk { - pub fn new(req: ConnectRequest, secure: Arc, remote: IpAddr) -> Self { + pub fn new(req: ConnectRequest, userdata: Option, secure: Arc, remote: IpAddr) -> Self { let tracks = req.tracks.unwrap_or_default(); let local_tracks: Vec = tracks.receivers.into_iter().enumerate().map(|(index, r)| LocalTrack::new((index as u16).into(), r)).collect(); let remote_tracks: Vec = tracks.senders.into_iter().enumerate().map(|(index, s)| RemoteTrack::new((index as u16).into(), s)).collect(); if let Some(j) = req.join { Self { remote, + userdata, join: Some((j.room.into(), j.peer.into(), j.metadata, j.publish.unwrap_or_default().into(), j.subscribe.unwrap_or_default().into())), state: State::New, audio_mixer: j.features.and_then(|f| { @@ -120,6 +122,7 @@ impl TransportWebrtcSdk { } else { Self { remote, + userdata, join: None, state: State::New, local_tracks, @@ -241,6 +244,7 @@ impl TransportWebrtcInternal for TransportWebrtcSdk { event: Some(ProtoRoomEvent2::PeerJoined(PeerJoined { peer: peer.0, metadata: meta.metadata, + userdata: meta.userdata, })), })); } @@ -430,7 +434,10 @@ impl TransportWebrtcInternal for TransportWebrtcSdk { EndpointReq::JoinRoom( room.clone(), peer.clone(), - PeerMeta { metadata: metadata.clone() }, + PeerMeta { + metadata: metadata.clone(), + userdata: self.userdata.clone(), + }, publish.clone(), subscribe.clone(), self.audio_mixer.take(), @@ -633,7 +640,10 @@ impl TransportWebrtcSdk { match req { protobuf::session::request::session::Request::Join(req) => { let info = req.info.unwrap_or_default(); - let meta = PeerMeta { metadata: info.metadata }; + let meta = PeerMeta { + metadata: info.metadata, + userdata: self.userdata.clone(), + }; if let Some(token) = self.secure.decode_obj::("webrtc", &req.token) { if token.room == Some(info.room.clone()) && token.peer == Some(info.peer.clone()) { let mixer_cfg = info.features.and_then(|f| { @@ -838,7 +848,7 @@ mod tests { let now = Instant::now(); let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let secure_jwt = Arc::new(MediaEdgeSecureJwt::from(b"1234".as_slice())); - let mut transport = TransportWebrtcSdk::new(req, secure_jwt.clone(), ip); + let mut transport = TransportWebrtcSdk::new(req, Some("userdata".to_string()), secure_jwt.clone(), ip); assert_eq!(transport.pop_output(now), None); transport.on_tick(now); @@ -860,7 +870,8 @@ mod tests { "room".to_string().into(), "peer".to_string().into(), PeerMeta { - metadata: Some("metadata".to_string()) + metadata: Some("metadata".to_string()), + userdata: Some("userdata".to_string()) }, RoomInfoPublish { peer: true, tracks: true }, RoomInfoSubscribe { peers: true, tracks: true }, @@ -881,7 +892,7 @@ mod tests { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let gateway_jwt = MediaGatewaySecureJwt::from(b"1234".as_slice()); let secure_jwt = Arc::new(MediaEdgeSecureJwt::from(b"1234".as_slice())); - let mut transport = TransportWebrtcSdk::new(req, secure_jwt.clone(), ip); + let mut transport = TransportWebrtcSdk::new(req, Some("userdata".to_string()), secure_jwt.clone(), ip); assert_eq!(transport.pop_output(now), None); transport.on_tick(now); @@ -903,6 +914,7 @@ mod tests { room: Some("demo".to_string()), peer: Some("peer1".to_string()), record: false, + userdata: Some("userdata".to_string()), }, 10000, ); @@ -933,7 +945,10 @@ mod tests { EndpointReq::JoinRoom( "demo".to_string().into(), "peer1".to_string().into(), - PeerMeta { metadata: None }, + PeerMeta { + metadata: None, + userdata: Some("userdata".to_string()) + }, RoomInfoPublish { peer: false, tracks: false }, RoomInfoSubscribe { peers: false, tracks: false }, None, diff --git a/packages/transport_webrtc/src/transport/whep.rs b/packages/transport_webrtc/src/transport/whep.rs index cab826dd..4b7cb1bf 100644 --- a/packages/transport_webrtc/src/transport/whep.rs +++ b/packages/transport_webrtc/src/transport/whep.rs @@ -52,6 +52,7 @@ pub struct TransportWebrtcWhep { remote: IpAddr, room: RoomId, peer: PeerId, + userdata: Option, state: State, audio_mid: Option, video_mid: Option, @@ -63,11 +64,12 @@ pub struct TransportWebrtcWhep { } impl TransportWebrtcWhep { - pub fn new(room: RoomId, peer: PeerId, remote: IpAddr) -> Self { + pub fn new(room: RoomId, peer: PeerId, userdata: Option, remote: IpAddr) -> Self { Self { remote, room, peer, + userdata, state: State::New, audio_mid: None, video_mid: None, @@ -174,7 +176,10 @@ impl TransportWebrtcInternal for TransportWebrtcWhep { EndpointReq::JoinRoom( self.room.clone(), self.peer.clone(), - PeerMeta { metadata: None }, + PeerMeta { + metadata: None, + userdata: self.userdata.clone(), + }, RoomInfoPublish { peer: false, tracks: false }, RoomInfoSubscribe { peers: false, tracks: true }, None, diff --git a/packages/transport_webrtc/src/transport/whip.rs b/packages/transport_webrtc/src/transport/whip.rs index fd88c816..48183dc3 100644 --- a/packages/transport_webrtc/src/transport/whip.rs +++ b/packages/transport_webrtc/src/transport/whip.rs @@ -48,6 +48,7 @@ pub struct TransportWebrtcWhip { remote: IpAddr, room: RoomId, peer: PeerId, + userdata: Option, state: State, audio_mid: Option, ///mid and simulcast flag @@ -57,11 +58,12 @@ pub struct TransportWebrtcWhip { } impl TransportWebrtcWhip { - pub fn new(room: RoomId, peer: PeerId, remote: IpAddr) -> Self { + pub fn new(room: RoomId, peer: PeerId, userdata: Option, remote: IpAddr) -> Self { Self { remote, room, peer, + userdata, state: State::New, audio_mid: None, video_mid: None, @@ -153,7 +155,10 @@ impl TransportWebrtcInternal for TransportWebrtcWhip { EndpointReq::JoinRoom( self.room.clone(), self.peer.clone(), - PeerMeta { metadata: None }, + PeerMeta { + metadata: None, + userdata: self.userdata.clone(), + }, RoomInfoPublish { peer: true, tracks: true }, RoomInfoSubscribe { peers: false, tracks: false }, None, diff --git a/packages/transport_webrtc/src/worker.rs b/packages/transport_webrtc/src/worker.rs index bbc2a1a9..56795d25 100644 --- a/packages/transport_webrtc/src/worker.rs +++ b/packages/transport_webrtc/src/worker.rs @@ -30,6 +30,7 @@ use crate::{ group_owner_type!(WebrtcSession); +#[allow(clippy::large_enum_variant)] pub enum GroupInput { Net(BackendIncoming), Cluster(WebrtcSession, ClusterEndpointEvent), @@ -76,17 +77,17 @@ impl MediaWorkerWebrtc { pub fn spawn(&mut self, remote: IpAddr, session_id: u64, variant: VariantParams, offer: &str) -> RpcResult<(bool, String, usize)> { let cfg = match &variant { - VariantParams::Whip(_, _, record) => EndpointCfg { + VariantParams::Whip(_, _, _, record) => EndpointCfg { max_ingress_bitrate: 2_500_000, max_egress_bitrate: 2_500_000, record: *record, }, - VariantParams::Whep(_, _) => EndpointCfg { + VariantParams::Whep(_, _, _) => EndpointCfg { max_ingress_bitrate: 2_500_000, max_egress_bitrate: 2_500_000, record: false, }, - VariantParams::Webrtc(_, _, record, _) => EndpointCfg { + VariantParams::Webrtc(_, _, _, record, _) => EndpointCfg { max_ingress_bitrate: 2_500_000, max_egress_bitrate: 2_500_000, record: *record, @@ -154,10 +155,10 @@ impl MediaWorkerWebrtc { self.queue .push_back(GroupOutput::Ext(owner, ExtOut::RemoteIce(req_id, variant, Err(RpcError::new2(WebrtcError::RpcEndpointNotFound))))); } - ExtIn::RestartIce(req_id, variant, remote, useragent, req, record) => { + ExtIn::RestartIce(req_id, variant, remote, useragent, req, userdata, record) => { let sdp = req.sdp.clone(); let session_id = gen_cluster_session_id(); //TODO need to reuse old session_id - if let Ok((ice_lite, sdp, index)) = self.spawn(remote, session_id, VariantParams::Webrtc(useragent, req, record, self.secure.clone()), &sdp) { + if let Ok((ice_lite, sdp, index)) = self.spawn(remote, session_id, VariantParams::Webrtc(useragent, req, userdata, record, self.secure.clone()), &sdp) { self.queue.push_back(GroupOutput::Ext(index.into(), ExtOut::RestartIce(req_id, variant, Ok((ice_lite, sdp))))); } else { self.queue