Skip to content

Commit

Permalink
change http req-res
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Feb 16, 2024
1 parent a9c32eb commit 5be4c50
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 125 deletions.
37 changes: 31 additions & 6 deletions packages/media-utils/src/req_res.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,39 @@
use poem_openapi::types::{ToJSON, Type};
use poem_openapi::{types::ParseFromJSON, Object};
use poem_openapi::{
types::{ParseFromJSON, ToJSON, Type},
Object,
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Serialize, Deserialize, Object)]
pub struct Response<T: ParseFromJSON + ToJSON + Type + Send + Sync> {
pub status: bool,
#[derive(Debug, Serialize, Deserialize, Object)]
pub struct Response<T: ParseFromJSON + ToJSON + Type + Send + Sync, E: ParseFromJSON + ToJSON + Type + Send + Sync> {
pub success: bool,
#[serde(skip_serializing_if = "Option::is_none")]
#[oai(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
pub error_code: Option<E>,
#[serde(skip_serializing_if = "Option::is_none")]
#[oai(skip_serializing_if = "Option::is_none")]
pub error_msg: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[oai(skip_serializing_if = "Option::is_none")]
pub data: Option<T>,
}

impl<T: ParseFromJSON + ToJSON + Type + Send + Sync, E: ParseFromJSON + ToJSON + Type + Send + Sync> Response<T, E> {
pub fn success<T2: Into<T>>(data: T2) -> Self {
Self {
success: true,
data: Some(data.into()),
error_code: None,
error_msg: None,
}
}

pub fn error<E2: Into<E>>(error_code: E2, error_msg: &str) -> Self {
Self {
success: false,
error_code: Some(error_code.into()),
error_msg: Some(error_msg.to_string()),
data: None,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl Context {
})
.send()
.await?
.json::<Response<SipOutgoingInviteResponse>>()
.json::<Response<SipOutgoingInviteResponse, String>>()
.await?;
log::info!("make_client_call: {:?}", res);
Ok(())
Expand Down
62 changes: 17 additions & 45 deletions servers/media-server/src/server/gateway/rpc/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,8 @@ pub struct GatewayHttpApis;
impl GatewayHttpApis {
/// get node health
#[oai(path = "/health", method = "get")]
async fn health(&self, Data(_ctx): Data<&DataContainer>) -> Result<Json<Response<String>>> {
Ok(Json(Response {
status: true,
error: None,
data: Some("OK".to_string()),
}))
async fn health(&self, Data(_ctx): Data<&DataContainer>) -> Result<Json<Response<String, String>>> {
Ok(Json(Response::success("OK".to_string())))
}

/// get best nodes
Expand Down Expand Up @@ -137,7 +133,7 @@ impl GatewayHttpApis {

/// delete whip conn
#[oai(path = "/whip/conn/:conn_id", method = "delete")]
async fn conn_whip_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path<String>) -> Result<Json<Response<String>>> {
async fn conn_whip_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path<String>) -> Result<Json<Response<String, String>>> {
log::info!("[HttpApis] close whip endpoint conn {}", conn_id.0);
let (req, rx) = RpcReqResHttp::<MediaEndpointCloseRequest, MediaEndpointCloseResponse>::new(MediaEndpointCloseRequest { conn_id: conn_id.0.clone() });
data.0
Expand All @@ -147,11 +143,7 @@ impl GatewayHttpApis {
let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?;
let _res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[HttpApis] Whip endpoint closed conn {}", conn_id.0);
Ok(Json(Response {
status: true,
error: None,
data: Some("OK".to_string()),
}))
Ok(Json(Response::success("OK".to_string())))
}

/// connect whep endpoint
Expand Down Expand Up @@ -223,7 +215,7 @@ impl GatewayHttpApis {

/// delete whip conn
#[oai(path = "/whep/conn/:conn_id", method = "delete")]
async fn conn_whep_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path<String>) -> Result<Json<Response<String>>> {
async fn conn_whep_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path<String>) -> Result<Json<Response<String, String>>> {
log::info!("[HttpApis] close whep endpoint conn {}", conn_id.0);
let (req, rx) = RpcReqResHttp::<MediaEndpointCloseRequest, MediaEndpointCloseResponse>::new(MediaEndpointCloseRequest { conn_id: conn_id.0.clone() });
data.0
Expand All @@ -233,11 +225,7 @@ impl GatewayHttpApis {
let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?;
let _res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[HttpApis] Whep endpoint closed conn {}", conn_id.0);
Ok(Json(Response {
status: true,
error: None,
data: Some("OK".to_string()),
}))
Ok(Json(Response::success("OK".to_string())))
}

/// connect webrtc endpoint
Expand All @@ -248,7 +236,7 @@ impl GatewayHttpApis {
UserAgent(user_agent): UserAgent,
RemoteIpAddr(ip_addr): RemoteIpAddr,
mut body: Json<WebrtcConnectRequest>,
) -> Result<Json<Response<WebrtcSdp>>> {
) -> Result<Json<Response<WebrtcSdp, String>>> {
let string_zip = StringCompression::default();
log::info!("[HttpApis] create Webrtc endpoint {}/{}", body.0.room, body.0.peer);
if let Some(sdp) = body.0.sdp.take() {
Expand All @@ -259,11 +247,7 @@ impl GatewayHttpApis {
body.0.user_agent = user_agent;

if body.0.verify(data.1.verifier().deref()).is_none() {
return Ok(Json(Response {
status: false,
error: Some("INVALID_TOKEN".to_string()),
data: None,
}));
return Ok(Json(Response::error("INVALID_TOKEN".to_string(), "INVALID_TOKEN")));
}

let (req, rx) = RpcReqResHttp::<WebrtcConnectRequest, WebrtcConnectResponse>::new(body.0);
Expand All @@ -279,20 +263,16 @@ impl GatewayHttpApis {
_ => Err(poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR)),
}?;
log::info!("[HttpApis] Webrtc endpoint created with conn_id {}", res.conn_id);
Ok(Json(Response {
status: true,
error: None,
data: Some(WebrtcSdp {
node_id: 0,
conn_id: res.conn_id,
sdp,
}),
}))
Ok(Json(Response::success(WebrtcSdp {
node_id: 0,
conn_id: res.conn_id,
sdp,
})))
}

/// sending remote ice candidate
#[oai(path = "/webrtc/ice_remote", method = "post")]
async fn webrtc_ice_remote(&self, Data(data): Data<&DataContainer>, body: Json<WebrtcRemoteIceRequest>) -> Result<Json<Response<String>>> {
async fn webrtc_ice_remote(&self, Data(data): Data<&DataContainer>, body: Json<WebrtcRemoteIceRequest>) -> Result<Json<Response<String, String>>> {
log::info!("[HttpApis] on Webrtc endpoint ice-remote {}", body.0.candidate);
let (req, rx) = RpcReqResHttp::<WebrtcRemoteIceRequest, WebrtcRemoteIceResponse>::new(body.0);
data.0
Expand All @@ -301,16 +281,12 @@ impl GatewayHttpApis {
.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?;
res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
Ok(Json(Response {
status: true,
error: None,
data: Some("OK".to_string()),
}))
Ok(Json(Response::success("OK".to_string())))
}

/// delete webrtc conn
#[oai(path = "/webrtc/conn/:conn_id", method = "delete")]
async fn conn_webrtc_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path<String>) -> Result<Json<Response<String>>> {
async fn conn_webrtc_delete(&self, Data(data): Data<&DataContainer>, conn_id: Path<String>) -> Result<Json<Response<String, String>>> {
log::info!("[HttpApis] close webrtc endpoint conn {}", conn_id.0);
let (req, rx) = RpcReqResHttp::<MediaEndpointCloseRequest, MediaEndpointCloseResponse>::new(MediaEndpointCloseRequest { conn_id: conn_id.0.clone() });
data.0
Expand All @@ -320,10 +296,6 @@ impl GatewayHttpApis {
let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?;
let _res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[HttpApis] Webrtc endpoint closed conn {}", conn_id.0);
Ok(Json(Response {
status: true,
error: None,
data: Some("OK".to_string()),
}))
Ok(Json(Response::success("OK".to_string())))
}
}
16 changes: 4 additions & 12 deletions servers/media-server/src/server/rtmp/rpc/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@ pub struct RtmpHttpApis;
impl RtmpHttpApis {
/// get node health
#[oai(path = "/health", method = "get")]
async fn health(&self, Data(_ctx): Data<&(Sender<RpcEvent>, MediaServerContext<()>)>) -> Result<Json<Response<String>>> {
Ok(Json(Response {
status: true,
error: None,
data: Some("OK".to_string()),
}))
async fn health(&self, Data(_ctx): Data<&(Sender<RpcEvent>, MediaServerContext<()>)>) -> Result<Json<Response<String, String>>> {
Ok(Json(Response::success("OK")))
}

/// delete Rtmp conn
#[oai(path = "/rtmp/conn/:conn_id", method = "delete")]
async fn conn_rtmp_delete(&self, Data(ctx): Data<&(Sender<RpcEvent>, MediaServerContext<()>)>, conn_id: Path<String>) -> Result<Json<Response<String>>> {
async fn conn_rtmp_delete(&self, Data(ctx): Data<&(Sender<RpcEvent>, MediaServerContext<()>)>, conn_id: Path<String>) -> Result<Json<Response<String, String>>> {
log::info!("[HttpApis] close Rtmp endpoint conn {}", conn_id.0);
let (req, rx) = RpcReqResHttp::<MediaEndpointCloseRequest, MediaEndpointCloseResponse>::new(MediaEndpointCloseRequest { conn_id: conn_id.0.clone() });
ctx.0
Expand All @@ -40,10 +36,6 @@ impl RtmpHttpApis {
let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?;
let _res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[HttpApis] Rtmp endpoint closed conn {}", conn_id.0);
Ok(Json(Response {
status: true,
error: None,
data: Some("OK".to_string()),
}))
Ok(Json(Response::success("OK")))
}
}
28 changes: 10 additions & 18 deletions servers/media-server/src/server/sip/rpc/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@ pub struct SipHttpApis;
impl SipHttpApis {
/// get node health
#[oai(path = "/health", method = "get")]
async fn health(&self, Data(_ctx): Data<&(Sender<RpcEvent>, MediaServerContext<()>)>) -> Result<Json<Response<String>>> {
Ok(Json(Response {
status: true,
error: None,
data: Some("OK".to_string()),
}))
async fn health(&self, Data(_ctx): Data<&(Sender<RpcEvent>, MediaServerContext<()>)>) -> Result<Json<Response<String, String>>> {
Ok(Json(Response::success("OK")))
}

/// invite a client
Expand All @@ -32,7 +28,7 @@ impl SipHttpApis {
&self,
Data(ctx): Data<&(Sender<RpcEvent>, MediaServerContext<InternalControl>)>,
body: Json<SipOutgoingInviteClientRequest>,
) -> Result<Json<Response<SipOutgoingInviteResponse>>> {
) -> Result<Json<Response<SipOutgoingInviteResponse, String>>> {
log::info!("[HttpApis] invite sip client {:?}", body.0);
let (req, rx) = RpcReqResHttp::new(body.0);
ctx.0
Expand All @@ -41,16 +37,16 @@ impl SipHttpApis {
.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?;
let res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
Ok(Json(Response {
status: true,
error: None,
data: Some(res),
}))
Ok(Json(Response::success(res)))
}

/// invite a server
#[oai(path = "/sip/invite/server", method = "post")]
async fn invite_server(&self, Data(ctx): Data<&(Sender<RpcEvent>, MediaServerContext<()>)>, body: Json<SipOutgoingInviteServerRequest>) -> Result<Json<Response<SipOutgoingInviteResponse>>> {
async fn invite_server(
&self,
Data(ctx): Data<&(Sender<RpcEvent>, MediaServerContext<()>)>,
body: Json<SipOutgoingInviteServerRequest>,
) -> Result<Json<Response<SipOutgoingInviteResponse, String>>> {
log::info!("[HttpApis] invite sip client {:?}", body.0);
let (req, rx) = RpcReqResHttp::new(body.0);
ctx.0
Expand All @@ -59,10 +55,6 @@ impl SipHttpApis {
.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.recv().await.map_err(|e| poem::Error::new(e, StatusCode::INTERNAL_SERVER_ERROR))?;
let res = res.map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
Ok(Json(Response {
status: true,
error: None,
data: Some(res),
}))
Ok(Json(Response::success(res)))
}
}
Loading

0 comments on commit 5be4c50

Please sign in to comment.