Skip to content

Commit

Permalink
feat: external auth
Browse files Browse the repository at this point in the history
  • Loading branch information
luongngocminh committed Aug 21, 2024
1 parent 260ff03 commit 5ad190e
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ sysinfo = { version = "0.30", optional = true }
hex = { version = "0.4", optional = true }
mime_guess = { version = "2.0", optional = true }
sentry = "0.34"
reqwest = { version = "0.12" }

[features]
default = ["console", "gateway", "media", "connector", "cert_utils"]
Expand Down
20 changes: 18 additions & 2 deletions bin/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync
sender: Sender<crate::rpc::Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>,
edge_secure: Arc<ES>,
gateway_secure: Arc<GS>,
ext_auth_uri: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let token_service: OpenApiService<_, ()> = OpenApiService::new(api_token::TokenApis::<GS>::new(), "App APIs", env!("CARGO_PKG_VERSION")).server("/token/");
let token_ui = token_service.swagger_ui();
Expand All @@ -132,7 +133,14 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync
.nest("/token/", token_service.data(api_token::TokenServerCtx { secure: gateway_secure }))
.nest("/token/ui", token_ui)
.at("/token/spec", poem::endpoint::make_sync(move |_| token_spec.clone()))
.nest("/", media_service.data(api_media::MediaServerCtx { sender, secure: edge_secure }))
.nest(
"/",
media_service.data(api_media::MediaServerCtx {
sender,
secure: edge_secure,
ext_auth_uri,
}),
)
.nest("/ui", media_ui)
.at("/spec", poem::endpoint::make_sync(move |_| media_spec.clone()))
.with(Cors::new());
Expand All @@ -147,6 +155,7 @@ pub async fn run_media_http_server<ES: 'static + MediaEdgeSecure + Send + Sync,
sender: Sender<crate::rpc::Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>,
edge_secure: Arc<ES>,
gateway_secure: Option<Arc<GS>>,
ext_auth_uri: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut route = Route::new();

Expand All @@ -170,7 +179,14 @@ pub async fn run_media_http_server<ES: 'static + MediaEdgeSecure + Send + Sync,

let route = route
.nest("/samples", samples)
.nest("/", media_service.data(api_media::MediaServerCtx { sender, secure: edge_secure }))
.nest(
"/",
media_service.data(api_media::MediaServerCtx {
sender,
secure: edge_secure,
ext_auth_uri,
}),
)
.nest("/ui", media_ui)
.at("/spec", poem::endpoint::make_sync(move |_| media_spec.clone()))
.with(Cors::new());
Expand Down
77 changes: 72 additions & 5 deletions bin/src/http/api_media.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{marker::PhantomData, sync::Arc};
use std::{marker::PhantomData, sync::Arc, time::Duration};

