From b63e5cfba049b6421cdd576da5e0fb802c012d49 Mon Sep 17 00:00:00 2001 From: giangndm <45644921+giangndm@users.noreply.github.com> Date: Fri, 26 Jul 2024 14:06:54 +0700 Subject: [PATCH] refactor: ename peer's userdata to extra_data for avoid miss-understand (#386) --- bin/src/http/api_media.rs | 8 +++---- bin/src/http/api_token.rs | 12 +++++----- bin/src/server/gateway/local_rpc_handler.rs | 16 ++++++------- bin/src/server/media/rpc_handler.rs | 4 ++-- docs/user-guide/features/README.md | 2 +- .../features/extra_data-metadata.md | 6 +++++ docs/user-guide/features/userdata-metadata.md | 6 ----- packages/media_core/src/cluster.rs | 2 +- packages/media_core/src/cluster/room.rs | 2 +- .../media_core/src/cluster/room/metadata.rs | 24 +++++++++---------- packages/media_core/src/endpoint/internal.rs | 4 ++-- packages/media_runner/src/worker.rs | 12 +++++----- packages/protocol/proto/cluster/gateway.proto | 8 +++---- packages/protocol/proto/sdk/session.proto | 2 +- packages/protocol/src/endpoint.rs | 2 +- .../protocol/src/protobuf/cluster_gateway.rs | 8 +++---- packages/protocol/src/protobuf/session.rs | 2 +- packages/protocol/src/tokens.rs | 6 ++--- packages/protocol/src/transport/webrtc.rs | 6 ++--- packages/protocol/src/transport/whep.rs | 6 ++--- packages/protocol/src/transport/whip.rs | 6 ++--- packages/transport_webrtc/src/transport.rs | 12 +++++----- .../transport_webrtc/src/transport/webrtc.rs | 24 +++++++++---------- .../transport_webrtc/src/transport/whep.rs | 8 +++---- .../transport_webrtc/src/transport/whip.rs | 8 +++---- packages/transport_webrtc/src/worker.rs | 4 ++-- 26 files changed, 100 insertions(+), 100 deletions(-) create mode 100644 docs/user-guide/features/extra_data-metadata.md delete mode 100644 docs/user-guide/features/userdata-metadata.md diff --git a/bin/src/http/api_media.rs b/bin/src/http/api_media.rs index 942ff1a4..0ed76e6a 100644 --- a/bin/src/http/api_media.rs +++ b/bin/src/http/api_media.rs @@ -71,7 +71,7 @@ impl MediaApis { peer: token.peer.into(), user_agent, record: token.record, - userdata: token.userdata, + extra_data: token.extra_data, }))); ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; @@ -168,7 +168,7 @@ impl MediaApis { room: token.room.into(), peer: token.peer.unwrap_or_else(|| format!("whep-{}", (random::()))).into(), user_agent, - userdata: token.userdata, + extra_data: token.extra_data, }))); ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; @@ -267,7 +267,7 @@ impl MediaApis { return Err(poem::Error::from_string("Wrong peer".to_string(), StatusCode::FORBIDDEN)); } } - let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(session_id, ip_addr, user_agent, connect.0, token.userdata, token.record))); + let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(session_id, ip_addr, user_agent, connect.0, token.extra_data, token.record))); ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; match res { @@ -336,7 +336,7 @@ impl MediaApis { } } log::info!("[MediaAPIs] restart_ice webrtc, ip {}, user_agent {}, conn {}, request {:?}", ip_addr, user_agent, conn_id.0, connect); - let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::RestartIce(conn_id2, ip_addr, user_agent, connect.0, token.userdata, token.record))); + let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::RestartIce(conn_id2, ip_addr, user_agent, connect.0, token.extra_data, token.record))); ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; match res { diff --git a/bin/src/http/api_token.rs b/bin/src/http/api_token.rs index a1ea83e6..1b52e042 100644 --- a/bin/src/http/api_token.rs +++ b/bin/src/http/api_token.rs @@ -25,7 +25,7 @@ struct WhipTokenReq { peer: String, ttl: u64, record: Option, - userdata: Option, + extra_data: Option, } #[derive(poem_openapi::Object)] @@ -38,7 +38,7 @@ struct WhepTokenReq { room: String, peer: Option, ttl: u64, - userdata: Option, + extra_data: Option, } #[derive(poem_openapi::Object)] @@ -52,7 +52,7 @@ struct WebrtcTokenReq { peer: Option, ttl: u64, record: Option, - userdata: Option, + extra_data: Option, } #[derive(poem_openapi::Object)] @@ -84,7 +84,7 @@ impl TokenApis { room: body.room, peer: body.peer, record: body.record.unwrap_or(false), - userdata: body.userdata, + extra_data: body.extra_data, }, body.ttl, ), @@ -113,7 +113,7 @@ impl TokenApis { WhepToken { room: body.room, peer: body.peer, - userdata: body.userdata, + extra_data: body.extra_data, }, body.ttl, ), @@ -142,7 +142,7 @@ impl TokenApis { room: body.room, peer: body.peer, record: body.record.unwrap_or(false), - userdata: body.userdata, + extra_data: body.extra_data, }, body.ttl, ), diff --git a/bin/src/server/gateway/local_rpc_handler.rs b/bin/src/server/gateway/local_rpc_handler.rs index 71eb4549..cab423aa 100644 --- a/bin/src/server/gateway/local_rpc_handler.rs +++ b/bin/src/server/gateway/local_rpc_handler.rs @@ -123,12 +123,12 @@ impl MediaLocalRpcHandler { whep::RpcReq::Delete(param) => RpcRes::Whep(whep::RpcRes::Delete(self.whep_delete(conn_part, param).await)), }, RpcReq::Webrtc(param) => match param { - webrtc::RpcReq::Connect(session_id, ip, user_agent, param, userdata, record) => { - RpcRes::Webrtc(webrtc::RpcRes::Connect(self.webrtc_connect(session_id, ip, user_agent, param, userdata, record).await)) + webrtc::RpcReq::Connect(session_id, ip, user_agent, param, extra_data, record) => { + RpcRes::Webrtc(webrtc::RpcRes::Connect(self.webrtc_connect(session_id, ip, user_agent, param, extra_data, record).await)) } webrtc::RpcReq::RemoteIce(conn, param) => RpcRes::Webrtc(webrtc::RpcRes::RemoteIce(self.webrtc_remote_ice(conn_part, conn, param).await)), - webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, userdata, record) => { - RpcRes::Webrtc(webrtc::RpcRes::RestartIce(self.webrtc_restart_ice(conn_part, conn, ip, user_agent, req, userdata, record).await)) + webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, extra_data, record) => { + RpcRes::Webrtc(webrtc::RpcRes::RestartIce(self.webrtc_restart_ice(conn_part, conn, ip, user_agent, req, extra_data, record).await)) } webrtc::RpcReq::Delete(_) => { //TODO implement delete webrtc conn @@ -276,7 +276,7 @@ impl MediaLocalRpcHandler { Webrtc part */ - async fn webrtc_connect(&self, session_id: u64, ip: IpAddr, user_agent: String, req: ConnectRequest, userdata: Option, record: bool) -> RpcResult<(ClusterConnId, ConnectResponse)> { + async fn webrtc_connect(&self, session_id: u64, ip: IpAddr, user_agent: String, req: ConnectRequest, extra_data: Option, record: bool) -> RpcResult<(ClusterConnId, ConnectResponse)> { let started_at = now_ms(); self.feedback_route_begin(session_id, ip).await; @@ -289,7 +289,7 @@ impl MediaLocalRpcHandler { ip: ip.to_string(), req: Some(req), record, - userdata, + extra_data, }; let res = self.client.webrtc_connect(sock_addr, rpc_req).await; log::info!("[Gateway] response from node {node_id} => {:?}", res); @@ -343,7 +343,7 @@ impl MediaLocalRpcHandler { ip: IpAddr, user_agent: String, req: ConnectRequest, - userdata: Option, + extra_data: Option, record: bool, ) -> RpcResult<(ClusterConnId, ConnectResponse)> { //TODO how to handle media-node down? @@ -354,7 +354,7 @@ impl MediaLocalRpcHandler { user_agent, req: Some(req), record, - userdata, + extra_data, }; log::info!("[Gateway] selected node {node}"); let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); diff --git a/bin/src/server/media/rpc_handler.rs b/bin/src/server/media/rpc_handler.rs index 735d2109..f29cf8a1 100644 --- a/bin/src/server/media/rpc_handler.rs +++ b/bin/src/server/media/rpc_handler.rs @@ -127,7 +127,7 @@ impl MediaEdgeServiceHandler for MediaRpcHandlerImpl { req.ip.parse().ok()?, req.user_agent, req.req?, - req.userdata, + req.extra_data, req.record, ))); ctx.req_tx.send(req).await.ok()?; @@ -159,7 +159,7 @@ impl MediaEdgeServiceHandler for MediaRpcHandlerImpl { req.ip.parse().ok()?, req.user_agent, req.req?, - req.userdata, + req.extra_data, req.record, ))); ctx.req_tx.send(req).await.ok()?; diff --git a/docs/user-guide/features/README.md b/docs/user-guide/features/README.md index d5327669..20f051b1 100644 --- a/docs/user-guide/features/README.md +++ b/docs/user-guide/features/README.md @@ -9,4 +9,4 @@ In this document, we will explore the key features of atm0s-media-server. Curren | [Simulcast/SVC](./simulcast-svc.md) | Alpha | | [Recording](./recording.md) | TODO | | [Cluster](./cluster.md) | Alpha | -| [Userdata-metadata](./userdata-metadata.md) | Alpha | +| [extra_data-metadata](./extra_data-metadata.md) | Alpha | diff --git a/docs/user-guide/features/extra_data-metadata.md b/docs/user-guide/features/extra_data-metadata.md new file mode 100644 index 00000000..0ef9ce07 --- /dev/null +++ b/docs/user-guide/features/extra_data-metadata.md @@ -0,0 +1,6 @@ +# extra_data and metadata + +In atm0s-media, extra_data and metadata is used with same goal is providing addition data beside of only room and peer. We use 2 terms extra_data and metadata with some difference: + +- extra_data: a string which is embedded to peer token, which can not be changed by client +- metadata: a string which is embedded to peer or track by client. In there peer metadata is set at join-room step, track metadata is set at publish track step. diff --git a/docs/user-guide/features/userdata-metadata.md b/docs/user-guide/features/userdata-metadata.md deleted file mode 100644 index dd8cac8b..00000000 --- a/docs/user-guide/features/userdata-metadata.md +++ /dev/null @@ -1,6 +0,0 @@ -# Userdata and metadata - -Userdata and metadata is used with same goal is providing addition data beside of only room and peer. We use 2 terms userdata and metadata with some difference: - -- Userdata: a string which is embedded to peer token, which can not be changed by client -- Metadata: a string which is embedded to peer or track by client. In there peer metadata is set at join-room step, track metadata is set at publish track step. diff --git a/packages/media_core/src/cluster.rs b/packages/media_core/src/cluster.rs index 2013a72f..28f2fca6 100644 --- a/packages/media_core/src/cluster.rs +++ b/packages/media_core/src/cluster.rs @@ -201,7 +201,7 @@ mod tests { let room_peers_map = id_generator::peers_map(userdata.0); let peer = PeerId("peer1".to_string()); let peer_key = id_generator::peers_key(&peer); - let peer_info = PeerInfo::new(peer.clone(), PeerMeta { metadata: None, userdata: None }); + let peer_info = PeerInfo::new(peer.clone(), PeerMeta { metadata: None, extra_data: None }); let now = Instant::now(); // Not join room with scope (peer true, track false) should Set and Sub diff --git a/packages/media_core/src/cluster/room.rs b/packages/media_core/src/cluster/room.rs index 12cff3d7..345c7ce5 100644 --- a/packages/media_core/src/cluster/room.rs +++ b/packages/media_core/src/cluster/room.rs @@ -270,7 +270,7 @@ mod tests { endpoint, ClusterEndpointControl::Join( peer.clone(), - PeerMeta { metadata: None, userdata: None }, + PeerMeta { metadata: None, extra_data: None }, RoomInfoPublish { peer: false, tracks: false }, RoomInfoSubscribe { peers: true, tracks: true }, Some(AudioMixerConfig { diff --git a/packages/media_core/src/cluster/room/metadata.rs b/packages/media_core/src/cluster/room/metadata.rs index 7ed596f2..48c26719 100644 --- a/packages/media_core/src/cluster/room/metadata.rs +++ b/packages/media_core/src/cluster/room/metadata.rs @@ -381,7 +381,7 @@ mod tests { let room: ClusterRoomHash = 1.into(); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None, userdata: None }; + let peer_meta = PeerMeta { metadata: None, extra_data: None }; let endpoint = 1; room_meta.on_join( endpoint, @@ -408,7 +408,7 @@ mod tests { let tracks_map = id_generator::tracks_map(room); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None, userdata: None }; + let peer_meta = PeerMeta { metadata: None, extra_data: None }; let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone()); let peer_key = id_generator::peers_key(&peer_id); let endpoint = 1; @@ -464,14 +464,14 @@ mod tests { let peer2: PeerId = "peer2".to_string().into(); let peer2_key = id_generator::peers_key(&peer2); - let peer2_info = PeerInfo::new(peer2, PeerMeta { metadata: None, userdata: None }); + let peer2_info = PeerInfo::new(peer2, PeerMeta { metadata: None, extra_data: None }); room_meta.on_kv_event(peers_map, MapEvent::OnSet(peer2_key, 0, peer2_info.serialize())); assert_eq!(room_meta.pop_output(()), None); let endpoint = 1; let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None, userdata: None }; + let peer_meta = PeerMeta { metadata: None, extra_data: None }; room_meta.on_join( endpoint, peer_id.clone(), @@ -500,7 +500,7 @@ mod tests { let tracks_map = id_generator::tracks_map(room); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None, userdata: None }; + let peer_meta = PeerMeta { metadata: None, extra_data: None }; let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone()); let peer_key = id_generator::peers_key(&peer_id); let endpoint = 1; @@ -569,7 +569,7 @@ mod tests { let endpoint = 1; let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None, userdata: None }; + let peer_meta = PeerMeta { metadata: None, extra_data: None }; room_meta.on_join( endpoint, peer_id.clone(), @@ -601,7 +601,7 @@ mod tests { let tracks_map = id_generator::tracks_map(room); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None, userdata: None }; + let peer_meta = PeerMeta { metadata: None, extra_data: None }; let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone()); let peer_key = id_generator::peers_key(&peer_id); let endpoint = 1; @@ -643,7 +643,7 @@ mod tests { let room: ClusterRoomHash = 1.into(); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None, userdata: None }; + let peer_meta = PeerMeta { metadata: None, extra_data: None }; let endpoint = 1; room_meta.on_join( endpoint, @@ -702,7 +702,7 @@ mod tests { let endpoint = 1; let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None, userdata: None }; + let peer_meta = PeerMeta { metadata: None, extra_data: None }; room_meta.on_join( endpoint, peer_id.clone(), @@ -748,7 +748,7 @@ mod tests { let endpoint = 1; let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None, userdata: None }; + let peer_meta = PeerMeta { metadata: None, extra_data: None }; room_meta.on_join( endpoint, peer_id.clone(), @@ -783,7 +783,7 @@ mod tests { let endpoint = 1; let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None, userdata: None }; + let peer_meta = PeerMeta { metadata: None, extra_data: None }; room_meta.on_join( endpoint, peer_id.clone(), @@ -823,7 +823,7 @@ mod tests { let room: ClusterRoomHash = 1.into(); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); - let peer_meta = PeerMeta { metadata: None, userdata: None }; + let peer_meta = PeerMeta { metadata: None, extra_data: None }; let endpoint = 1; room_meta.on_join( endpoint, diff --git a/packages/media_core/src/endpoint/internal.rs b/packages/media_core/src/endpoint/internal.rs index ef02daf7..940a3fd1 100644 --- a/packages/media_core/src/endpoint/internal.rs +++ b/packages/media_core/src/endpoint/internal.rs @@ -533,7 +533,7 @@ mod tests { let room: RoomId = "room".into(); let peer: PeerId = "peer".into(); - let meta = PeerMeta { metadata: None, userdata: None }; + let meta = PeerMeta { metadata: None, extra_data: None }; let publish = RoomInfoPublish { peer: true, tracks: true }; let subscribe = RoomInfoSubscribe { peers: true, tracks: true }; internal.on_transport_rpc(now, 0.into(), EndpointReq::JoinRoom(room.clone(), peer.clone(), meta.clone(), publish.clone(), subscribe.clone(), None)); @@ -658,7 +658,7 @@ mod tests { let room1: RoomId = "room1".into(); let room1_hash = ClusterRoomHash::from(&room1); let peer: PeerId = "peer".into(); - let meta = PeerMeta { metadata: None, userdata: None }; + let meta = PeerMeta { metadata: None, extra_data: None }; let publish = RoomInfoPublish { peer: true, tracks: true }; let subscribe = RoomInfoSubscribe { peers: true, tracks: true }; internal.on_transport_rpc( diff --git a/packages/media_runner/src/worker.rs b/packages/media_runner/src/worker.rs index df308126..ad50e6ee 100644 --- a/packages/media_runner/src/worker.rs +++ b/packages/media_runner/src/worker.rs @@ -463,7 +463,7 @@ impl MediaServerWorker { match self .media_webrtc .input(&mut self.switcher) - .spawn(req.ip, req.session_id, transport_webrtc::VariantParams::Whip(req.room, req.peer, req.userdata, req.record), &req.sdp) + .spawn(req.ip, req.session_id, transport_webrtc::VariantParams::Whip(req.room, req.peer, req.extra_data, req.record), &req.sdp) { Ok((_ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Ok(WhipConnectRes { conn_id, sdp }))))), Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Err(e))))), @@ -490,7 +490,7 @@ impl MediaServerWorker { match self .media_webrtc .input(&mut self.switcher) - .spawn(req.ip, req.session_id, transport_webrtc::VariantParams::Whep(req.room, peer_id.into(), req.userdata), &req.sdp) + .spawn(req.ip, req.session_id, transport_webrtc::VariantParams::Whep(req.room, peer_id.into(), req.extra_data), &req.sdp) { Ok((_ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Ok(WhepConnectRes { conn_id, sdp }))))), Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Err(e))))), @@ -511,12 +511,12 @@ impl MediaServerWorker { } }, RpcReq::Webrtc(req) => match req { - webrtc::RpcReq::Connect(session_id, ip, user_agent, req, userdata, record) => { + webrtc::RpcReq::Connect(session_id, ip, user_agent, req, extra_data, record) => { log::info!("on rpc request {req_id}, webrtc::RpcReq::Connect"); match self .media_webrtc .input(&mut self.switcher) - .spawn(ip, session_id, VariantParams::Webrtc(user_agent, req.clone(), userdata, record, self.secure.clone()), &req.sdp) + .spawn(ip, session_id, VariantParams::Webrtc(user_agent, req.clone(), extra_data, record, self.secure.clone()), &req.sdp) { Ok((ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc( req_id, @@ -539,13 +539,13 @@ impl MediaServerWorker { GroupInput::Ext(conn.into(), transport_webrtc::ExtIn::RemoteIce(req_id, transport_webrtc::Variant::Webrtc, ice.candidates)), ); } - webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, userdata, record) => { + webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, extra_data, record) => { log::info!("on rpc request {req_id}, webrtc::RpcReq::RestartIce"); self.media_webrtc.input(&mut self.switcher).on_event( now, GroupInput::Ext( conn.into(), - transport_webrtc::ExtIn::RestartIce(req_id, transport_webrtc::Variant::Webrtc, ip, user_agent, req, userdata, record), + transport_webrtc::ExtIn::RestartIce(req_id, transport_webrtc::Variant::Webrtc, ip, user_agent, req, extra_data, record), ), ); } diff --git a/packages/protocol/proto/cluster/gateway.proto b/packages/protocol/proto/cluster/gateway.proto index 22648645..3afeecda 100644 --- a/packages/protocol/proto/cluster/gateway.proto +++ b/packages/protocol/proto/cluster/gateway.proto @@ -68,7 +68,7 @@ message WhipConnectRequest { string peer = 5; uint64 session_id = 6; bool record = 7; - optional string userdata = 8; + optional string extra_data = 8; } message WhipConnectResponse { @@ -101,7 +101,7 @@ message WhepConnectRequest { string room = 4; string peer = 5; uint64 session_id = 6; - optional string userdata = 8; + optional string extra_data = 8; } message WhepConnectResponse { @@ -133,7 +133,7 @@ message WebrtcConnectRequest { gateway.ConnectRequest req = 3; uint64 session_id = 4; bool record = 5; - optional string userdata = 8; + optional string extra_data = 8; } message WebrtcConnectResponse { @@ -155,7 +155,7 @@ message WebrtcRestartIceRequest { string ip = 3; gateway.ConnectRequest req = 4; bool record = 5; - optional string userdata = 8; + optional string extra_data = 8; } message WebrtcRestartIceResponse { diff --git a/packages/protocol/proto/sdk/session.proto b/packages/protocol/proto/sdk/session.proto index ce54c774..c73ea820 100644 --- a/packages/protocol/proto/sdk/session.proto +++ b/packages/protocol/proto/sdk/session.proto @@ -233,7 +233,7 @@ message ServerEvent { message PeerJoined { string peer = 1; optional string metadata = 2; - optional string userdata = 3; + optional string extra_data = 3; } message PeerUpdated { diff --git a/packages/protocol/src/endpoint.rs b/packages/protocol/src/endpoint.rs index d8c6b748..e7f6495b 100644 --- a/packages/protocol/src/endpoint.rs +++ b/packages/protocol/src/endpoint.rs @@ -222,7 +222,7 @@ pub struct PeerHashCode(pub u64); #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PeerMeta { pub metadata: Option, - pub userdata: Option, //userdata is fixed data extracted from token + pub extra_data: Option, //extra_data is fixed data extracted from token } /// diff --git a/packages/protocol/src/protobuf/cluster_gateway.rs b/packages/protocol/src/protobuf/cluster_gateway.rs index 330ad0bd..296010a6 100644 --- a/packages/protocol/src/protobuf/cluster_gateway.rs +++ b/packages/protocol/src/protobuf/cluster_gateway.rs @@ -93,7 +93,7 @@ pub struct WhipConnectRequest { #[prost(bool, tag = "7")] pub record: bool, #[prost(string, optional, tag = "8")] - pub userdata: ::core::option::Option<::prost::alloc::string::String>, + pub extra_data: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -146,7 +146,7 @@ pub struct WhepConnectRequest { #[prost(uint64, tag = "6")] pub session_id: u64, #[prost(string, optional, tag = "8")] - pub userdata: ::core::option::Option<::prost::alloc::string::String>, + pub extra_data: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -197,7 +197,7 @@ pub struct WebrtcConnectRequest { #[prost(bool, tag = "5")] pub record: bool, #[prost(string, optional, tag = "8")] - pub userdata: ::core::option::Option<::prost::alloc::string::String>, + pub extra_data: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -233,7 +233,7 @@ pub struct WebrtcRestartIceRequest { #[prost(bool, tag = "5")] pub record: bool, #[prost(string, optional, tag = "8")] - pub userdata: ::core::option::Option<::prost::alloc::string::String>, + pub extra_data: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/packages/protocol/src/protobuf/session.rs b/packages/protocol/src/protobuf/session.rs index eadfc9af..872f5d14 100644 --- a/packages/protocol/src/protobuf/session.rs +++ b/packages/protocol/src/protobuf/session.rs @@ -408,7 +408,7 @@ pub mod server_event { #[prost(string, optional, tag = "2")] pub metadata: ::core::option::Option<::prost::alloc::string::String>, #[prost(string, optional, tag = "3")] - pub userdata: ::core::option::Option<::prost::alloc::string::String>, + pub extra_data: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/packages/protocol/src/tokens.rs b/packages/protocol/src/tokens.rs index c658a5ce..fea39737 100644 --- a/packages/protocol/src/tokens.rs +++ b/packages/protocol/src/tokens.rs @@ -5,14 +5,14 @@ pub struct WhipToken { pub room: String, pub peer: String, pub record: bool, - pub userdata: Option, + pub extra_data: Option, } #[derive(Serialize, Deserialize, Debug)] pub struct WhepToken { pub room: String, pub peer: Option, - pub userdata: Option, + pub extra_data: Option, } #[derive(Serialize, Deserialize, Debug)] @@ -20,5 +20,5 @@ pub struct WebrtcToken { pub room: Option, pub peer: Option, pub record: bool, - pub userdata: Option, + pub extra_data: Option, } diff --git a/packages/protocol/src/transport/webrtc.rs b/packages/protocol/src/transport/webrtc.rs index f21c4bdb..bcdd4ede 100644 --- a/packages/protocol/src/transport/webrtc.rs +++ b/packages/protocol/src/transport/webrtc.rs @@ -16,14 +16,14 @@ pub enum RpcReq { impl RpcReq { pub fn down(self) -> (RpcReq, Option) { match self { - RpcReq::Connect(session_id, ip_addr, user_agent, req, userdata, record) => (RpcReq::Connect(session_id, ip_addr, user_agent, req, userdata, record), None), + RpcReq::Connect(session_id, ip_addr, user_agent, req, extra_data, record) => (RpcReq::Connect(session_id, ip_addr, user_agent, req, extra_data, record), None), RpcReq::RemoteIce(conn, req) => { let (down, layer) = conn.down(); (RpcReq::RemoteIce(down, req), Some(layer)) } - RpcReq::RestartIce(conn, ip_addr, user_agent, req, userdata, record) => { + RpcReq::RestartIce(conn, ip_addr, user_agent, req, extra_data, record) => { let (down, layer) = conn.down(); - (RpcReq::RestartIce(down, ip_addr, user_agent, req, userdata, record), Some(layer)) + (RpcReq::RestartIce(down, ip_addr, user_agent, req, extra_data, record), Some(layer)) } RpcReq::Delete(conn) => { let (down, layer) = conn.down(); diff --git a/packages/protocol/src/transport/whep.rs b/packages/protocol/src/transport/whep.rs index c81ac181..ba12d4bd 100644 --- a/packages/protocol/src/transport/whep.rs +++ b/packages/protocol/src/transport/whep.rs @@ -15,7 +15,7 @@ pub struct WhepConnectReq { pub peer: PeerId, pub ip: IpAddr, pub user_agent: String, - pub userdata: Option, + pub extra_data: Option, } #[derive(Debug, Clone)] @@ -103,7 +103,7 @@ impl TryFrom for WhepConnectReq { peer: value.peer.into(), ip: value.ip.parse().map_err(|_| ())?, user_agent: value.user_agent, - userdata: value.userdata, + extra_data: value.extra_data, }) } } @@ -117,7 +117,7 @@ impl From for protobuf::cluster_gateway::WhepConnectRequest { sdp: val.sdp, room: val.room.0, peer: val.peer.0, - userdata: val.userdata, + extra_data: val.extra_data, } } } diff --git a/packages/protocol/src/transport/whip.rs b/packages/protocol/src/transport/whip.rs index 643de3e2..6322af8c 100644 --- a/packages/protocol/src/transport/whip.rs +++ b/packages/protocol/src/transport/whip.rs @@ -16,7 +16,7 @@ pub struct WhipConnectReq { pub record: bool, pub ip: IpAddr, pub user_agent: String, - pub userdata: Option, + pub extra_data: Option, } #[derive(Debug, Clone)] @@ -105,7 +105,7 @@ impl TryFrom for WhipConnectReq { record: value.record, ip: value.ip.parse().map_err(|_| ())?, user_agent: value.user_agent, - userdata: value.userdata, + extra_data: value.extra_data, }) } } @@ -120,7 +120,7 @@ impl From for protobuf::cluster_gateway::WhipConnectRequest { room: val.room.0, peer: val.peer.0, record: val.record, - userdata: val.userdata, + extra_data: val.extra_data, } } } diff --git a/packages/transport_webrtc/src/transport.rs b/packages/transport_webrtc/src/transport.rs index 694950b3..85ac8fdd 100644 --- a/packages/transport_webrtc/src/transport.rs +++ b/packages/transport_webrtc/src/transport.rs @@ -59,7 +59,7 @@ pub enum Variant { #[allow(clippy::large_enum_variant)] pub enum ExtIn { RemoteIce(u64, Variant, Vec), - /// Last option, bool is userdata and record flag + /// Last option, bool is extra_data and record flag RestartIce(u64, Variant, IpAddr, String, ConnectRequest, Option, bool), Disconnect(u64, Variant), } @@ -146,9 +146,9 @@ impl TransportWebrtc { let mut rtc = rtc_config.build(); let mut internal: Box = match variant { - VariantParams::Whip(room, peer, userdata, _record) => Box::new(whip::TransportWebrtcWhip::new(room, peer, userdata, remote)), - VariantParams::Whep(room, peer, userdata) => Box::new(whep::TransportWebrtcWhep::new(room, peer, userdata, remote)), - VariantParams::Webrtc(_user_agent, req, userdata, _record, secure) => { + VariantParams::Whip(room, peer, extra_data, _record) => Box::new(whip::TransportWebrtcWhip::new(room, peer, extra_data, remote)), + VariantParams::Whep(room, peer, extra_data) => Box::new(whep::TransportWebrtcWhep::new(room, peer, extra_data, remote)), + VariantParams::Webrtc(_user_agent, req, extra_data, _record, secure) => { rtc.direct_api().create_data_channel(ChannelConfig { label: "data".to_string(), negotiated: Some(1000), @@ -157,7 +157,7 @@ impl TransportWebrtc { //we need to start sctp as client side for handling restart-ice in new server //if not, datachannel will not connect successful after reconnect to new server rtc.direct_api().start_sctp(true); - Box::new(webrtc::TransportWebrtcSdk::new(req, userdata, secure, remote)) + Box::new(webrtc::TransportWebrtcSdk::new(req, extra_data, secure, remote)) } }; @@ -310,7 +310,7 @@ impl Transport for TransportWebrtc } self.queue.push_back(TransportOutput::Ext(ExtOut::RemoteIce(req_id, variant, Ok(success_count)))); } - ExtIn::RestartIce(req_id, variant, _ip, _useragent, req, _userdata, _record) => { + ExtIn::RestartIce(req_id, variant, _ip, _useragent, req, _extra_data, _record) => { if let Ok(offer) = SdpOffer::from_sdp_string(&req.sdp) { if let Ok(answer) = self.rtc.sdp_api().accept_offer(offer) { self.internal.on_codec_config(self.rtc.codec_config()); diff --git a/packages/transport_webrtc/src/transport/webrtc.rs b/packages/transport_webrtc/src/transport/webrtc.rs index 55ce537c..2eaa0130 100644 --- a/packages/transport_webrtc/src/transport/webrtc.rs +++ b/packages/transport_webrtc/src/transport/webrtc.rs @@ -74,7 +74,7 @@ enum TransportWebrtcError { pub struct TransportWebrtcSdk { remote: IpAddr, - userdata: Option, + extra_data: Option, join: Option<(RoomId, PeerId, Option, RoomInfoPublish, RoomInfoSubscribe)>, state: State, queue: DynamicDeque, @@ -89,14 +89,14 @@ pub struct TransportWebrtcSdk { } impl TransportWebrtcSdk { - pub fn new(req: ConnectRequest, userdata: Option, secure: Arc, remote: IpAddr) -> Self { + pub fn new(req: ConnectRequest, extra_data: Option, secure: Arc, remote: IpAddr) -> Self { let tracks = req.tracks.unwrap_or_default(); let local_tracks: Vec = tracks.receivers.into_iter().enumerate().map(|(index, r)| LocalTrack::new((index as u16).into(), r)).collect(); let remote_tracks: Vec = tracks.senders.into_iter().enumerate().map(|(index, s)| RemoteTrack::new((index as u16).into(), s)).collect(); if let Some(j) = req.join { Self { remote, - userdata, + extra_data, join: Some((j.room.into(), j.peer.into(), j.metadata, j.publish.unwrap_or_default().into(), j.subscribe.unwrap_or_default().into())), state: State::New, audio_mixer: j.features.and_then(|f| { @@ -122,7 +122,7 @@ impl TransportWebrtcSdk { } else { Self { remote, - userdata, + extra_data, join: None, state: State::New, local_tracks, @@ -244,7 +244,7 @@ impl TransportWebrtcInternal for TransportWebrtcSdk { event: Some(ProtoRoomEvent2::PeerJoined(PeerJoined { peer: peer.0, metadata: meta.metadata, - userdata: meta.userdata, + extra_data: meta.extra_data, })), })); } @@ -436,7 +436,7 @@ impl TransportWebrtcInternal for TransportWebrtcSdk { peer.clone(), PeerMeta { metadata: metadata.clone(), - userdata: self.userdata.clone(), + extra_data: self.extra_data.clone(), }, publish.clone(), subscribe.clone(), @@ -640,7 +640,7 @@ impl TransportWebrtcSdk { let info = req.info.unwrap_or_default(); let meta = PeerMeta { metadata: info.metadata, - userdata: self.userdata.clone(), + extra_data: self.extra_data.clone(), }; if let Some(token) = self.secure.decode_obj::("webrtc", &req.token) { if token.room == Some(info.room.clone()) && token.peer == Some(info.peer.clone()) { @@ -851,7 +851,7 @@ mod tests { let now = Instant::now(); let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let secure_jwt = Arc::new(MediaEdgeSecureJwt::from(b"1234".as_slice())); - let mut transport = TransportWebrtcSdk::new(req, Some("userdata".to_string()), secure_jwt.clone(), ip); + let mut transport = TransportWebrtcSdk::new(req, Some("extra_data".to_string()), secure_jwt.clone(), ip); assert_eq!(transport.pop_output(now), None); transport.on_tick(now); @@ -874,7 +874,7 @@ mod tests { "peer".to_string().into(), PeerMeta { metadata: Some("metadata".to_string()), - userdata: Some("userdata".to_string()) + extra_data: Some("extra_data".to_string()) }, RoomInfoPublish { peer: true, tracks: true }, RoomInfoSubscribe { peers: true, tracks: true }, @@ -895,7 +895,7 @@ mod tests { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let gateway_jwt = MediaGatewaySecureJwt::from(b"1234".as_slice()); let secure_jwt = Arc::new(MediaEdgeSecureJwt::from(b"1234".as_slice())); - let mut transport = TransportWebrtcSdk::new(req, Some("userdata".to_string()), secure_jwt.clone(), ip); + let mut transport = TransportWebrtcSdk::new(req, Some("extra_data".to_string()), secure_jwt.clone(), ip); assert_eq!(transport.pop_output(now), None); transport.on_tick(now); @@ -917,7 +917,7 @@ mod tests { room: Some("demo".to_string()), peer: Some("peer1".to_string()), record: false, - userdata: Some("userdata".to_string()), + extra_data: Some("extra_data".to_string()), }, 10000, ); @@ -950,7 +950,7 @@ mod tests { "peer1".to_string().into(), PeerMeta { metadata: None, - userdata: Some("userdata".to_string()) + extra_data: Some("extra_data".to_string()) }, RoomInfoPublish { peer: false, tracks: false }, RoomInfoSubscribe { peers: false, tracks: false }, diff --git a/packages/transport_webrtc/src/transport/whep.rs b/packages/transport_webrtc/src/transport/whep.rs index 4b7cb1bf..d051d2c4 100644 --- a/packages/transport_webrtc/src/transport/whep.rs +++ b/packages/transport_webrtc/src/transport/whep.rs @@ -52,7 +52,7 @@ pub struct TransportWebrtcWhep { remote: IpAddr, room: RoomId, peer: PeerId, - userdata: Option, + extra_data: Option, state: State, audio_mid: Option, video_mid: Option, @@ -64,12 +64,12 @@ pub struct TransportWebrtcWhep { } impl TransportWebrtcWhep { - pub fn new(room: RoomId, peer: PeerId, userdata: Option, remote: IpAddr) -> Self { + pub fn new(room: RoomId, peer: PeerId, extra_data: Option, remote: IpAddr) -> Self { Self { remote, room, peer, - userdata, + extra_data, state: State::New, audio_mid: None, video_mid: None, @@ -178,7 +178,7 @@ impl TransportWebrtcInternal for TransportWebrtcWhep { self.peer.clone(), PeerMeta { metadata: None, - userdata: self.userdata.clone(), + extra_data: self.extra_data.clone(), }, RoomInfoPublish { peer: false, tracks: false }, RoomInfoSubscribe { peers: false, tracks: true }, diff --git a/packages/transport_webrtc/src/transport/whip.rs b/packages/transport_webrtc/src/transport/whip.rs index 48183dc3..1e1d291a 100644 --- a/packages/transport_webrtc/src/transport/whip.rs +++ b/packages/transport_webrtc/src/transport/whip.rs @@ -48,7 +48,7 @@ pub struct TransportWebrtcWhip { remote: IpAddr, room: RoomId, peer: PeerId, - userdata: Option, + extra_data: Option, state: State, audio_mid: Option, ///mid and simulcast flag @@ -58,12 +58,12 @@ pub struct TransportWebrtcWhip { } impl TransportWebrtcWhip { - pub fn new(room: RoomId, peer: PeerId, userdata: Option, remote: IpAddr) -> Self { + pub fn new(room: RoomId, peer: PeerId, extra_data: Option, remote: IpAddr) -> Self { Self { remote, room, peer, - userdata, + extra_data, state: State::New, audio_mid: None, video_mid: None, @@ -157,7 +157,7 @@ impl TransportWebrtcInternal for TransportWebrtcWhip { self.peer.clone(), PeerMeta { metadata: None, - userdata: self.userdata.clone(), + extra_data: self.extra_data.clone(), }, RoomInfoPublish { peer: true, tracks: true }, RoomInfoSubscribe { peers: false, tracks: false }, diff --git a/packages/transport_webrtc/src/worker.rs b/packages/transport_webrtc/src/worker.rs index 65ff29e1..2cf4cdbe 100644 --- a/packages/transport_webrtc/src/worker.rs +++ b/packages/transport_webrtc/src/worker.rs @@ -154,10 +154,10 @@ impl MediaWorkerWebrtc { self.queue .push_back(GroupOutput::Ext(owner, ExtOut::RemoteIce(req_id, variant, Err(RpcError::new2(WebrtcError::RpcEndpointNotFound))))); } - ExtIn::RestartIce(req_id, variant, remote, useragent, req, userdata, record) => { + ExtIn::RestartIce(req_id, variant, remote, useragent, req, extra_data, record) => { let sdp = req.sdp.clone(); let session_id = gen_cluster_session_id(); //TODO need to reuse old session_id - if let Ok((ice_lite, sdp, index)) = self.spawn(remote, session_id, VariantParams::Webrtc(useragent, req, userdata, record, self.secure.clone()), &sdp) { + if let Ok((ice_lite, sdp, index)) = self.spawn(remote, session_id, VariantParams::Webrtc(useragent, req, extra_data, record, self.secure.clone()), &sdp) { self.queue.push_back(GroupOutput::Ext(index.into(), ExtOut::RestartIce(req_id, variant, Ok((ice_lite, sdp))))); } else { self.queue