From 5ad190e0ac3098057c51221f60d478d11ee17890 Mon Sep 17 00:00:00 2001 From: Luong Minh Date: Wed, 21 Aug 2024 18:15:44 +0700 Subject: [PATCH] feat: external auth --- Cargo.lock | 1 + bin/Cargo.toml | 1 + bin/src/http.rs | 20 +++++++++- bin/src/http/api_media.rs | 77 ++++++++++++++++++++++++++++++++++++--- bin/src/server/gateway.rs | 7 +++- bin/src/server/media.rs | 7 +++- 6 files changed, 104 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a69570a1..4350c44f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -441,6 +441,7 @@ dependencies = [ "quinn", "rand 0.8.5", "rcgen", + "reqwest", "rtpengine-ngcontrol", "rust-embed", "rustls 0.23.12", diff --git a/bin/Cargo.toml b/bin/Cargo.toml index ba237914..d65b36dd 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -38,6 +38,7 @@ sysinfo = { version = "0.30", optional = true } hex = { version = "0.4", optional = true } mime_guess = { version = "2.0", optional = true } sentry = "0.34" +reqwest = { version = "0.12" } [features] default = ["console", "gateway", "media", "connector", "cert_utils"] diff --git a/bin/src/http.rs b/bin/src/http.rs index 1eeb6774..f96dda78 100644 --- a/bin/src/http.rs +++ b/bin/src/http.rs @@ -114,6 +114,7 @@ pub async fn run_gateway_http_server, RpcRes>>, edge_secure: Arc, gateway_secure: Arc, + ext_auth_uri: Option, ) -> Result<(), Box> { let token_service: OpenApiService<_, ()> = OpenApiService::new(api_token::TokenApis::::new(), "App APIs", env!("CARGO_PKG_VERSION")).server("/token/"); let token_ui = token_service.swagger_ui(); @@ -132,7 +133,14 @@ pub async fn run_gateway_http_server, RpcRes>>, edge_secure: Arc, gateway_secure: Option>, + ext_auth_uri: Option, ) -> Result<(), Box> { let mut route = Route::new(); @@ -170,7 +179,14 @@ pub async fn run_media_http_server { pub(crate) sender: tokio::sync::mpsc::Sender, RpcRes>>, pub(crate) secure: Arc, + pub(crate) ext_auth_uri: Option, } impl Clone for MediaServerCtx { @@ -35,6 +36,7 @@ impl Clone for MediaServerCtx { Self { sender: self.sender.clone(), secure: self.secure.clone(), + ext_auth_uri: self.ext_auth_uri.clone(), } } } @@ -60,7 +62,22 @@ impl MediaApis { body: ApplicationSdp, ) -> Result>> { let session_id = gen_cluster_session_id(); - let token = ctx.secure.decode_obj::("whip", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?; + let token_str = &token.token; + let token = ctx.secure.decode_obj::("whip", token_str).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?; + if let Some(auth_uri) = &ctx.ext_auth_uri { + log::info!( + "[MediaAPIs][whip/create] trying to authenticate peer {} for room {} using external URI provided: {}", + token.peer, + token.room, + auth_uri + ); + let client = reqwest::Client::new(); + let res = client.get(format!("{}/whip/create", auth_uri)).timeout(Duration::from_secs(5)).bearer_auth(token_str).send().await; + if res.is_err() || res.is_ok_and(|r| !r.status().is_success()) { + log::info!("[MediaAPIs] failed to authenticate user"); + return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED)); + } + } log::info!("[MediaAPIs] create whip endpoint with token {:?}, ip {}, user_agent {}", token, ip_addr, user_agent); let (req, rx) = Rpc::new(RpcReq::Whip(whip::RpcReq::Connect(WhipConnectReq { session_id, @@ -158,7 +175,22 @@ impl MediaApis { body: ApplicationSdp, ) -> Result>> { let session_id = gen_cluster_session_id(); - let token = ctx.secure.decode_obj::("whep", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?; + let token_str = &token.token; + let token = ctx.secure.decode_obj::("whep", token_str).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?; + if let Some(auth_uri) = &ctx.ext_auth_uri { + log::info!( + "[MediaAPIs][whep/create] trying to authenticate peer {:?} for room {} using external URI provided: {}", + token.peer, + token.room, + auth_uri + ); + let client = reqwest::Client::new(); + let res = client.get(format!("{}/whep/create", auth_uri)).timeout(Duration::from_secs(5)).bearer_auth(token_str).send().await; + if res.is_err() || res.is_ok_and(|r| !r.status().is_success()) { + log::info!("[MediaAPIs] failed to authenticate user"); + return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED)); + } + } log::info!("[MediaAPIs] create whep endpoint with token {:?}, ip {}, user_agent {}", token, ip_addr, user_agent); let (req, rx) = Rpc::new(RpcReq::Whep(whep::RpcReq::Connect(WhepConnectReq { session_id, @@ -255,7 +287,22 @@ impl MediaApis { connect: Protobuf, ) -> Result>> { let session_id = gen_cluster_session_id(); - let token = ctx.secure.decode_obj::("webrtc", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?; + let token_str = &token.token; + let token = ctx.secure.decode_obj::("webrtc", token_str).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?; + if let Some(auth_uri) = &ctx.ext_auth_uri { + log::info!( + "[MediaAPIs][webrtc/connect] trying to authenticate peer {:?} for room {:?} using external URI provided: {}", + token.peer, + token.room, + auth_uri + ); + let client = reqwest::Client::new(); + let res = client.get(format!("{}/webrtc/connect", auth_uri)).timeout(Duration::from_secs(5)).bearer_auth(token_str).send().await; + if res.is_err() || res.is_ok_and(|r| !r.status().is_success()) { + log::info!("[MediaAPIs] failed to authenticate user"); + return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED)); + } + } log::info!("[MediaAPIs] create webrtc with token {:?}, ip {}, user_agent {}, request {:?}", token, ip_addr, user_agent, connect); if let Some(join) = &connect.join { if token.room != Some(join.room.clone()) { @@ -324,7 +371,27 @@ impl MediaApis { connect: Protobuf, ) -> Result>> { let conn_id2 = conn_id.0.parse().map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?; - let token = ctx.secure.decode_obj::("webrtc", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?; + let token_str = &token.token; + let token = ctx.secure.decode_obj::("webrtc", token_str).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?; + if let Some(auth_uri) = &ctx.ext_auth_uri { + log::info!( + "[MediaAPIs][webrtc/restart-ice] trying to authenticate peer {:?} for room {:?} using external URI provided: {}", + token.peer, + token.room, + auth_uri + ); + let client = reqwest::Client::new(); + let res = client + .get(format!("{}/webrtc/restart-ice", auth_uri)) + .timeout(Duration::from_secs(5)) + .bearer_auth(token_str) + .send() + .await; + if res.is_err() || res.is_ok_and(|r| !r.status().is_success()) { + log::info!("[MediaAPIs] failed to authenticate user"); + return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED)); + } + } if let Some(join) = &connect.join { if token.room != Some(join.room.clone()) { return Err(poem::Error::from_string("Wrong room".to_string(), StatusCode::FORBIDDEN)); diff --git a/bin/src/server/gateway.rs b/bin/src/server/gateway.rs index fa518633..d8a41520 100644 --- a/bin/src/server/gateway.rs +++ b/bin/src/server/gateway.rs @@ -76,6 +76,11 @@ pub struct Args { /// The port for binding the RTPengine command UDP socket. #[arg(env, long)] rtpengine_cmd_addr: Option, + + /// External HTTP endpoint URI for third-party authentication and authorization. + /// The URI should be in the format of `http(s)://:` or `http(s)://example.com`, without the backslash at the end. + #[arg(env, long)] + ext_auth_uri: Option, } pub async fn run_media_gateway(workers: usize, http_port: Option, node: NodeConfig, args: Args) { @@ -96,7 +101,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let req_tx = req_tx.clone(); let secure2 = edge_secure.clone(); tokio::spawn(async move { - if let Err(e) = run_gateway_http_server(http_port, req_tx, secure2, gateway_secure).await { + if let Err(e) = run_gateway_http_server(http_port, req_tx, secure2, gateway_secure, args.ext_auth_uri).await { log::error!("HTTP Error: {}", e); } }); diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index 7084045c..7ed6c2e1 100644 --- a/bin/src/server/media.rs +++ b/bin/src/server/media.rs @@ -79,6 +79,11 @@ pub struct Args { /// Number of workers for uploading recordings. #[arg(env, long, default_value_t = 5)] record_upload_worker: usize, + + /// External HTTP endpoint URI for third-party authentication and authorization. + /// The URI should be in the format of `http(s)://:` or `http(s)://example.com`, without the backslash at the end. + #[arg(env, long)] + ext_auth_uri: Option, } pub async fn run_media_server(workers: usize, http_port: Option, node: NodeConfig, args: Args) { @@ -96,7 +101,7 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node let req_tx = req_tx.clone(); let secure = secure.clone(); tokio::spawn(async move { - if let Err(e) = run_media_http_server(http_port, req_tx, secure, secure2).await { + if let Err(e) = run_media_http_server(http_port, req_tx, secure, secure2, args.ext_auth_uri).await { log::error!("HTTP Error: {}", e); } });