diff --git a/packages/media-utils/src/req_res.rs b/packages/media-utils/src/req_res.rs index 1800bed5..4bc82bca 100644 --- a/packages/media-utils/src/req_res.rs +++ b/packages/media-utils/src/req_res.rs @@ -1,14 +1,39 @@ -use poem_openapi::types::{ToJSON, Type}; -use poem_openapi::{types::ParseFromJSON, Object}; +use poem_openapi::{ + types::{ParseFromJSON, ToJSON, Type}, + Object, +}; use serde::{Deserialize, Serialize}; -#[derive(Debug, Default, Serialize, Deserialize, Object)] -pub struct Response { - pub status: bool, +#[derive(Debug, Serialize, Deserialize, Object)] +pub struct Response { + pub success: bool, #[serde(skip_serializing_if = "Option::is_none")] #[oai(skip_serializing_if = "Option::is_none")] - pub error: Option, + pub error_code: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[oai(skip_serializing_if = "Option::is_none")] + pub error_msg: Option, #[serde(skip_serializing_if = "Option::is_none")] #[oai(skip_serializing_if = "Option::is_none")] pub data: Option, } + +impl Response { + pub fn success>(data: T2) -> Self { + Self { + success: true, + data: Some(data.into()), + error_code: None, + error_msg: None, + } + } + + pub fn error>(error_code: E2, error_msg: &str) -> Self { + Self { + success: false, + error_code: Some(error_code.into()), + error_msg: Some(error_msg.to_string()), + data: None, + } + } +} diff --git a/servers/media-server/examples/simple_sip_calling_service.rs b/servers/media-server/examples/simple_sip_calling_service.rs index fa7f2eb7..8f430d73 100644 --- a/servers/media-server/examples/simple_sip_calling_service.rs +++ b/servers/media-server/examples/simple_sip_calling_service.rs @@ -69,7 +69,7 @@ impl Context { }) .send() .await? - .json::>() + .json::>() .await?; log::info!("make_client_call: {:?}", res); Ok(()) diff --git a/servers/media-server/src/server/gateway/rpc/http.rs b/servers/media-server/src/server/gateway/rpc/http.rs index 2ff8d971..dab29db6 100644 --- a/servers/media-server/src/server/gateway/rpc/http.rs +++ b/servers/media-server/src/server/gateway/rpc/http.rs @@ -41,12 +41,8 @@ pub struct GatewayHttpApis; impl GatewayHttpApis { /// get node health #[oai(path = "/health", method = "get")] - async fn health(&self, Data(_ctx): Data<&DataContainer>) -> Result>> { - Ok(Json(Response { - status: true, - error: None, - data: Some("OK".to_string()), - })) + async fn health(&self, Data(_ctx): Data<&DataContainer>) -> Result>> { + Ok(Json(Response::success("OK".to_string()))) } /// get best nodes @@ -137,7 +133,7 @@ impl GatewayHttpApis { /// delete whip conn #[oai(path = "/whip/conn/:conn_id", method = "delete")] - async fn conn_whip_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path) -> Result>> { + async fn conn_whip_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path) -> Result>> { log::info!("[HttpApis] close whip endpoint conn {}", conn_id.0); let (req, rx) = RpcReqResHttp::::new(MediaEndpointCloseRequest { conn_id: conn_id.0.clone() }); data.0 @@ -147,11 +143,7 @@ impl GatewayHttpApis { let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; let _res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?; log::info!("[HttpApis] Whip endpoint closed conn {}", conn_id.0); - Ok(Json(Response { - status: true, - error: None, - data: Some("OK".to_string()), - })) + Ok(Json(Response::success("OK".to_string()))) } /// connect whep endpoint @@ -223,7 +215,7 @@ impl GatewayHttpApis { /// delete whip conn #[oai(path = "/whep/conn/:conn_id", method = "delete")] - async fn conn_whep_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path) -> Result>> { + async fn conn_whep_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path) -> Result>> { log::info!("[HttpApis] close whep endpoint conn {}", conn_id.0); let (req, rx) = RpcReqResHttp::::new(MediaEndpointCloseRequest { conn_id: conn_id.0.clone() }); data.0 @@ -233,11 +225,7 @@ impl GatewayHttpApis { let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; let _res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?; log::info!("[HttpApis] Whep endpoint closed conn {}", conn_id.0); - Ok(Json(Response { - status: true, - error: None, - data: Some("OK".to_string()), - })) + Ok(Json(Response::success("OK".to_string()))) } /// connect webrtc endpoint @@ -248,7 +236,7 @@ impl GatewayHttpApis { UserAgent(user_agent): UserAgent, RemoteIpAddr(ip_addr): RemoteIpAddr, mut body: Json, - ) -> Result>> { + ) -> Result>> { let string_zip = StringCompression::default(); log::info!("[HttpApis] create Webrtc endpoint {}/{}", body.0.room, body.0.peer); if let Some(sdp) = body.0.sdp.take() { @@ -259,11 +247,7 @@ impl GatewayHttpApis { body.0.user_agent = user_agent; if body.0.verify(data.1.verifier().deref()).is_none() { - return Ok(Json(Response { - status: false, - error: Some("INVALID_TOKEN".to_string()), - data: None, - })); + return Ok(Json(Response::error("INVALID_TOKEN".to_string(), "INVALID_TOKEN"))); } let (req, rx) = RpcReqResHttp::::new(body.0); @@ -279,20 +263,16 @@ impl GatewayHttpApis { _ => Err(poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR)), }?; log::info!("[HttpApis] Webrtc endpoint created with conn_id {}", res.conn_id); - Ok(Json(Response { - status: true, - error: None, - data: Some(WebrtcSdp { - node_id: 0, - conn_id: res.conn_id, - sdp, - }), - })) + Ok(Json(Response::success(WebrtcSdp { + node_id: 0, + conn_id: res.conn_id, + sdp, + }))) } /// sending remote ice candidate #[oai(path = "/webrtc/ice_remote", method = "post")] - async fn webrtc_ice_remote(&self, Data(data): Data<&DataContainer>, body: Json) -> Result>> { + async fn webrtc_ice_remote(&self, Data(data): Data<&DataContainer>, body: Json) -> Result>> { log::info!("[HttpApis] on Webrtc endpoint ice-remote {}", body.0.candidate); let (req, rx) = RpcReqResHttp::::new(body.0); data.0 @@ -301,16 +281,12 @@ impl GatewayHttpApis { .map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?; - Ok(Json(Response { - status: true, - error: None, - data: Some("OK".to_string()), - })) + Ok(Json(Response::success("OK".to_string()))) } /// delete webrtc conn #[oai(path = "/webrtc/conn/:conn_id", method = "delete")] - async fn conn_webrtc_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path) -> Result>> { + async fn conn_webrtc_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path) -> Result>> { log::info!("[HttpApis] close webrtc endpoint conn {}", conn_id.0); let (req, rx) = RpcReqResHttp::::new(MediaEndpointCloseRequest { conn_id: conn_id.0.clone() }); data.0 @@ -320,10 +296,6 @@ impl GatewayHttpApis { let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; let _res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?; log::info!("[HttpApis] Webrtc endpoint closed conn {}", conn_id.0); - Ok(Json(Response { - status: true, - error: None, - data: Some("OK".to_string()), - })) + Ok(Json(Response::success("OK".to_string()))) } } diff --git a/servers/media-server/src/server/rtmp/rpc/http.rs b/servers/media-server/src/server/rtmp/rpc/http.rs index 3deb01be..2b1bc25e 100644 --- a/servers/media-server/src/server/rtmp/rpc/http.rs +++ b/servers/media-server/src/server/rtmp/rpc/http.rs @@ -20,17 +20,13 @@ pub struct RtmpHttpApis; impl RtmpHttpApis { /// get node health #[oai(path = "/health", method = "get")] - async fn health(&self, Data(_ctx): Data<&(Sender, MediaServerContext<()>)>) -> Result>> { - Ok(Json(Response { - status: true, - error: None, - data: Some("OK".to_string()), - })) + async fn health(&self, Data(_ctx): Data<&(Sender, MediaServerContext<()>)>) -> Result>> { + Ok(Json(Response::success("OK"))) } /// delete Rtmp conn #[oai(path = "/rtmp/conn/:conn_id", method = "delete")] - async fn conn_rtmp_delete(&self, Data(ctx): Data<&(Sender, MediaServerContext<()>)>, conn_id: Path) -> Result>> { + async fn conn_rtmp_delete(&self, Data(ctx): Data<&(Sender, MediaServerContext<()>)>, conn_id: Path) -> Result>> { log::info!("[HttpApis] close Rtmp endpoint conn {}", conn_id.0); let (req, rx) = RpcReqResHttp::::new(MediaEndpointCloseRequest { conn_id: conn_id.0.clone() }); ctx.0 @@ -40,10 +36,6 @@ impl RtmpHttpApis { let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; let _res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?; log::info!("[HttpApis] Rtmp endpoint closed conn {}", conn_id.0); - Ok(Json(Response { - status: true, - error: None, - data: Some("OK".to_string()), - })) + Ok(Json(Response::success("OK"))) } } diff --git a/servers/media-server/src/server/sip/rpc/http.rs b/servers/media-server/src/server/sip/rpc/http.rs index 4826c3d6..d1a3f034 100644 --- a/servers/media-server/src/server/sip/rpc/http.rs +++ b/servers/media-server/src/server/sip/rpc/http.rs @@ -18,12 +18,8 @@ pub struct SipHttpApis; impl SipHttpApis { /// get node health #[oai(path = "/health", method = "get")] - async fn health(&self, Data(_ctx): Data<&(Sender, MediaServerContext<()>)>) -> Result>> { - Ok(Json(Response { - status: true, - error: None, - data: Some("OK".to_string()), - })) + async fn health(&self, Data(_ctx): Data<&(Sender, MediaServerContext<()>)>) -> Result>> { + Ok(Json(Response::success("OK"))) } /// invite a client @@ -32,7 +28,7 @@ impl SipHttpApis { &self, Data(ctx): Data<&(Sender, MediaServerContext)>, body: Json, - ) -> Result>> { + ) -> Result>> { log::info!("[HttpApis] invite sip client {:?}", body.0); let (req, rx) = RpcReqResHttp::new(body.0); ctx.0 @@ -41,16 +37,16 @@ impl SipHttpApis { .map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; let res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?; - Ok(Json(Response { - status: true, - error: None, - data: Some(res), - })) + Ok(Json(Response::success(res))) } /// invite a server #[oai(path = "/sip/invite/server", method = "post")] - async fn invite_server(&self, Data(ctx): Data<&(Sender, MediaServerContext<()>)>, body: Json) -> Result>> { + async fn invite_server( + &self, + Data(ctx): Data<&(Sender, MediaServerContext<()>)>, + body: Json, + ) -> Result>> { log::info!("[HttpApis] invite sip client {:?}", body.0); let (req, rx) = RpcReqResHttp::new(body.0); ctx.0 @@ -59,10 +55,6 @@ impl SipHttpApis { .map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; let res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?; - Ok(Json(Response { - status: true, - error: None, - data: Some(res), - })) + Ok(Json(Response::success(res))) } } diff --git a/servers/media-server/src/server/webrtc/rpc/http.rs b/servers/media-server/src/server/webrtc/rpc/http.rs index f37581fa..16311c18 100644 --- a/servers/media-server/src/server/webrtc/rpc/http.rs +++ b/servers/media-server/src/server/webrtc/rpc/http.rs @@ -38,12 +38,8 @@ pub struct WebrtcHttpApis; impl WebrtcHttpApis { /// get node health #[oai(path = "/health", method = "get")] - async fn health(&self, Data(_ctx): Data<&DataContainer>) -> Result>> { - Ok(Json(Response { - status: true, - error: None, - data: Some("OK".to_string()), - })) + async fn health(&self, Data(_ctx): Data<&DataContainer>) -> Result>> { + Ok(Json(Response::success("OK"))) } /// connect whip endpoint @@ -122,7 +118,7 @@ impl WebrtcHttpApis { /// delete whip conn #[oai(path = "/whip/conn/:conn_id", method = "delete")] - async fn conn_whip_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path) -> Result>> { + async fn conn_whip_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path) -> Result>> { log::info!("[HttpApis] close whip endpoint conn {}", conn_id.0); let (req, rx) = RpcReqResHttp::::new(MediaEndpointCloseRequest { conn_id: conn_id.0.clone() }); data.0 @@ -132,11 +128,7 @@ impl WebrtcHttpApis { let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; let _res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?; log::info!("[HttpApis] Whip endpoint closed conn {}", conn_id.0); - Ok(Json(Response { - status: true, - error: None, - data: Some("OK".to_string()), - })) + Ok(Json(Response::success("OK"))) } /// connect whep endpoint @@ -216,7 +208,7 @@ impl WebrtcHttpApis { /// delete whip conn #[oai(path = "/whep/conn/:conn_id", method = "delete")] - async fn conn_whep_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path) -> Result>> { + async fn conn_whep_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path) -> Result>> { log::info!("[HttpApis] close whep endpoint conn {}", conn_id.0); let (req, rx) = RpcReqResHttp::::new(MediaEndpointCloseRequest { conn_id: conn_id.0.clone() }); data.0 @@ -226,11 +218,7 @@ impl WebrtcHttpApis { let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; let _res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?; log::info!("[HttpApis] Whep endpoint closed conn {}", conn_id.0); - Ok(Json(Response { - status: true, - error: None, - data: Some("OK".to_string()), - })) + Ok(Json(Response::success("OK"))) } /// connect webrtc endpoint @@ -241,7 +229,7 @@ impl WebrtcHttpApis { UserAgent(user_agent): UserAgent, RemoteIpAddr(ip_addr): RemoteIpAddr, mut body: Json, - ) -> Result>> { + ) -> Result>> { let token_success = match data.1.verifier().verify_media_session(&body.0.token) { None => false, Some(s_token) => { @@ -256,8 +244,9 @@ impl WebrtcHttpApis { }; if !token_success { return Ok(Json(Response { - status: false, - error: Some("INVALID_TOKEN".to_string()), + success: false, + error_code: Some("INVALID_TOKEN".to_string()), + error_msg: Some("INVALID_TOKEN".to_string()), data: None, })); } @@ -282,21 +271,17 @@ impl WebrtcHttpApis { _ => Err(poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR)), }?; log::info!("[HttpApis] Webrtc endpoint created with conn_id {}", res.conn_id); - Ok(Json(Response { - status: true, - error: None, - data: Some(WebrtcSdp { - node_id: 0, - conn_id: res.conn_id, - sdp, - service_token: None, - }), - })) + Ok(Json(Response::success(WebrtcSdp { + node_id: 0, + conn_id: res.conn_id, + sdp, + service_token: None, + }))) } /// sending remote ice candidate #[oai(path = "/webrtc/ice_remote", method = "post")] - async fn webrtc_ice_remote(&self, Data(data): Data<&DataContainer>, body: Json) -> Result>> { + async fn webrtc_ice_remote(&self, Data(data): Data<&DataContainer>, body: Json) -> Result>> { log::info!("[HttpApis] on Webrtc endpoint ice-remote {}", body.0.candidate); let (req, rx) = RpcReqResHttp::::new(body.0); data.0 @@ -305,16 +290,12 @@ impl WebrtcHttpApis { .map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?; let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?; - Ok(Json(Response { - status: true, - error: None, - data: Some("OK".to_string()), - })) + Ok(Json(Response::success("OK"))) } /// delete webrtc conn #[oai(path = "/webrtc/conn/:conn_id", method = "delete")] - async fn conn_webrtc_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path) -> Result>> { + async fn conn_webrtc_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path) -> Result>> { log::info!("[HttpApis] close webrtc endpoint conn {}", conn_id.0); let (req, rx) = RpcReqResHttp::::new(MediaEndpointCloseRequest { conn_id: conn_id.0.clone() }); data.0 @@ -324,10 +305,6 @@ impl WebrtcHttpApis { let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?; let _res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?; log::info!("[HttpApis] Webrtc endpoint closed conn {}", conn_id.0); - Ok(Json(Response { - status: true, - error: None, - data: Some("OK".to_string()), - })) + Ok(Json(Response::success("OK"))) } }