use media_server_protocol::{
cluster::gen_cluster_session_id,
Expand Down Expand Up @@ -28,13 +28,15 @@ use super::utils::{ApplicationSdp, ApplicationSdpPatch, CustomHttpResponse, Prot
pub struct MediaServerCtx<S: MediaEdgeSecure + Send + Sync> {
pub(crate) sender: tokio::sync::mpsc::Sender<Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>,
pub(crate) secure: Arc<S>,
pub(crate) ext_auth_uri: Option<String>,
}

impl<S: MediaEdgeSecure + Send + Sync> Clone for MediaServerCtx<S> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
secure: self.secure.clone(),
ext_auth_uri: self.ext_auth_uri.clone(),
}
}
}
Expand All @@ -60,7 +62,22 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
body: ApplicationSdp<String>,
) -> Result<CustomHttpResponse<ApplicationSdp<String>>> {
let session_id = gen_cluster_session_id();
let token = ctx.secure.decode_obj::<WhipToken>("whip", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let token_str = &token.token;
let token = ctx.secure.decode_obj::<WhipToken>("whip", token_str).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
if let Some(auth_uri) = &ctx.ext_auth_uri {
log::info!(
"[MediaAPIs][whip/create] trying to authenticate peer {} for room {} using external URI provided: {}",
token.peer,
token.room,
auth_uri
);
let client = reqwest::Client::new();
let res = client.get(format!("{}/whip/create", auth_uri)).timeout(Duration::from_secs(5)).bearer_auth(token_str).send().await;
if res.is_err() || res.is_ok_and(|r| !r.status().is_success()) {
log::info!("[MediaAPIs] failed to authenticate user");
return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED));
}
}
log::info!("[MediaAPIs] create whip endpoint with token {:?}, ip {}, user_agent {}", token, ip_addr, user_agent);
let (req, rx) = Rpc::new(RpcReq::Whip(whip::RpcReq::Connect(WhipConnectReq {
session_id,
Expand Down Expand Up @@ -158,7 +175,22 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
body: ApplicationSdp<String>,
) -> Result<CustomHttpResponse<ApplicationSdp<String>>> {
let session_id = gen_cluster_session_id();
let token = ctx.secure.decode_obj::<WhepToken>("whep", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let token_str = &token.token;
let token = ctx.secure.decode_obj::<WhepToken>("whep", token_str).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
if let Some(auth_uri) = &ctx.ext_auth_uri {
log::info!(
"[MediaAPIs][whep/create] trying to authenticate peer {:?} for room {} using external URI provided: {}",
token.peer,
token.room,
auth_uri
);
let client = reqwest::Client::new();
let res = client.get(format!("{}/whep/create", auth_uri)).timeout(Duration::from_secs(5)).bearer_auth(token_str).send().await;
if res.is_err() || res.is_ok_and(|r| !r.status().is_success()) {
log::info!("[MediaAPIs] failed to authenticate user");
return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED));
}
}
log::info!("[MediaAPIs] create whep endpoint with token {:?}, ip {}, user_agent {}", token, ip_addr, user_agent);
let (req, rx) = Rpc::new(RpcReq::Whep(whep::RpcReq::Connect(WhepConnectReq {
session_id,
Expand Down Expand Up @@ -255,7 +287,22 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
connect: Protobuf<ConnectRequest>,
) -> Result<HttpResponse<Protobuf<ConnectResponse>>> {
let session_id = gen_cluster_session_id();
let token = ctx.secure.decode_obj::<WebrtcToken>("webrtc", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let token_str = &token.token;
let token = ctx.secure.decode_obj::<WebrtcToken>("webrtc", token_str).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
if let Some(auth_uri) = &ctx.ext_auth_uri {
log::info!(
"[MediaAPIs][webrtc/connect] trying to authenticate peer {:?} for room {:?} using external URI provided: {}",
token.peer,
token.room,
auth_uri
);
let client = reqwest::Client::new();
let res = client.get(format!("{}/webrtc/connect", auth_uri)).timeout(Duration::from_secs(5)).bearer_auth(token_str).send().await;
if res.is_err() || res.is_ok_and(|r| !r.status().is_success()) {
log::info!("[MediaAPIs] failed to authenticate user");
return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED));
}
}
log::info!("[MediaAPIs] create webrtc with token {:?}, ip {}, user_agent {}, request {:?}", token, ip_addr, user_agent, connect);
if let Some(join) = &connect.join {
if token.room != Some(join.room.clone()) {
Expand Down Expand Up @@ -324,7 +371,27 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
connect: Protobuf<ConnectRequest>,
) -> Result<HttpResponse<Protobuf<ConnectResponse>>> {
let conn_id2 = conn_id.0.parse().map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let token = ctx.secure.decode_obj::<WebrtcToken>("webrtc", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let token_str = &token.token;
let token = ctx.secure.decode_obj::<WebrtcToken>("webrtc", token_str).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
if let Some(auth_uri) = &ctx.ext_auth_uri {
log::info!(
"[MediaAPIs][webrtc/restart-ice] trying to authenticate peer {:?} for room {:?} using external URI provided: {}",
token.peer,
token.room,
auth_uri
);
let client = reqwest::Client::new();
let res = client
.get(format!("{}/webrtc/restart-ice", auth_uri))
.timeout(Duration::from_secs(5))
.bearer_auth(token_str)
.send()
.await;
if res.is_err() || res.is_ok_and(|r| !r.status().is_success()) {
log::info!("[MediaAPIs] failed to authenticate user");
return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED));
}
}
if let Some(join) = &connect.join {
if token.room != Some(join.room.clone()) {
return Err(poem::Error::from_string("Wrong room".to_string(), StatusCode::FORBIDDEN));
Expand Down
7 changes: 6 additions & 1 deletion bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ pub struct Args {
/// The port for binding the RTPengine command UDP socket.
#[arg(env, long)]
rtpengine_cmd_addr: Option<SocketAddr>,

/// External HTTP endpoint URI for third-party authentication and authorization.
/// The URI should be in the format of `http(s)://<host>:<port>` or `http(s)://example.com`, without the backslash at the end.
#[arg(env, long)]
ext_auth_uri: Option<String>,
}

pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: NodeConfig, args: Args) {
Expand All @@ -96,7 +101,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
let req_tx = req_tx.clone();
let secure2 = edge_secure.clone();
tokio::spawn(async move {
if let Err(e) = run_gateway_http_server(http_port, req_tx, secure2, gateway_secure).await {
if let Err(e) = run_gateway_http_server(http_port, req_tx, secure2, gateway_secure, args.ext_auth_uri).await {
log::error!("HTTP Error: {}", e);
}
});
Expand Down
7 changes: 6 additions & 1 deletion bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ pub struct Args {
/// Number of workers for uploading recordings.
#[arg(env, long, default_value_t = 5)]
record_upload_worker: usize,

/// External HTTP endpoint URI for third-party authentication and authorization.
/// The URI should be in the format of `http(s)://<host>:<port>` or `http(s)://example.com`, without the backslash at the end.
#[arg(env, long)]
ext_auth_uri: Option<String>,
}

pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: NodeConfig, args: Args) {
Expand All @@ -96,7 +101,7 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
let req_tx = req_tx.clone();
let secure = secure.clone();
tokio::spawn(async move {
if let Err(e) = run_media_http_server(http_port, req_tx, secure, secure2).await {
if let Err(e) = run_media_http_server(http_port, req_tx, secure, secure2, args.ext_auth_uri).await {
log::error!("HTTP Error: {}", e);
}
});
Expand Down

0 comments on commit 5ad190e

Please sign in to comment.