Skip to content

Commit

Permalink
feat: multi tenancy (#433)
Browse files Browse the repository at this point in the history
* WIP: works app_sync logic, cluster with multi-tennancy by custom ClusterRoomHash generator

* fix clippy

* WIP: record with app

* fix clippy warns

* WIP: check app is same or not when re-join

* handle app record, more app and record info to console APIs

* update frontend

* fixed: wrong migration script cause postgresql query error

* fix: rtpengine create_answer error with sdp without connection line

* fix sendtry init

* feat: app with hook

* fix warn
  • Loading branch information
giangndm authored Oct 26, 2024
1 parent 27d88bc commit 9505691
Show file tree
Hide file tree
Showing 84 changed files with 2,173 additions and 1,080 deletions.
860 changes: 617 additions & 243 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ members = [
"packages/media_connector",
"packages/media_record",
"packages/media_codecs",
"packages/multi_tenancy",
"packages/rtpengine_ngcontrol",
]

Expand All @@ -34,3 +35,5 @@ rand = "0.8"
mockall = "0.13"
prost = "0.13"
indexmap = "2.2"
spin = "0.9"
httpmock = "0.7"
2 changes: 1 addition & 1 deletion bin/CONSOLE_FRONTEND
Original file line number Diff line number Diff line change
@@ -1 +1 @@
bf3c93232dcc3c456619305dc61714d7a6741af5
023efcb0a308e5e3df1849f0deee3400912325a0
5 changes: 3 additions & 2 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ media-server-gateway = { path = "../packages/media_gateway", optional = true }
media-server-connector = { path = "../packages/media_connector", optional = true }
media-server-record = { path = "../packages/media_record", default-features=false, optional = true }
media-server-utils = { path = "../packages/media_utils", optional = true }
media-server-multi-tenancy = { path = "../packages/multi_tenancy", optional = true }
rtpengine-ngcontrol = { path = "../packages/rtpengine_ngcontrol", optional = true }
local-ip-address = "0.6"
serde = { version = "1.0", features = ["derive"] }
Expand All @@ -41,10 +42,10 @@ sentry = "0.34"

[features]
default = ["console", "gateway", "media", "connector", "cert_utils"]
gateway = ["media-server-gateway", "media-server-connector", "quinn_vnet", "node_metrics", "maxminddb", "rust-embed"]
gateway = ["media-server-gateway", "media-server-connector", "quinn_vnet", "node_metrics", "maxminddb", "rust-embed", "media-server-multi-tenancy"]
media = ["media-server-runner", "media-server-record", "quinn_vnet", "node_metrics", "rtpengine-ngcontrol"]
console = []
connector = ["quinn_vnet", "media-server-connector", "media-server-utils"]
connector = ["quinn_vnet", "media-server-connector", "media-server-utils", "media-server-multi-tenancy"]
cert_utils = ["rcgen", "rustls"]
quinn_vnet = ["rustls", "quinn"]
node_metrics = ["sysinfo"]
Expand Down
6 changes: 4 additions & 2 deletions bin/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use tokio::sync::mpsc::Sender;
#[cfg(feature = "embed_static")]
use utils::EmbeddedFilesEndpoint;

mod api_connector;
mod api_console;
mod api_media;
mod api_token;
Expand Down Expand Up @@ -69,6 +68,8 @@ pub async fn run_console_http_server(
storage: crate::server::console_storage::StorageShared,
connector: MediaConnectorServiceClient<SocketAddr, QuinnClient, QuinnStream>,
) -> Result<(), Box<dyn std::error::Error>> {
use poem::middleware::Tracing;

let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "Console User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/");
let user_ui = user_service.swagger_ui();
let user_spec = user_service.spec();
Expand Down Expand Up @@ -102,7 +103,8 @@ pub async fn run_console_http_server(
.nest("/api/connector/", connector_service.data(ctx.clone()))
.nest("/api/connector/ui", connector_ui)
.at("/api/connector/spec", poem::endpoint::make_sync(move |_| connector_spec.clone()))
.with(Cors::new());
.with(Cors::new())
.with(Tracing);

Server::new(TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), port))).run(route).await?;
Ok(())
Expand Down
1 change: 0 additions & 1 deletion bin/src/http/api_connector.rs

