Skip to content

Commit

Permalink
feat: standlone server (8xFF#454)
Browse files Browse the repository at this point in the history
* add option to start media node without gateway and connector agent

* simple standalone server with: console, gateway, medias, connector

* add docs
  • Loading branch information
giangndm authored Nov 18, 2024
1 parent ded83e0 commit 5cae64f
Show file tree
Hide file tree
Showing 13 changed files with 423 additions and 52 deletions.
3 changes: 2 additions & 1 deletion bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ mime_guess = { version = "2.0", optional = true }
sentry = "0.34"

[features]
default = ["console", "gateway", "media", "connector", "cert_utils"]
default = ["console", "gateway", "media", "connector", "standalone", "cert_utils"]
standalone = ["console", "gateway", "media", "connector"]
gateway = ["media-server-gateway", "media-server-connector", "quinn_vnet", "node_metrics", "maxminddb", "rust-embed", "media-server-multi-tenancy"]
media = ["media-server-runner", "media-server-record", "quinn_vnet", "node_metrics", "rtpengine-ngcontrol"]
console = []
Expand Down
11 changes: 11 additions & 0 deletions bin/media_single.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
RUST_LOG=atm0s_sdn_network=error,info \
RUST_BACKTRACE=1 \
cargo run -- \
--sdn-zone-id 0 \
--sdn-zone-node-id 1 \
--workers 1 \
--http-port 3000 \
media \
--enable-token-api \
--disable-gateway-agent \
--disable-connector-agent
3 changes: 3 additions & 0 deletions bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct Args {

#[tokio::main(flavor = "current_thread")]
async fn main() {
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");
if std::env::var_os("RUST_LOG").is_none() {
std::env::set_var("RUST_LOG", "info");
}
Expand Down Expand Up @@ -147,6 +148,8 @@ async fn main() {
log::error!("create cert error {:?}", e);
}
}
#[cfg(feature = "standalone")]
server::ServerType::Standalone(args) => server::run_standalone(workers, node, args).await,
}
})
.await;
Expand Down
14 changes: 10 additions & 4 deletions bin/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ use clap::Subcommand;
#[cfg(feature = "cert_utils")]
mod cert;
#[cfg(feature = "connector")]
mod connector;
pub mod connector;
#[cfg(feature = "console")]
mod console;
pub mod console;
#[cfg(feature = "gateway")]
mod gateway;
pub mod gateway;
#[cfg(feature = "media")]
mod media;
pub mod media;
#[cfg(feature = "standalone")]
pub mod standalone;

#[cfg(feature = "cert_utils")]
pub use cert::run_cert_utils;
Expand All @@ -21,6 +23,8 @@ pub use console::{run_console_server, storage as console_storage};
pub use gateway::run_media_gateway;
#[cfg(feature = "media")]
pub use media::run_media_server;
#[cfg(feature = "standalone")]
pub use standalone::run_standalone;

