
Commit

refactor: rename peer's userdata to extra_data to avoid misunderstanding (8xFF#386)
giangndm authored Jul 26, 2024
1 parent a484a4e commit b63e5cf
Showing 26 changed files with 100 additions and 100 deletions.
8 changes: 4 additions & 4 deletions bin/src/http/api_media.rs
@@ -71,7 +71,7 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
peer: token.peer.into(),
user_agent,
record: token.record,
- userdata: token.userdata,
+ extra_data: token.extra_data,
})));
ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
@@ -168,7 +168,7 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
room: token.room.into(),
peer: token.peer.unwrap_or_else(|| format!("whep-{}", (random::<u64>()))).into(),
user_agent,
- userdata: token.userdata,
+ extra_data: token.extra_data,
})));
ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
@@ -267,7 +267,7 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
return Err(poem::Error::from_string("Wrong peer".to_string(), StatusCode::FORBIDDEN));
}
}
- let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(session_id, ip_addr, user_agent, connect.0, token.userdata, token.record)));
+ let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(session_id, ip_addr, user_agent, connect.0, token.extra_data, token.record)));
ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
match res {
@@ -336,7 +336,7 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
}
}
log::info!("[MediaAPIs] restart_ice webrtc, ip {}, user_agent {}, conn {}, request {:?}", ip_addr, user_agent, conn_id.0, connect);
- let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::RestartIce(conn_id2, ip_addr, user_agent, connect.0, token.userdata, token.record)));
+ let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::RestartIce(conn_id2, ip_addr, user_agent, connect.0, token.extra_data, token.record)));
ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
match res {
12 changes: 6 additions & 6 deletions bin/src/http/api_token.rs
@@ -25,7 +25,7 @@ struct WhipTokenReq {
peer: String,
ttl: u64,
record: Option<bool>,
- userdata: Option<String>,
+ extra_data: Option<String>,
}

#[derive(poem_openapi::Object)]
@@ -38,7 +38,7 @@ struct WhepTokenReq {
room: String,
peer: Option<String>,
ttl: u64,
- userdata: Option<String>,
+ extra_data: Option<String>,
}

#[derive(poem_openapi::Object)]
@@ -52,7 +52,7 @@ struct WebrtcTokenReq {
peer: Option<String>,
ttl: u64,
record: Option<bool>,
- userdata: Option<String>,
+ extra_data: Option<String>,
}

#[derive(poem_openapi::Object)]
@@ -84,7 +84,7 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
room: body.room,
peer: body.peer,
record: body.record.unwrap_or(false),
- userdata: body.userdata,
+ extra_data: body.extra_data,
},
body.ttl,
),
@@ -113,7 +113,7 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
WhepToken {
room: body.room,
peer: body.peer,
- userdata: body.userdata,
+ extra_data: body.extra_data,
},
body.ttl,
),
@@ -142,7 +142,7 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
room: body.room,
peer: body.peer,
record: body.record.unwrap_or(false),
- userdata: body.userdata,
+ extra_data: body.extra_data,
},
body.ttl,
),
16 changes: 8 additions & 8 deletions bin/src/server/gateway/local_rpc_handler.rs
@@ -123,12 +123,12 @@ impl MediaLocalRpcHandler {
whep::RpcReq::Delete(param) => RpcRes::Whep(whep::RpcRes::Delete(self.whep_delete(conn_part, param).await)),
},
RpcReq::Webrtc(param) => match param {
- webrtc::RpcReq::Connect(session_id, ip, user_agent, param, userdata, record) => {
- RpcRes::Webrtc(webrtc::RpcRes::Connect(self.webrtc_connect(session_id, ip, user_agent, param, userdata, record).await))
+ webrtc::RpcReq::Connect(session_id, ip, user_agent, param, extra_data, record) => {
+ RpcRes::Webrtc(webrtc::RpcRes::Connect(self.webrtc_connect(session_id, ip, user_agent, param, extra_data, record).await))
}
webrtc::RpcReq::RemoteIce(conn, param) => RpcRes::Webrtc(webrtc::RpcRes::RemoteIce(self.webrtc_remote_ice(conn_part, conn, param).await)),
- webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, userdata, record) => {
- RpcRes::Webrtc(webrtc::RpcRes::RestartIce(self.webrtc_restart_ice(conn_part, conn, ip, user_agent, req, userdata, record).await))
+ webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, extra_data, record) => {
+ RpcRes::Webrtc(webrtc::RpcRes::RestartIce(self.webrtc_restart_ice(conn_part, conn, ip, user_agent, req, extra_data, record).await))
}
webrtc::RpcReq::Delete(_) => {
//TODO implement delete webrtc conn
@@ -276,7 +276,7 @@ impl MediaLocalRpcHandler {
Webrtc part
*/

- async fn webrtc_connect(&self, session_id: u64, ip: IpAddr, user_agent: String, req: ConnectRequest, userdata: Option<String>, record: bool) -> RpcResult<(ClusterConnId, ConnectResponse)> {
+ async fn webrtc_connect(&self, session_id: u64, ip: IpAddr, user_agent: String, req: ConnectRequest, extra_data: Option<String>, record: bool) -> RpcResult<(ClusterConnId, ConnectResponse)> {
let started_at = now_ms();
self.feedback_route_begin(session_id, ip).await;

@@ -289,7 +289,7 @@ impl MediaLocalRpcHandler {
ip: ip.to_string(),
req: Some(req),
record,
- userdata,
+ extra_data,
};
let res = self.client.webrtc_connect(sock_addr, rpc_req).await;
log::info!("[Gateway] response from node {node_id} => {:?}", res);
@@ -343,7 +343,7 @@ impl MediaLocalRpcHandler {
ip: IpAddr,
user_agent: String,
req: ConnectRequest,
- userdata: Option<String>,
+ extra_data: Option<String>,
record: bool,
) -> RpcResult<(ClusterConnId, ConnectResponse)> {
//TODO how to handle media-node down?
@@ -354,7 +354,7 @@
user_agent,
req: Some(req),
record,
- userdata,
+ extra_data,
};
log::info!("[Gateway] selected node {node}");
let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT);
4 changes: 2 additions & 2 deletions bin/src/server/media/rpc_handler.rs
@@ -127,7 +127,7 @@ impl MediaEdgeServiceHandler<Ctx> for MediaRpcHandlerImpl {
req.ip.parse().ok()?,
req.user_agent,
req.req?,
- req.userdata,
+ req.extra_data,
req.record,
)));
ctx.req_tx.send(req).await.ok()?;
@@ -159,7 +159,7 @@ impl MediaEdgeServiceHandler<Ctx> for MediaRpcHandlerImpl {
req.ip.parse().ok()?,
req.user_agent,
req.req?,
- req.userdata,
+ req.extra_data,
req.record,
)));
ctx.req_tx.send(req).await.ok()?;
2 changes: 1 addition & 1 deletion docs/user-guide/features/README.md
@@ -9,4 +9,4 @@ In this document, we will explore the key features of atm0s-media-server. Curren
| [Simulcast/SVC](./simulcast-svc.md) | Alpha |
| [Recording](./recording.md) | TODO |
| [Cluster](./cluster.md) | Alpha |
- | [Userdata-metadata](./userdata-metadata.md) | Alpha |
+ | [extra_data-metadata](./extra_data-metadata.md) | Alpha |
6 changes: 6 additions & 0 deletions docs/user-guide/features/extra_data-metadata.md
@@ -0,0 +1,6 @@
# extra_data and metadata

In atm0s-media, extra_data and metadata serve the same goal: providing additional data beyond just the room and peer identifiers. The two terms differ as follows (a usage sketch follows the list):

- extra_data: a string embedded in the peer token; it cannot be changed by the client.
- metadata: a string attached to a peer or a track by the client; peer metadata is set at the join-room step, track metadata at the publish-track step.
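To make the distinction concrete, here is a minimal Rust sketch. Only the `PeerMeta` field names come from this commit; the example values, the flow, and the `main` function are illustrative assumptions, not the actual atm0s-media-server API.

```rust
// Illustrative sketch only. PeerMeta mirrors the struct seen in this diff;
// everything else (values, flow) is a simplified assumption.
#[derive(Debug)]
struct PeerMeta {
    /// Chosen by the client at the join-room step (a track would carry its own
    /// metadata, set at the publish-track step).
    metadata: Option<String>,
    /// Embedded in the peer token by the token issuer; the client presents the
    /// token but cannot alter this value.
    extra_data: Option<String>,
}

fn main() {
    // In a real deployment, extra_data would be decoded from a verified token
    // (e.g. the token request types changed elsewhere in this commit).
    let extra_from_token = Some("tenant=acme;role=guest".to_string());

    // metadata is whatever the client sends when it joins the room.
    let meta = PeerMeta {
        metadata: Some("display_name=Alice".to_string()),
        extra_data: extra_from_token,
    };
    println!("{meta:?}");
}
```

This also reflects why the rename helps: extra_data travels inside the issued token and is trusted as-is, while metadata is supplied directly by the client at join or publish time.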
6 changes: 0 additions & 6 deletions docs/user-guide/features/userdata-metadata.md

This file was deleted.

2 changes: 1 addition & 1 deletion packages/media_core/src/cluster.rs
@@ -201,7 +201,7 @@ mod tests {
let room_peers_map = id_generator::peers_map(userdata.0);
let peer = PeerId("peer1".to_string());
let peer_key = id_generator::peers_key(&peer);
- let peer_info = PeerInfo::new(peer.clone(), PeerMeta { metadata: None, userdata: None });
+ let peer_info = PeerInfo::new(peer.clone(), PeerMeta { metadata: None, extra_data: None });

let now = Instant::now();
// Not join room with scope (peer true, track false) should Set and Sub
2 changes: 1 addition & 1 deletion packages/media_core/src/cluster/room.rs
@@ -270,7 +270,7 @@ mod tests {
endpoint,
ClusterEndpointControl::Join(
peer.clone(),
- PeerMeta { metadata: None, userdata: None },
+ PeerMeta { metadata: None, extra_data: None },
RoomInfoPublish { peer: false, tracks: false },
RoomInfoSubscribe { peers: true, tracks: true },
Some(AudioMixerConfig {
24 changes: 12 additions & 12 deletions packages/media_core/src/cluster/room/metadata.rs
@@ -381,7 +381,7 @@ mod tests {
let room: ClusterRoomHash = 1.into();
let mut room_meta: RoomMetadata<u8> = RoomMetadata::<u8>::new(room);
let peer_id: PeerId = "peer1".to_string().into();
- let peer_meta = PeerMeta { metadata: None, userdata: None };
+ let peer_meta = PeerMeta { metadata: None, extra_data: None };
let endpoint = 1;
room_meta.on_join(
endpoint,
@@ -408,7 +408,7 @@ mod tests {
let tracks_map = id_generator::tracks_map(room);
let mut room_meta: RoomMetadata<u8> = RoomMetadata::<u8>::new(room);
let peer_id: PeerId = "peer1".to_string().into();
- let peer_meta = PeerMeta { metadata: None, userdata: None };
+ let peer_meta = PeerMeta { metadata: None, extra_data: None };
let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone());
let peer_key = id_generator::peers_key(&peer_id);
let endpoint = 1;
@@ -464,14 +464,14 @@ mod tests {

let peer2: PeerId = "peer2".to_string().into();
let peer2_key = id_generator::peers_key(&peer2);
- let peer2_info = PeerInfo::new(peer2, PeerMeta { metadata: None, userdata: None });
+ let peer2_info = PeerInfo::new(peer2, PeerMeta { metadata: None, extra_data: None });

room_meta.on_kv_event(peers_map, MapEvent::OnSet(peer2_key, 0, peer2_info.serialize()));
assert_eq!(room_meta.pop_output(()), None);

let endpoint = 1;
let peer_id: PeerId = "peer1".to_string().into();
- let peer_meta = PeerMeta { metadata: None, userdata: None };
+ let peer_meta = PeerMeta { metadata: None, extra_data: None };
room_meta.on_join(
endpoint,
peer_id.clone(),
@@ -500,7 +500,7 @@ mod tests {
let tracks_map = id_generator::tracks_map(room);
let mut room_meta: RoomMetadata<u8> = RoomMetadata::<u8>::new(room);
let peer_id: PeerId = "peer1".to_string().into();
- let peer_meta = PeerMeta { metadata: None, userdata: None };
+ let peer_meta = PeerMeta { metadata: None, extra_data: None };
let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone());
let peer_key = id_generator::peers_key(&peer_id);
let endpoint = 1;
@@ -569,7 +569,7 @@ mod tests {

let endpoint = 1;
let peer_id: PeerId = "peer1".to_string().into();
- let peer_meta = PeerMeta { metadata: None, userdata: None };
+ let peer_meta = PeerMeta { metadata: None, extra_data: None };
room_meta.on_join(
endpoint,
peer_id.clone(),
@@ -601,7 +601,7 @@ mod tests {
let tracks_map = id_generator::tracks_map(room);
let mut room_meta: RoomMetadata<u8> = RoomMetadata::<u8>::new(room);
let peer_id: PeerId = "peer1".to_string().into();
- let peer_meta = PeerMeta { metadata: None, userdata: None };
+ let peer_meta = PeerMeta { metadata: None, extra_data: None };
let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone());
let peer_key = id_generator::peers_key(&peer_id);
let endpoint = 1;
@@ -643,7 +643,7 @@ mod tests {
let room: ClusterRoomHash = 1.into();
let mut room_meta: RoomMetadata<u8> = RoomMetadata::<u8>::new(room);
let peer_id: PeerId = "peer1".to_string().into();
- let peer_meta = PeerMeta { metadata: None, userdata: None };
+ let peer_meta = PeerMeta { metadata: None, extra_data: None };
let endpoint = 1;
room_meta.on_join(
endpoint,
@@ -702,7 +702,7 @@ mod tests {

let endpoint = 1;
let peer_id: PeerId = "peer1".to_string().into();
- let peer_meta = PeerMeta { metadata: None, userdata: None };
+ let peer_meta = PeerMeta { metadata: None, extra_data: None };
room_meta.on_join(
endpoint,
peer_id.clone(),
@@ -748,7 +748,7 @@ mod tests {

let endpoint = 1;
let peer_id: PeerId = "peer1".to_string().into();
- let peer_meta = PeerMeta { metadata: None, userdata: None };
+ let peer_meta = PeerMeta { metadata: None, extra_data: None };
room_meta.on_join(
endpoint,
peer_id.clone(),
@@ -783,7 +783,7 @@ mod tests {

let endpoint = 1;
let peer_id: PeerId = "peer1".to_string().into();
- let peer_meta = PeerMeta { metadata: None, userdata: None };
+ let peer_meta = PeerMeta { metadata: None, extra_data: None };
room_meta.on_join(
endpoint,
peer_id.clone(),
@@ -823,7 +823,7 @@ mod tests {
let room: ClusterRoomHash = 1.into();
let mut room_meta: RoomMetadata<u8> = RoomMetadata::<u8>::new(room);
let peer_id: PeerId = "peer1".to_string().into();
- let peer_meta = PeerMeta { metadata: None, userdata: None };
+ let peer_meta = PeerMeta { metadata: None, extra_data: None };
let endpoint = 1;
room_meta.on_join(
endpoint,
4 changes: 2 additions & 2 deletions packages/media_core/src/endpoint/internal.rs
@@ -533,7 +533,7 @@ mod tests {

let room: RoomId = "room".into();
let peer: PeerId = "peer".into();
- let meta = PeerMeta { metadata: None, userdata: None };
+ let meta = PeerMeta { metadata: None, extra_data: None };
let publish = RoomInfoPublish { peer: true, tracks: true };
let subscribe = RoomInfoSubscribe { peers: true, tracks: true };
internal.on_transport_rpc(now, 0.into(), EndpointReq::JoinRoom(room.clone(), peer.clone(), meta.clone(), publish.clone(), subscribe.clone(), None));
@@ -658,7 +658,7 @@ mod tests {
let room1: RoomId = "room1".into();
let room1_hash = ClusterRoomHash::from(&room1);
let peer: PeerId = "peer".into();
- let meta = PeerMeta { metadata: None, userdata: None };
+ let meta = PeerMeta { metadata: None, extra_data: None };
let publish = RoomInfoPublish { peer: true, tracks: true };
let subscribe = RoomInfoSubscribe { peers: true, tracks: true };
internal.on_transport_rpc(
(Diff for the remaining changed files is not shown.)
