diff --git a/bin/Cargo.toml b/bin/Cargo.toml index 3a35de4e..2e5bf73e 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -42,7 +42,8 @@ reqwest = { version = "0.12", features = ["json"]} sentry = "0.34" [features] -default = ["console", "gateway", "media", "connector", "cert_utils"] +default = ["console", "gateway", "media", "connector", "standalone", "cert_utils"] +standalone = ["console", "gateway", "media", "connector"] gateway = ["media-server-gateway", "media-server-connector", "quinn_vnet", "node_metrics", "maxminddb", "rust-embed", "media-server-multi-tenancy"] media = ["media-server-runner", "media-server-record", "quinn_vnet", "node_metrics", "rtpengine-ngcontrol"] console = [] diff --git a/bin/media_single.sh b/bin/media_single.sh new file mode 100644 index 00000000..051bb635 --- /dev/null +++ b/bin/media_single.sh @@ -0,0 +1,11 @@ +RUST_LOG=atm0s_sdn_network=error,info \ +RUST_BACKTRACE=1 \ +cargo run -- \ + --sdn-zone-id 0 \ + --sdn-zone-node-id 1 \ + --workers 1 \ + --http-port 3000 \ + media \ + --enable-token-api \ + --disable-gateway-agent \ + --disable-connector-agent diff --git a/bin/src/main.rs b/bin/src/main.rs index 773588c1..f02eded7 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -85,6 +85,7 @@ struct Args { #[tokio::main(flavor = "current_thread")] async fn main() { + rustls::crypto::ring::default_provider().install_default().expect("should install ring as default"); if std::env::var_os("RUST_LOG").is_none() { std::env::set_var("RUST_LOG", "info"); } @@ -194,6 +195,8 @@ async fn main() { log::error!("create cert error {:?}", e); } } + #[cfg(feature = "standalone")] + server::ServerType::Standalone(args) => server::run_standalone(workers, node, args).await, } }) .await; diff --git a/bin/src/server.rs b/bin/src/server.rs index f0224ff1..66847198 100644 --- a/bin/src/server.rs +++ b/bin/src/server.rs @@ -3,13 +3,15 @@ use clap::Subcommand; #[cfg(feature = "cert_utils")] mod cert; #[cfg(feature = "connector")] -mod connector; +pub mod connector; #[cfg(feature = "console")] -mod console; +pub mod console; #[cfg(feature = "gateway")] -mod gateway; +pub mod gateway; #[cfg(feature = "media")] -mod media; +pub mod media; +#[cfg(feature = "standalone")] +pub mod standalone; #[cfg(feature = "cert_utils")] pub use cert::run_cert_utils; @@ -21,6 +23,8 @@ pub use console::{run_console_server, storage as console_storage}; pub use gateway::run_media_gateway; #[cfg(feature = "media")] pub use media::run_media_server; +#[cfg(feature = "standalone")] +pub use standalone::run_standalone; #[derive(Debug, Subcommand)] pub enum ServerType { @@ -34,4 +38,6 @@ pub enum ServerType { Media(media::Args), #[cfg(feature = "cert_utils")] Cert(cert::Args), + #[cfg(feature = "standalone")] + Standalone(standalone::Args), } diff --git a/bin/src/server/connector.rs b/bin/src/server/connector.rs index 6e336ba8..d2f0f1ec 100644 --- a/bin/src/server/connector.rs +++ b/bin/src/server/connector.rs @@ -44,46 +44,44 @@ type TW = (); pub struct Args { /// DB Uri #[arg(env, long, default_value = "sqlite://connector.db?mode=rwc")] - db_uri: String, + pub db_uri: String, /// S3 Uri #[arg(env, long, default_value = "http://user:pass@localhost:9000/bucket/path/?path_style=true")] - s3_uri: String, + pub s3_uri: String, /// Hook Uri. /// If set, will send hook event to this uri. example: http://localhost:8080/hook #[arg(env, long)] - hook_uri: Option, + pub hook_uri: Option, /// Hook workers #[arg(env, long, default_value_t = 8)] - hook_workers: usize, + pub hook_workers: usize, /// Hook body type #[arg(env, long, default_value = "protobuf-json")] - hook_body_type: HookBodyType, + pub hook_body_type: HookBodyType, /// Destroy room after no-one online, default is 2 minutes #[arg(env, long, default_value_t = 120_000)] - destroy_room_after_ms: u64, + pub destroy_room_after_ms: u64, /// Storage tick interval, default is 1 minute /// This is used for clearing ended room #[arg(env, long, default_value_t = 60_000)] - storage_tick_interval_ms: u64, + pub storage_tick_interval_ms: u64, /// multi-tenancy sync endpoint #[arg(env, long)] - multi_tenancy_sync: Option, + pub multi_tenancy_sync: Option, /// multi-tenancy sync endpoint #[arg(env, long, default_value_t = 30_000)] - multi_tenancy_sync_interval_ms: u64, + pub multi_tenancy_sync_interval_ms: u64, } pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { - rustls::crypto::ring::default_provider().install_default().expect("should install ring as default"); - let app_storage = if let Some(url) = args.multi_tenancy_sync { let app_storage = Arc::new(MultiTenancyStorage::new()); let mut app_sync = MultiTenancySync::new(app_storage.clone(), url, Duration::from_millis(args.multi_tenancy_sync_interval_ms)); diff --git a/bin/src/server/console.rs b/bin/src/server/console.rs index 8d66ec85..62919b64 100644 --- a/bin/src/server/console.rs +++ b/bin/src/server/console.rs @@ -36,8 +36,6 @@ type TW = (); pub struct Args {} pub async fn run_console_server(workers: usize, http_port: Option, node: NodeConfig, _args: Args) { - rustls::crypto::ring::default_provider().install_default().expect("should install ring as default"); - let storage = StorageShared::default(); let node_id = node.node_id; diff --git a/bin/src/server/gateway.rs b/bin/src/server/gateway.rs index 08eae977..1dcc91d9 100644 --- a/bin/src/server/gateway.rs +++ b/bin/src/server/gateway.rs @@ -52,44 +52,42 @@ type TW = (); pub struct Args { /// Location latitude. #[arg(env, long, default_value_t = 0.0)] - lat: f32, + pub lat: f32, /// Location longitude. #[arg(env, long, default_value_t = 0.0)] - lon: f32, + pub lon: f32, /// Path to the GeoIP database. #[arg(env, long, default_value = "./maxminddb-data/GeoLite2-City.mmdb")] - geo_db: String, + pub geo_db: String, /// Maximum CPU usage (in percent) allowed for routing to a media node or gateway node. #[arg(env, long, default_value_t = 60)] - max_cpu: u8, + pub max_cpu: u8, /// Maximum memory usage (in percent) allowed for routing to a media node or gateway node. #[arg(env, long, default_value_t = 80)] - max_memory: u8, + pub max_memory: u8, /// Maximum disk usage (in percent) allowed for routing to a media node or gateway node. #[arg(env, long, default_value_t = 90)] - max_disk: u8, + pub max_disk: u8, /// The port for binding the RTPengine command UDP socket. #[arg(env, long)] - rtpengine_cmd_addr: Option, + pub rtpengine_cmd_addr: Option, /// multi-tenancy sync endpoint #[arg(env, long)] - multi_tenancy_sync: Option, + pub multi_tenancy_sync: Option, /// multi-tenancy sync endpoint #[arg(env, long, default_value_t = 30_000)] - multi_tenancy_sync_interval_ms: u64, + pub multi_tenancy_sync_interval_ms: u64, } pub async fn run_media_gateway(workers: usize, http_port: Option, node: NodeConfig, args: Args) { - rustls::crypto::ring::default_provider().install_default().expect("should install ring as default"); - let default_cluster_cert_buf = include_bytes!("../../certs/cluster.cert"); let default_cluster_key_buf = include_bytes!("../../certs/cluster.key"); let default_cluster_cert = CertificateDer::from(default_cluster_cert_buf.to_vec()); @@ -101,6 +99,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let edge_secure = Arc::new(MediaEdgeSecureJwt::from(node.secret.as_bytes())); let app_storage = if let Some(url) = args.multi_tenancy_sync { + log::info!("[MediaGateway] multi-tenancy sync is enabled, using url: {}", url); let app_storage = Arc::new(MultiTenancyStorage::new()); let mut app_sync = MultiTenancySync::new(app_storage.clone(), url, Duration::from_millis(args.multi_tenancy_sync_interval_ms)); tokio::spawn(async move { @@ -108,6 +107,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod }); app_storage } else { + log::info!("[MediaGateway] multi-tenancy sync is disabled, using single tenant with secret: {}", node.secret); Arc::new(MultiTenancyStorage::new_with_single(&node.secret, None)) }; let gateway_secure = MediaGatewaySecureJwt::new(node.secret.as_bytes(), app_storage.clone()); @@ -120,10 +120,10 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let rtpengine_udp = NgUdpTransport::new(ngproto_addr).await; let secure2 = edge_secure.clone(); tokio::spawn(async move { - log::info!("[MediaServer] start ng_controller task"); + log::info!("[MediaGateway] start ng_controller task"); let mut server = NgControllerServer::new(rtpengine_udp, secure2, req_tx); while server.recv().await.is_some() {} - log::info!("[MediaServer] stop ng_controller task"); + log::info!("[MediaGateway] stop ng_controller task"); }); } @@ -250,7 +250,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod tokio::spawn(async move { let res = local_rpc_processor.process_req(conn_part, param).await; - res_tx.send(res).print_err2("answer http request error"); + res_tx.send(res).print_err2("[MediaGateway] answer http request error"); }); } while let Ok(control) = connector_agent_rx.try_recv() { @@ -273,7 +273,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod }, SdnExtOut::FeaturesEvent(_, FeaturesEvent::Socket(event)) => { if let Err(e) = vnet_tx.try_send(event) { - log::error!("[MediaEdge] forward Sdn SocketEvent error {:?}", e); + log::error!("[MediaGateway] forward Sdn SocketEvent error {:?}", e); } } _ => {} diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index 86f22eee..bb17bb2e 100644 --- a/bin/src/server/media.rs +++ b/bin/src/server/media.rs @@ -44,47 +44,53 @@ use runtime_worker::{ExtIn, ExtOut}; pub struct Args { /// Enables the Token API, which allows token generation. #[arg(env, long)] - enable_token_api: bool, + pub enable_token_api: bool, /// Enables WebRTC ICE Lite mode. #[arg(env, long)] - ice_lite: bool, + pub ice_lite: bool, /// The seed port for binding the WebRTC UDP socket. The port will increment by one for each worker. /// Default: 0, which assigns the port randomly. /// If set to 20000, each worker will be assigned a unique port: worker0: 20000, worker1: 20001, worker2: 20002, ... #[arg(env, long, default_value_t = 0)] - webrtc_port_seed: u16, + pub webrtc_port_seed: u16, /// The port for binding the RTPengine command UDP socket. #[arg(env, long)] - rtpengine_cmd_addr: Option, + pub rtpengine_cmd_addr: Option, /// The IP address for RTPengine RTP listening. /// Default: 127.0.0.1 #[arg(env, long, default_value = "127.0.0.1")] - rtpengine_rtp_ip: IpAddr, + pub rtpengine_rtp_ip: IpAddr, /// Maximum concurrent connections per CPU core. #[arg(env, long, default_value_t = 200)] - ccu_per_core: u32, + pub ccu_per_core: u32, /// Directory for storing cached recordings. #[arg(env, long, default_value = "./record_cache/")] - record_cache: String, + pub record_cache: String, /// Maximum size of the recording cache in bytes. #[arg(env, long, default_value_t = 100_000_000)] - record_mem_max_size: usize, + pub record_mem_max_size: usize, /// Number of workers for uploading recordings. #[arg(env, long, default_value_t = 5)] - record_upload_worker: usize, + pub record_upload_worker: usize, + + /// Enables the Gateway Agent service. + #[arg(env, long)] + pub disable_gateway_agent: bool, + + /// Enables the Connector Agent service. + #[arg(env, long)] + pub disable_connector_agent: bool, } pub async fn run_media_server(workers: usize, http_port: Option, node: NodeConfig, args: Args) { - rustls::crypto::ring::default_provider().install_default().expect("should install ring as default"); - let default_cluster_cert_buf = include_bytes!("../../certs/cluster.cert"); let default_cluster_key_buf = include_bytes!("../../certs/cluster.key"); let default_cluster_cert = CertificateDer::from(default_cluster_cert_buf.to_vec()); @@ -149,6 +155,8 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node ice_lite: args.ice_lite, secure: secure.clone(), max_live: HashMap::from([(ServiceKind::Webrtc, workers as u32 * args.ccu_per_core), (ServiceKind::RtpEngine, workers as u32 * args.ccu_per_core)]), + enable_gateway_agent: !args.disable_gateway_agent, + enable_connector_agent: !args.disable_connector_agent, }, }; controller.add_worker::<_, _, MediaRuntimeWorker<_>, PollingBackend<_, 128, 512>>(Duration::from_millis(1), cfg, None); diff --git a/bin/src/server/standalone.rs b/bin/src/server/standalone.rs new file mode 100644 index 00000000..f41bf784 --- /dev/null +++ b/bin/src/server/standalone.rs @@ -0,0 +1,249 @@ +use std::{ + net::{IpAddr, SocketAddr}, + str::FromStr, +}; + +use atm0s_sdn::NodeAddr; +use clap::Parser; +use media_server_connector::HookBodyType; + +use crate::NodeConfig; + +#[derive(Debug, Parser)] +pub struct Args { + /// The port for console server + #[arg(env, long, default_value_t = 8080)] + pub console_port: u16, + + /// The port for gateway server + #[arg(env, long, default_value_t = 3000)] + pub gateway_port: u16, + + /// The path to the GeoIP database + #[arg(env, long, default_value = "./maxminddb-data/GeoLite2-City.mmdb")] + pub geo_db: String, + + /// Maximum CPU usage (in percent) allowed for routing to a media node or gateway node. + #[arg(env, long, default_value_t = 60)] + pub max_cpu: u8, + + /// Maximum memory usage (in percent) allowed for routing to a media node or gateway node. + #[arg(env, long, default_value_t = 80)] + pub max_memory: u8, + + /// Maximum disk usage (in percent) allowed for routing to a media node or gateway node. + #[arg(env, long, default_value_t = 90)] + pub max_disk: u8, + + /// Multi-tenancy sync endpoint + #[arg(env, long)] + pub multi_tenancy_sync: Option, + + /// Multi-tenancy sync interval in milliseconds + #[arg(env, long, default_value_t = 30_000)] + pub multi_tenancy_sync_interval_ms: u64, + + /// Record cache directory + #[arg(env, long, default_value = "./record_cache/")] + pub record_cache: String, + + /// Maximum size of the recording cache in bytes + #[arg(env, long, default_value_t = 100_000_000)] + pub record_mem_max_size: usize, + + /// Number of workers for uploading recordings + #[arg(env, long, default_value_t = 5)] + pub record_upload_worker: usize, + + /// DB Uri + #[arg(env, long, default_value = "sqlite://connector.db?mode=rwc")] + pub db_uri: String, + + /// S3 Uri + #[arg(env, long, default_value = "http://minioadmin:minioadmin@localhost:9000/record/?path_style=true")] + pub s3_uri: String, + + /// Hook URI + #[arg(env, long)] + pub hook_uri: Option, + + /// Number of workers for hook + #[arg(env, long, default_value_t = 8)] + pub hook_workers: usize, + + /// Hook body type + #[arg(env, long, default_value = "protobuf-json")] + pub hook_body_type: HookBodyType, + + /// Destroy room after no-one online, default is 2 minutes + #[arg(env, long, default_value_t = 120_000)] + pub destroy_room_after_ms: u64, + + /// Storage tick interval, default is 1 minute + #[arg(env, long, default_value_t = 60_000)] + pub storage_tick_interval_ms: u64, + + /// The IP address for RTPengine RTP listening. + /// Default: 127.0.0.1 + #[arg(env, long, default_value = "127.0.0.1")] + pub rtpengine_rtp_ip: IpAddr, + + /// Media instance count + #[arg(env, long, default_value_t = 2)] + pub media_instance_count: u32, +} + +pub async fn run_standalone(workers: usize, node: NodeConfig, args: Args) { + log::info!("Running standalone server"); + let console_p2p_addr = { + log::info!("Running console node"); + let zone = node.zone; + let secret = node.secret.clone(); + let console_port = args.console_port; + let console_p2p_addr = get_free_socket_addr(); + tokio::task::spawn_local(async move { + super::run_console_server( + workers, + Some(console_port), + NodeConfig { + node_id: 0, + secret, + seeds: vec![], + bind_addrs: vec![console_p2p_addr], + zone, + bind_addrs_alt: vec![], + }, + super::console::Args {}, + ) + .await + }); + console_p2p_addr + }; + let gateway_p2p_addr = { + log::info!("Running gateway node"); + let zone = node.zone; + let secret = node.secret.clone(); + let gateway_port = args.gateway_port; + let gateway_p2p_addr = get_free_socket_addr(); + let multi_tenancy_sync = args.multi_tenancy_sync.clone(); + let multi_tenancy_sync_interval_ms = args.multi_tenancy_sync_interval_ms; + let geo_db = args.geo_db.clone(); + let max_cpu = args.max_cpu; + let max_memory = args.max_memory; + let max_disk = args.max_disk; + tokio::task::spawn_local(async move { + super::run_media_gateway( + workers, + Some(gateway_port), + NodeConfig { + node_id: 10, + secret, + seeds: vec![NodeAddr::from_str(&format!("0@/ip4/{}/udp/{}", console_p2p_addr.ip(), console_p2p_addr.port())).expect("Should parse node addr")], + bind_addrs: vec![gateway_p2p_addr], + zone, + bind_addrs_alt: vec![], + }, + super::gateway::Args { + lat: 0.0, + lon: 0.0, + geo_db, + max_cpu, + max_memory, + max_disk, + rtpengine_cmd_addr: None, + multi_tenancy_sync, + multi_tenancy_sync_interval_ms, + }, + ) + .await + }); + gateway_p2p_addr + }; + { + log::info!("Running connector node"); + let connector_p2p_addr = get_free_socket_addr(); + let secret = node.secret.clone(); + let zone = node.zone; + let db_uri = args.db_uri.clone(); + let s3_uri = args.s3_uri.clone(); + let hook_uri = args.hook_uri.clone(); + let hook_workers = args.hook_workers; + let hook_body_type = args.hook_body_type; + let destroy_room_after_ms = args.destroy_room_after_ms; + let storage_tick_interval_ms = args.storage_tick_interval_ms; + let multi_tenancy_sync = args.multi_tenancy_sync.clone(); + let multi_tenancy_sync_interval_ms = args.multi_tenancy_sync_interval_ms; + tokio::task::spawn_local(async move { + super::run_media_connector( + workers, + NodeConfig { + node_id: 30, + secret, + seeds: vec![NodeAddr::from_str(&format!("10@/ip4/{}/udp/{}", gateway_p2p_addr.ip(), gateway_p2p_addr.port())).expect("Should parse node addr")], + bind_addrs: vec![connector_p2p_addr], + zone, + bind_addrs_alt: vec![], + }, + super::connector::Args { + db_uri, + s3_uri, + hook_uri, + hook_workers, + hook_body_type, + destroy_room_after_ms, + storage_tick_interval_ms, + multi_tenancy_sync, + multi_tenancy_sync_interval_ms, + }, + ) + .await + }); + } + for i in 0..args.media_instance_count { + log::info!("Running media node {}", i); + let media_p2p_addr = get_free_socket_addr(); + let node_id = 20 + i; + let secret = node.secret.clone(); + let zone = node.zone; + let record_cache = args.record_cache.clone(); + let record_mem_max_size = args.record_mem_max_size; + let record_upload_worker = args.record_upload_worker; + let rtpengine_rtp_ip = args.rtpengine_rtp_ip; + tokio::task::spawn_local(async move { + super::run_media_server( + workers, + None, + NodeConfig { + node_id, + secret, + seeds: vec![NodeAddr::from_str(&format!("10@/ip4/{}/udp/{}", gateway_p2p_addr.ip(), gateway_p2p_addr.port())).expect("Should parse node addr")], + bind_addrs: vec![media_p2p_addr], + zone, + bind_addrs_alt: vec![], + }, + super::media::Args { + enable_token_api: false, + ice_lite: false, + webrtc_port_seed: 0, + rtpengine_cmd_addr: None, + rtpengine_rtp_ip, + ccu_per_core: 200, + record_cache, + record_mem_max_size, + record_upload_worker, + disable_gateway_agent: false, + disable_connector_agent: false, + }, + ) + .await + }); + } + loop { + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + } +} + +fn get_free_socket_addr() -> SocketAddr { + let socket = std::net::UdpSocket::bind(("127.0.0.1", 0)).expect("Should get free port"); + socket.local_addr().expect("Should get free port") +} diff --git a/bin/standalone.sh b/bin/standalone.sh new file mode 100644 index 00000000..a61d28db --- /dev/null +++ b/bin/standalone.sh @@ -0,0 +1,10 @@ +RUST_LOG=atm0s_sdn_network=error,info \ +RUST_BACKTRACE=1 \ +cargo run -- \ + --sdn-zone-node-id 1 \ + --workers 1 \ + standalone \ + --geo-db "../maxminddb-data/GeoLite2-City.mmdb" \ + --max-cpu 100 \ + --max-memory 100 \ + --max-disk 100 diff --git a/docs/getting-started/installation/README.md b/docs/getting-started/installation/README.md index 054003e9..918b92d8 100644 --- a/docs/getting-started/installation/README.md +++ b/docs/getting-started/installation/README.md @@ -28,6 +28,7 @@ cargo build --release --package atm0s-media-server Depend on your need, we have some topology to install atm0s-media-server: +- [Standalone](./standalone.md) - [Single zone](./single-zone.md) - [Multi zones](./multi-zones.md) diff --git a/docs/getting-started/installation/standalone.md b/docs/getting-started/installation/standalone.md new file mode 100644 index 00000000..6043f0ad --- /dev/null +++ b/docs/getting-started/installation/standalone.md @@ -0,0 +1,78 @@ +# Standalone Mode + +Standalone mode is designed for testing purposes, allowing you to run both a media server and gateway server on a single machine with minimal configuration. + +## Prerequisites + +Before starting, you only need to download media binary (or docker or build from source) and a GeoIP database. + +## Basic Usage + +### Prepare GeoIP database + +```bash +mkdir -p maxminddb-data +cd maxminddb-data +wget https://github.com/P3TERX/GeoLite.mmdb/raw/download/GeoLite2-City.mmdb +``` + +### Start the server + +Start the server with default logging configuration: + +```bash +RUST_LOG=atm0s_sdn_network=error,info \ +RUST_BACKTRACE=1 \ +./atm0s-media-server standalone +``` + +## Advanced configuration + +```bash +❯ ./atm0s-media-server standalone --help +Usage: atm0s-media-server standalone [OPTIONS] + +Options: + --console-port + The port for console server [env: CONSOLE_PORT=] [default: 8080] + --gateway-port + The port for gateway server [env: GATEWAY_PORT=] [default: 3000] + --geo-db + The path to the GeoIP database [env: GEO_DB=] [default: ./maxminddb-data/GeoLite2-City.mmdb] + --max-cpu + Maximum CPU usage (in percent) allowed for routing to a media node or gateway node [env: MAX_CPU=] [default: 60] + --max-memory + Maximum memory usage (in percent) allowed for routing to a media node or gateway node [env: MAX_MEMORY=] [default: 80] + --max-disk + Maximum disk usage (in percent) allowed for routing to a media node or gateway node [env: MAX_DISK=] [default: 90] + --multi-tenancy-sync + Multi-tenancy sync endpoint [env: MULTI_TENANCY_SYNC=] + --multi-tenancy-sync-interval-ms + Multi-tenancy sync interval in milliseconds [env: MULTI_TENANCY_SYNC_INTERVAL_MS=] [default: 30000] + --record-cache + Record cache directory [env: RECORD_CACHE=] [default: ./record_cache/] + --record-mem-max-size + Maximum size of the recording cache in bytes [env: RECORD_MEM_MAX_SIZE=] [default: 100000000] + --record-upload-worker + Number of workers for uploading recordings [env: RECORD_UPLOAD_WORKER=] [default: 5] + --db-uri + DB Uri [env: DB_URI=] [default: sqlite://connector.db?mode=rwc] + --s3-uri + S3 Uri [env: S3_URI=] [default: http://minioadmin:minioadmin@localhost:9000/record/?path_style=true] + --hook-uri + Hook URI [env: HOOK_URI=] + --hook-workers + Number of workers for hook [env: HOOK_WORKERS=] [default: 8] + --hook-body-type + Hook body type [env: HOOK_BODY_TYPE=] [default: protobuf-json] [possible values: protobuf-json, protobuf-binary] + --destroy-room-after-ms + Destroy room after no-one online, default is 2 minutes [env: DESTROY_ROOM_AFTER_MS=] [default: 120000] + --storage-tick-interval-ms + Storage tick interval, default is 1 minute [env: STORAGE_TICK_INTERVAL_MS=] [default: 60000] + --rtpengine-rtp-ip + The IP address for RTPengine RTP listening. Default: 127.0.0.1 [env: RTPENGINE_RTP_IP=] [default: 127.0.0.1] + --media-instance-count + Media instance count [env: MEDIA_INSTANCE_COUNT=] [default: 2] + -h, --help + Print help +``` \ No newline at end of file diff --git a/packages/media_connector/src/sql_storage/migration/m20240824_0001_add_room_destroy_and_record.rs b/packages/media_connector/src/sql_storage/migration/m20240824_0001_add_room_destroy_and_record.rs index 7d8ec46c..e3dfbd99 100644 --- a/packages/media_connector/src/sql_storage/migration/m20240824_0001_add_room_destroy_and_record.rs +++ b/packages/media_connector/src/sql_storage/migration/m20240824_0001_add_room_destroy_and_record.rs @@ -22,14 +22,36 @@ impl MigrationTrait for Migration { .alter_table(Table::alter().table(PeerSession::Table).add_column(ColumnDef::new(PeerSession::Record).string()).to_owned()) .await?; let db = manager.get_connection(); - db.execute_unprepared( - "UPDATE peer_session - SET room = t1.room - FROM peer_session t2 - INNER JOIN peer t1 on t1.id = t2.peer - ", - ) - .await?; + match db.get_database_backend() { + sea_orm::DatabaseBackend::MySql => { + db.execute_unprepared( + "UPDATE peer_session t2 + INNER JOIN peer t1 ON t1.id = t2.peer + SET t2.room = t1.room", + ) + .await?; + } + sea_orm::DatabaseBackend::Postgres => { + db.execute_unprepared( + "UPDATE peer_session + SET room = t1.room + FROM peer_session t2 + INNER JOIN peer t1 on t1.id = t2.peer", + ) + .await?; + } + sea_orm::DatabaseBackend::Sqlite => { + db.execute_unprepared( + "UPDATE peer_session + SET room = ( + SELECT t1.room + FROM peer t1 + WHERE t1.id = peer_session.peer + )", + ) + .await?; + } + } manager .create_index(Index::create().name("room_last_peer_leaved_at").table(Room::Table).col(Room::LastPeerLeavedAt).to_owned()) diff --git a/packages/media_runner/src/worker.rs b/packages/media_runner/src/worker.rs index 9ad5259c..66ce12a1 100644 --- a/packages/media_runner/src/worker.rs +++ b/packages/media_runner/src/worker.rs @@ -6,6 +6,8 @@ use std::{ }; use atm0s_sdn::{ + base::ServiceBuilder, + features::{FeaturesControl, FeaturesEvent}, generate_node_addr, secure::{HandshakeBuilderXDA, StaticKeyAuthorization}, services::{manual_discovery, visualization}, @@ -49,6 +51,8 @@ pub struct MediaConfig { pub rtpengine_rtp_ip: IpAddr, pub secure: Arc, pub max_live: HashMap, + pub enable_gateway_agent: bool, + pub enable_connector_agent: bool, } pub type SdnConfig = SdnWorkerCfg; @@ -83,6 +87,8 @@ pub enum SE { pub type TC = (); pub type TW = (); +pub type WServiceBuilder = dyn ServiceBuilder; + pub enum Input { NodeStats(NodeMetrics), ExtRpc(u64, RpcReq), @@ -170,10 +176,16 @@ impl MediaServerWorker { vec![], vec![generate_gateway_zone_tag(sdn_zone)], )); - let gateway = Arc::new(GatewayAgentServiceBuilder::new(media.max_live)); - let connector = Arc::new(ConnectorAgentServiceBuilder::new()); let history = Arc::new(DataWorkerHistory::default()); + let mut services: Vec> = vec![visualization, discovery]; + if media.enable_gateway_agent { + services.push(Arc::new(GatewayAgentServiceBuilder::new(media.max_live))); + } + if media.enable_connector_agent { + services.push(Arc::new(ConnectorAgentServiceBuilder::new())); + } + let sdn_config = SdnConfig { node_id, controller: if controller { @@ -183,18 +195,14 @@ impl MediaServerWorker { authorization: Arc::new(StaticKeyAuthorization::new(secret)), handshake_builder: Arc::new(HandshakeBuilderXDA), random: Box::new(OsRng), - services: vec![visualization.clone(), discovery.clone(), gateway.clone(), connector.clone()], + services: services.clone(), history: history.clone(), }) } else { None }, tick_ms: 1000, - data: DataPlaneCfg { - worker_id: 0, - services: vec![visualization, discovery, gateway, connector], - history, - }, + data: DataPlaneCfg { worker_id: 0, services, history }, }; let mut queue = DynamicDeque::default();