This file was deleted.

6 changes: 6 additions & 0 deletions bin/src/http/api_console/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ use poem_openapi::{
#[derive(poem_openapi::Object)]
pub struct RoomInfo {
pub id: i32,
pub app: String,
pub room: String,
pub created_at: u64,
pub peers: usize,
pub record: Option<String>,
}

#[derive(poem_openapi::Object)]
Expand Down Expand Up @@ -47,6 +49,7 @@ pub struct PeerInfo {
pub struct SessionInfo {
/// u64 cause wrong parse in js, so we convert it to string
pub id: String,
pub app: String,
pub ip: Option<String>,
pub user_agent: Option<String>,
pub sdk: Option<String>,
Expand Down Expand Up @@ -81,9 +84,11 @@ impl Apis {
.into_iter()
.map(|e| RoomInfo {
id: e.id,
app: e.app,
room: e.room,
created_at: e.created_at,
peers: e.peers as usize,
record: e.record,
})
.collect::<Vec<_>>(),
),
Expand Down Expand Up @@ -172,6 +177,7 @@ impl Apis {
.into_iter()
.map(|p| SessionInfo {
id: p.id.to_string(),
app: p.app,
ip: p.ip,
user_agent: p.user_agent,
sdk: p.sdk,
Expand Down
2 changes: 1 addition & 1 deletion bin/src/http/api_console/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl Apis {
/// login with user credentials
#[oai(path = "/user/login", method = "post")]
async fn user_login(&self, Data(ctx): Data<&ConsoleApisCtx>, body: Json<UserLoginReq>) -> Json<Response<UserLoginRes>> {
if ctx.secure.validate_secert(&body.secret) {
if ctx.secure.validate_secret(&body.secret) {
Json(Response {
status: true,
data: Some(UserLoginRes { token: ctx.secure.generate_token() }),
Expand Down
14 changes: 5 additions & 9 deletions bin/src/http/api_media/rtpengine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use media_server_protocol::{
cluster::gen_cluster_session_id,
endpoint::ClusterConnId,
tokens::{RtpEngineToken, RTPENGINE_TOKEN},
tokens::RtpEngineToken,
transport::{
rtpengine::{self, RtpCreateAnswerRequest, RtpCreateOfferRequest, RtpSetAnswerRequest},
RpcReq, RpcRes, RpcResult,
Expand Down Expand Up @@ -35,12 +35,10 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> RtpengineApis<S> {
#[oai(path = "/offer", method = "post")]
async fn create_offer(&self, RemoteIpAddr(ip_addr): RemoteIpAddr, TokenAuthorization(token): TokenAuthorization) -> Result<CustomHttpResponse<ApplicationSdp<String>>> {
let session_id = gen_cluster_session_id();
let token = self
.secure
.decode_obj::<RtpEngineToken>(RTPENGINE_TOKEN, &token.token)
.ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let (app_ctx, token) = self.secure.decode_token::<RtpEngineToken>(&token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[MediaAPIs] create rtpengine endpoint with token {token:?}, ip {ip_addr}");
let (req, rx) = Rpc::new(RpcReq::RtpEngine(rtpengine::RpcReq::CreateOffer(RtpCreateOfferRequest {
app: app_ctx,
session_id,
room: token.room.into(),
peer: token.peer.into(),
Expand Down Expand Up @@ -77,12 +75,10 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> RtpengineApis<S> {
body: ApplicationSdp<String>,
) -> Result<CustomHttpResponse<ApplicationSdp<String>>> {
let session_id = gen_cluster_session_id();
let token = self
.secure
.decode_obj::<RtpEngineToken>(RTPENGINE_TOKEN, &token.token)
.ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let (app_ctx, token) = self.secure.decode_token::<RtpEngineToken>(&token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[MediaAPIs] create rtpengine endpoint with token {token:?}, ip {ip_addr}");
let (req, rx) = Rpc::new(RpcReq::RtpEngine(rtpengine::RpcReq::CreateAnswer(RtpCreateAnswerRequest {
app: app_ctx,
session_id,
sdp: body.0,
room: token.room.into(),
Expand Down
12 changes: 8 additions & 4 deletions bin/src/http/api_media/webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> WebrtcApis<S> {
connect: Protobuf<ConnectRequest>,
) -> Result<HttpResponse<Protobuf<ConnectResponse>>> {
let session_id = gen_cluster_session_id();
let token = self.secure.decode_obj::<WebrtcToken>("webrtc", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let (app_ctx, token) = self.secure.decode_token::<WebrtcToken>(&token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[MediaAPIs] create webrtc with token {:?}, ip {}, user_agent {}, request {:?}", token, ip_addr, user_agent, connect);
if let Some(join) = &connect.join {
if token.room != Some(join.room.clone()) {
Expand All @@ -47,7 +47,9 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> WebrtcApis<S> {
return Err(poem::Error::from_string("Wrong peer".to_string(), StatusCode::FORBIDDEN));
}
}
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(session_id, ip_addr, user_agent, connect.0, token.extra_data, token.record)));
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(
app_ctx, session_id, ip_addr, user_agent, connect.0, token.extra_data, token.record,
)));
self.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
match res {
Expand Down Expand Up @@ -104,7 +106,7 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> WebrtcApis<S> {
connect: Protobuf<ConnectRequest>,
) -> Result<HttpResponse<Protobuf<ConnectResponse>>> {
let conn_id2 = conn_id.0.parse().map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let token = self.secure.decode_obj::<WebrtcToken>("webrtc", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let (app_ctx, token) = self.secure.decode_token::<WebrtcToken>(&token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
if let Some(join) = &connect.join {
if token.room != Some(join.room.clone()) {
return Err(poem::Error::from_string("Wrong room".to_string(), StatusCode::FORBIDDEN));
Expand All @@ -115,7 +117,9 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> WebrtcApis<S> {
}
}
log::info!("[MediaAPIs] restart_ice webrtc, ip {}, user_agent {}, conn {}, request {:?}", ip_addr, user_agent, conn_id.0, connect);
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::RestartIce(conn_id2, ip_addr, user_agent, connect.0, token.extra_data, token.record)));
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::RestartIce(
conn_id2, app_ctx, ip_addr, user_agent, connect.0, token.extra_data, token.record,
)));
self.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
match res {
Expand Down
3 changes: 2 additions & 1 deletion bin/src/http/api_media/whep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> WhepApis<S> {
body: ApplicationSdp<String>,
) -> Result<CustomHttpResponse<ApplicationSdp<String>>> {
let session_id = gen_cluster_session_id();
let token = self.secure.decode_obj::<WhepToken>("whep", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let (app_ctx, token) = self.secure.decode_token::<WhepToken>(&token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[MediaAPIs] create whep endpoint with token {:?}, ip {}, user_agent {}", token, ip_addr, user_agent);
let (req, rx) = Rpc::new(RpcReq::Whep(whep::RpcReq::Connect(WhepConnectReq {
app: app_ctx,
session_id,
ip: ip_addr,
sdp: body.0,
Expand Down
3 changes: 2 additions & 1 deletion bin/src/http/api_media/whip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> WhipApis<S> {
body: ApplicationSdp<String>,
) -> Result<CustomHttpResponse<ApplicationSdp<String>>> {
let session_id = gen_cluster_session_id();
let token = self.secure.decode_obj::<WhipToken>("whip", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let (app_ctx, token) = self.secure.decode_token::<WhipToken>(&token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[MediaAPIs] create whip endpoint with token {:?}, ip {}, user_agent {}", token, ip_addr, user_agent);
let (req, rx) = Rpc::new(RpcReq::Whip(whip::RpcReq::Connect(WhipConnectReq {
app: app_ctx,
session_id,
ip: ip_addr,
sdp: body.0,
Expand Down
26 changes: 13 additions & 13 deletions bin/src/http/api_token.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{marker::PhantomData, sync::Arc};

use super::{utils::TokenAuthorization, Response};
use media_server_protocol::tokens::{RtpEngineToken, WebrtcToken, WhepToken, WhipToken, RTPENGINE_TOKEN, WEBRTC_TOKEN, WHEP_TOKEN, WHIP_TOKEN};
use media_server_protocol::tokens::{RtpEngineToken, WebrtcToken, WhepToken, WhipToken};
use media_server_secure::MediaGatewaySecure;
use poem::{web::Data, Result};
use poem_openapi::{payload::Json, OpenApi};
Expand Down Expand Up @@ -87,13 +87,13 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
/// create whip session token
#[oai(path = "/whip", method = "post")]
async fn whip_token(&self, Data(ctx): Data<&TokenServerCtx<S>>, body: Json<WhipTokenReq>, TokenAuthorization(token): TokenAuthorization) -> Result<Json<Response<WhipTokenRes>>> {
if ctx.secure.validate_app(&token.token) {
if let Some(app_ctx) = ctx.secure.validate_app(&token.token) {
let body = body.0;
Ok(Json(Response {
status: true,
data: Some(WhipTokenRes {
token: ctx.secure.encode_obj(
WHIP_TOKEN,
token: ctx.secure.encode_token(
&app_ctx,
WhipToken {
room: body.room,
peer: body.peer,
Expand All @@ -117,13 +117,13 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
/// create whep session token
#[oai(path = "/whep", method = "post")]
async fn whep_token(&self, Data(ctx): Data<&TokenServerCtx<S>>, body: Json<WhepTokenReq>, TokenAuthorization(token): TokenAuthorization) -> Json<Response<WhepTokenRes>> {
if ctx.secure.validate_app(&token.token) {
if let Some(app_ctx) = ctx.secure.validate_app(&token.token) {
let body = body.0;
Json(Response {
status: true,
data: Some(WhepTokenRes {
token: ctx.secure.encode_obj(
WHEP_TOKEN,
token: ctx.secure.encode_token(
&app_ctx,
WhepToken {
room: body.room,
peer: body.peer,
Expand All @@ -145,13 +145,13 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {

#[oai(path = "/webrtc", method = "post")]
async fn webrtc_token(&self, Data(ctx): Data<&TokenServerCtx<S>>, body: Json<WebrtcTokenReq>, TokenAuthorization(token): TokenAuthorization) -> Json<Response<WebrtcTokenRes>> {
if ctx.secure.validate_app(&token.token) {
if let Some(app_ctx) = ctx.secure.validate_app(&token.token) {
let body = body.0;
Json(Response {
status: true,
data: Some(WebrtcTokenRes {
token: ctx.secure.encode_obj(
WEBRTC_TOKEN,
token: ctx.secure.encode_token(
&app_ctx,
WebrtcToken {
room: body.room,
peer: body.peer,
Expand All @@ -175,13 +175,13 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
/// create rtpengine session token
#[oai(path = "/rtpengine", method = "post")]
async fn rtpengine_token(&self, Data(ctx): Data<&TokenServerCtx<S>>, body: Json<RtpEngineTokenReq>, TokenAuthorization(token): TokenAuthorization) -> Result<Json<Response<RtpEngineTokenRes>>> {
if ctx.secure.validate_app(&token.token) {
if let Some(app_ctx) = ctx.secure.validate_app(&token.token) {
let body = body.0;
Ok(Json(Response {
status: true,
data: Some(RtpEngineTokenRes {
token: ctx.secure.encode_obj(
RTPENGINE_TOKEN,
token: ctx.secure.encode_token(
&app_ctx,
RtpEngineToken {
room: body.room,
peer: body.peer,
Expand Down
8 changes: 4 additions & 4 deletions bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ async fn main() {

assert!(args.sdn_zone_id < MAX_ZONE_ID, "sdn_zone_id must < {MAX_ZONE_ID}");

if let Some(sentry_endpoint) = args.sentry_endpoint {
let _guard = sentry::init((
let _guard = args.sentry_endpoint.map(|sentry_endpoint| {
sentry::init((
sentry_endpoint.as_str(),
sentry::ClientOptions {
release: sentry::release_name!(),
..Default::default()
},
));
}
))
});

let http_port = args.http_port;
let sdn_port = if args.sdn_port > 0 {
Expand Down
11 changes: 6 additions & 5 deletions bin/src/ng_controller.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use media_server_protocol::cluster::gen_cluster_session_id;
use media_server_protocol::endpoint::{ClusterConnId, PeerId, RoomId};
use media_server_protocol::tokens::{RtpEngineToken, RTPENGINE_TOKEN};
use media_server_protocol::endpoint::ClusterConnId;
use media_server_protocol::tokens::RtpEngineToken;
use media_server_protocol::transport::{rtpengine, RpcReq, RpcRes};
use media_server_secure::MediaEdgeSecure;
use media_server_utils::select2;
Expand Down Expand Up @@ -48,12 +48,13 @@ impl<T: NgTransport, S: 'static + MediaEdgeSecure> NgControllerServer<T, S> {
return Some(());
}
NgCommand::Offer { ref sdp, ref atm0s_token, .. } | NgCommand::Answer { ref sdp, ref atm0s_token, .. } => {
if let Some(token) = self.secure.decode_obj::<RtpEngineToken>(RTPENGINE_TOKEN, atm0s_token) {
if let Some((app_ctx, token)) = self.secure.decode_token::<RtpEngineToken>(atm0s_token) {
let session_id = gen_cluster_session_id();
rtpengine::RpcReq::CreateAnswer(rtpengine::RtpCreateAnswerRequest {
app: app_ctx,
session_id,
room: RoomId(token.room),
peer: PeerId(token.peer),
room: token.room.into(),
peer: token.peer.into(),
sdp: sdp.clone(),
record: token.record,
extra_data: token.extra_data,
Expand Down
19 changes: 18 additions & 1 deletion bin/src/server/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use media_server_connector::{
handler_service::{self, ConnectorHandlerServiceBuilder},
ConnectorCfg, ConnectorStorage, HookBodyType, HANDLER_SERVICE_ID,
};
use media_server_multi_tenancy::{MultiTenancyStorage, MultiTenancySync};
use media_server_protocol::{
cluster::{ClusterNodeGenericInfo, ClusterNodeInfo},
connector::CONNECTOR_RPC_PORT,
Expand Down Expand Up @@ -70,17 +71,33 @@ pub struct Args {
/// This is used for clearing ended room
#[arg(env, long, default_value_t = 60_000)]
storage_tick_interval_ms: u64,

/// multi-tenancy sync endpoint
#[arg(env, long)]
multi_tenancy_sync: Option<String>,

/// multi-tenancy sync endpoint
#[arg(env, long, default_value_t = 30_000)]
multi_tenancy_sync_interval_ms: u64,
}

pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");

let app_storage = Arc::new(MultiTenancyStorage::new("not-use-this", None));
if let Some(url) = args.multi_tenancy_sync {
let mut app_sync = MultiTenancySync::new(app_storage.clone(), &url, Duration::from_millis(args.multi_tenancy_sync_interval_ms));
tokio::spawn(async move {
app_sync.run_loop().await;
});
}

let mut connector_storage = ConnectorStorage::new(
node.node_id,
app_storage,
ConnectorCfg {
sql_uri: args.db_uri,
s3_uri: args.s3_uri,
hook_url: args.hook_uri,
hook_workers: args.hook_workers,
hook_body_type: args.hook_body_type,
room_destroy_after_ms: args.destroy_room_after_ms,
Expand Down
Loading

0 comments on commit 9505691

Please sign in to comment.