#[derive(Debug, Subcommand)]
pub enum ServerType {
Expand All @@ -34,4 +38,6 @@ pub enum ServerType {
Media(media::Args),
#[cfg(feature = "cert_utils")]
Cert(cert::Args),
#[cfg(feature = "standalone")]
Standalone(standalone::Args),
}
20 changes: 9 additions & 11 deletions bin/src/server/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,46 +44,44 @@ type TW = ();
pub struct Args {
/// DB Uri
#[arg(env, long, default_value = "sqlite://connector.db?mode=rwc")]
db_uri: String,
pub db_uri: String,

/// S3 Uri
#[arg(env, long, default_value = "http://user:pass@localhost:9000/bucket/path/?path_style=true")]
s3_uri: String,
pub s3_uri: String,

/// Hook Uri.
/// If set, will send hook event to this uri. example: http://localhost:8080/hook
#[arg(env, long)]
hook_uri: Option<String>,
pub hook_uri: Option<String>,

/// Hook workers
#[arg(env, long, default_value_t = 8)]
hook_workers: usize,
pub hook_workers: usize,

/// Hook body type
#[arg(env, long, default_value = "protobuf-json")]
hook_body_type: HookBodyType,
pub hook_body_type: HookBodyType,

/// Destroy room after no-one online, default is 2 minutes
#[arg(env, long, default_value_t = 120_000)]
destroy_room_after_ms: u64,
pub destroy_room_after_ms: u64,

/// Storage tick interval, default is 1 minute
/// This is used for clearing ended room
#[arg(env, long, default_value_t = 60_000)]
storage_tick_interval_ms: u64,
pub storage_tick_interval_ms: u64,

/// multi-tenancy sync endpoint
#[arg(env, long)]
multi_tenancy_sync: Option<String>,
pub multi_tenancy_sync: Option<String>,

/// multi-tenancy sync endpoint
#[arg(env, long, default_value_t = 30_000)]
multi_tenancy_sync_interval_ms: u64,
pub multi_tenancy_sync_interval_ms: u64,
}

pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");

let app_storage = if let Some(url) = args.multi_tenancy_sync {
let app_storage = Arc::new(MultiTenancyStorage::new());
let mut app_sync = MultiTenancySync::new(app_storage.clone(), url, Duration::from_millis(args.multi_tenancy_sync_interval_ms));
Expand Down
2 changes: 0 additions & 2 deletions bin/src/server/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ type TW = ();
pub struct Args {}

pub async fn run_console_server(workers: usize, http_port: Option<u16>, node: NodeConfig, _args: Args) {
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");

let storage = StorageShared::default();

let node_id = node.node_id;
Expand Down
30 changes: 15 additions & 15 deletions bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,44 +52,42 @@ type TW = ();
pub struct Args {
/// Location latitude.
#[arg(env, long, default_value_t = 0.0)]
lat: f32,
pub lat: f32,

/// Location longitude.
#[arg(env, long, default_value_t = 0.0)]
lon: f32,
pub lon: f32,

/// Path to the GeoIP database.
#[arg(env, long, default_value = "./maxminddb-data/GeoLite2-City.mmdb")]
geo_db: String,
pub geo_db: String,

/// Maximum CPU usage (in percent) allowed for routing to a media node or gateway node.
#[arg(env, long, default_value_t = 60)]
max_cpu: u8,
pub max_cpu: u8,

/// Maximum memory usage (in percent) allowed for routing to a media node or gateway node.
#[arg(env, long, default_value_t = 80)]
max_memory: u8,
pub max_memory: u8,

/// Maximum disk usage (in percent) allowed for routing to a media node or gateway node.
#[arg(env, long, default_value_t = 90)]
max_disk: u8,
pub max_disk: u8,

/// The port for binding the RTPengine command UDP socket.
#[arg(env, long)]
rtpengine_cmd_addr: Option<SocketAddr>,
pub rtpengine_cmd_addr: Option<SocketAddr>,

/// multi-tenancy sync endpoint
#[arg(env, long)]
multi_tenancy_sync: Option<String>,
pub multi_tenancy_sync: Option<String>,

/// multi-tenancy sync endpoint
#[arg(env, long, default_value_t = 30_000)]
multi_tenancy_sync_interval_ms: u64,
pub multi_tenancy_sync_interval_ms: u64,
}

pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: NodeConfig, args: Args) {
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");

let default_cluster_cert_buf = include_bytes!("../../certs/cluster.cert");
let default_cluster_key_buf = include_bytes!("../../certs/cluster.key");
let default_cluster_cert = CertificateDer::from(default_cluster_cert_buf.to_vec());
Expand All @@ -101,13 +99,15 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
let edge_secure = Arc::new(MediaEdgeSecureJwt::from(node.secret.as_bytes()));

let app_storage = if let Some(url) = args.multi_tenancy_sync {
log::info!("[MediaGateway] multi-tenancy sync is enabled, using url: {}", url);
let app_storage = Arc::new(MultiTenancyStorage::new());
let mut app_sync = MultiTenancySync::new(app_storage.clone(), url, Duration::from_millis(args.multi_tenancy_sync_interval_ms));
tokio::spawn(async move {
app_sync.run_loop().await;
});
app_storage
} else {
log::info!("[MediaGateway] multi-tenancy sync is disabled, using single tenant with secret: {}", node.secret);
Arc::new(MultiTenancyStorage::new_with_single(&node.secret, None))
};
let gateway_secure = MediaGatewaySecureJwt::new(node.secret.as_bytes(), app_storage.clone());
Expand All @@ -130,10 +130,10 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
let rtpengine_udp = NgUdpTransport::new(ngproto_addr).await;
let secure2 = edge_secure.clone();
tokio::spawn(async move {
log::info!("[MediaServer] start ng_controller task");
log::info!("[MediaGateway] start ng_controller task");
let mut server = NgControllerServer::new(rtpengine_udp, secure2, req_tx);
while server.recv().await.is_some() {}
log::info!("[MediaServer] stop ng_controller task");
log::info!("[MediaGateway] stop ng_controller task");
});
}

Expand Down Expand Up @@ -248,7 +248,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod

tokio::spawn(async move {
let res = local_rpc_processor.process_req(conn_part, param).await;
res_tx.send(res).print_err2("answer http request error");
res_tx.send(res).print_err2("[MediaGateway] answer http request error");
});
}
while let Ok(control) = connector_agent_rx.try_recv() {
Expand All @@ -271,7 +271,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
},
SdnExtOut::FeaturesEvent(_, FeaturesEvent::Socket(event)) => {
if let Err(e) = vnet_tx.try_send(event) {
log::error!("[MediaEdge] forward Sdn SocketEvent error {:?}", e);
log::error!("[MediaGateway] forward Sdn SocketEvent error {:?}", e);
}
}
_ => {}
Expand Down
30 changes: 19 additions & 11 deletions bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,47 +44,53 @@ use runtime_worker::{ExtIn, ExtOut};
pub struct Args {
/// Enables the Token API, which allows token generation.
#[arg(env, long)]
enable_token_api: bool,
pub enable_token_api: bool,

/// Enables WebRTC ICE Lite mode.
#[arg(env, long)]
ice_lite: bool,
pub ice_lite: bool,

/// The seed port for binding the WebRTC UDP socket. The port will increment by one for each worker.
/// Default: 0, which assigns the port randomly.
/// If set to 20000, each worker will be assigned a unique port: worker0: 20000, worker1: 20001, worker2: 20002, ...
#[arg(env, long, default_value_t = 0)]
webrtc_port_seed: u16,
pub webrtc_port_seed: u16,

/// The port for binding the RTPengine command UDP socket.
#[arg(env, long)]
rtpengine_cmd_addr: Option<SocketAddr>,
pub rtpengine_cmd_addr: Option<SocketAddr>,

/// The IP address for RTPengine RTP listening.
/// Default: 127.0.0.1
#[arg(env, long, default_value = "127.0.0.1")]
rtpengine_rtp_ip: IpAddr,
pub rtpengine_rtp_ip: IpAddr,

/// Maximum concurrent connections per CPU core.
#[arg(env, long, default_value_t = 200)]
ccu_per_core: u32,
pub ccu_per_core: u32,

/// Directory for storing cached recordings.
#[arg(env, long, default_value = "./record_cache/")]
record_cache: String,
pub record_cache: String,

/// Maximum size of the recording cache in bytes.
#[arg(env, long, default_value_t = 100_000_000)]
record_mem_max_size: usize,
pub record_mem_max_size: usize,

/// Number of workers for uploading recordings.
#[arg(env, long, default_value_t = 5)]
record_upload_worker: usize,
pub record_upload_worker: usize,

/// Enables the Gateway Agent service.
#[arg(env, long)]
pub disable_gateway_agent: bool,

/// Enables the Connector Agent service.
#[arg(env, long)]
pub disable_connector_agent: bool,
}

pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: NodeConfig, args: Args) {
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");

let default_cluster_cert_buf = include_bytes!("../../certs/cluster.cert");
let default_cluster_key_buf = include_bytes!("../../certs/cluster.key");
let default_cluster_cert = CertificateDer::from(default_cluster_cert_buf.to_vec());
Expand Down Expand Up @@ -147,6 +153,8 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
ice_lite: args.ice_lite,
secure: secure.clone(),
max_live: HashMap::from([(ServiceKind::Webrtc, workers as u32 * args.ccu_per_core), (ServiceKind::RtpEngine, workers as u32 * args.ccu_per_core)]),
enable_gateway_agent: !args.disable_gateway_agent,
enable_connector_agent: !args.disable_connector_agent,
},
};
controller.add_worker::<_, _, MediaRuntimeWorker<_>, PollingBackend<_, 128, 512>>(Duration::from_millis(1), cfg, None);
Expand Down
Loading

0 comments on commit 5cae64f

Please sign in to comment.