From 2f3381d48ab429ff1ecc48e76e3e2e76eeef64de Mon Sep 17 00:00:00 2001 From: giangndm <45644921+giangndm@users.noreply.github.com> Date: Tue, 26 Nov 2024 22:08:40 +0700 Subject: [PATCH] feat: automatic SDN config with node-api and local_ip (#455) * feat: SDN discovery over node http-api * added docs * make it simpler * feat: generate node-id from local_ip * feat: nat-traversal by detect public_ip from cloud metadata * fix aws public-ip * fix ip parse * setting rtpengine public-ip * accept both single addr or array addrs with seeds-from-url * fix: connector only need to connect to same-zone gateway * fix record with aws-s3 * fix record compose * fix warns * added node router_dump --- Cargo.lock | 28 ++++--- Cargo.toml | 4 +- bin/Cargo.toml | 2 + bin/src/http.rs | 78 +++++++++++++++++-- bin/src/http/api_console/cluster.rs | 35 ++++++++- bin/src/http/api_node.rs | 51 ++++++++++++ bin/src/lib.rs | 53 ++++++++++++- bin/src/main.rs | 58 ++++++++++++-- bin/src/server/connector.rs | 38 ++++++++- bin/src/server/console.rs | 28 ++++++- bin/src/server/gateway.rs | 53 +++++++++---- bin/src/server/media.rs | 46 +++++++++-- bin/src/server/standalone.rs | 7 +- bin/z0_connector_n4.sh | 2 +- bin/z0_gate_n1.sh | 2 +- bin/z0_media_n2.sh | 5 +- bin/z0_media_n3.sh | 5 +- bin/z1_gate_n1.sh | 2 +- bin/z1_media_n2.sh | 2 +- bin/z1_media_n3.sh | 2 +- docs/getting-started/installation/README.md | 3 + .../installation/auto-generate-node-id.md | 22 ++++++ .../installation/nat-traversal.md | 20 +++++ .../installation/network-discovery.md | 48 ++++++++++++ packages/media_connector/Cargo.toml | 2 +- packages/media_connector/src/sql_storage.rs | 16 ++-- .../media_record/bin/convert_record_worker.rs | 71 +++++++++++++---- packages/media_record/bin/run_worker.sh | 4 - packages/media_record/src/convert/composer.rs | 2 + packages/media_runner/src/worker.rs | 5 +- packages/media_utils/Cargo.toml | 3 +- packages/media_utils/src/uri.rs | 10 ++- packages/transport_rtpengine/src/transport.rs | 16 ++--
packages/transport_rtpengine/src/worker.rs | 12 +-- 34 files changed, 625 insertions(+), 110 deletions(-) create mode 100644 bin/src/http/api_node.rs create mode 100644 docs/getting-started/installation/auto-generate-node-id.md create mode 100644 docs/getting-started/installation/nat-traversal.md create mode 100644 docs/getting-started/installation/network-discovery.md delete mode 100644 packages/media_record/bin/run_worker.sh diff --git a/Cargo.lock b/Cargo.lock index 8dedfc3f..1dd3565e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -495,12 +495,14 @@ dependencies = [ "quinn", "rand 0.8.5", "rcgen", + "reqwest", "rtpengine-ngcontrol", "rust-embed", "rustls", "sans-io-runtime", "sentry", "serde", + "serde_json", "sysinfo", "tokio", "tracing-subscriber", @@ -509,8 +511,7 @@ dependencies = [ [[package]] name = "atm0s-sdn" version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ae4bafdf42069f4419ca9b361a3a31d59d8bbdbf87b14a0eee7fdcd744d30db" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?branch=feat-router-dump#aa0a9f9ca0c74b559975ea3f3848ad66e8dbc32b" dependencies = [ "atm0s-sdn-identity", "atm0s-sdn-network", @@ -529,8 +530,7 @@ dependencies = [ [[package]] name = "atm0s-sdn-identity" version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ec96b78570f6f704f7b7074f2c496604c2729044d5374441fee7cf202ee7633" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?branch=feat-router-dump#aa0a9f9ca0c74b559975ea3f3848ad66e8dbc32b" dependencies = [ "multiaddr", "rand 0.8.5", @@ -540,8 +540,7 @@ dependencies = [ [[package]] name = "atm0s-sdn-network" version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68bb1728d2a0f5f517c6be60a6e32bd73767b8bf70bb9df5f006d188def58210" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?branch=feat-router-dump#aa0a9f9ca0c74b559975ea3f3848ad66e8dbc32b" dependencies = [ 
"aes-gcm 0.10.3", "atm0s-sdn-identity", @@ -568,8 +567,7 @@ dependencies = [ [[package]] name = "atm0s-sdn-router" version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6679fb3c1797ae958c247a0549c3e33224c7c0352e5b28bb984f8a7e90bd7bf7" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?branch=feat-router-dump#aa0a9f9ca0c74b559975ea3f3848ad66e8dbc32b" dependencies = [ "atm0s-sdn-identity", "atm0s-sdn-utils", @@ -581,8 +579,7 @@ dependencies = [ [[package]] name = "atm0s-sdn-utils" version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "437f8450fd7095986c5bce4abd7ad03e95f1651a214b8727976363c86d6b9cc9" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?branch=feat-router-dump#aa0a9f9ca0c74b559975ea3f3848ad66e8dbc32b" dependencies = [ "log", "serde", @@ -3478,6 +3475,7 @@ dependencies = [ "sorted-vec", "spin", "uriparse", + "urlencoding", ] [[package]] @@ -5309,9 +5307,9 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "s3-presign" -version = "0.0.2" +version = "0.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "748e207ba5bb8317987e9a6db2e33c8e43de4eec7ad6c2e5faec49f6fdd46e92" +checksum = "87dd93104cb3e5cff33676215f1ea9296bd524d2b783cbaf24422955d6b50dd1" dependencies = [ "chrono", "hmac 0.12.1", @@ -7128,6 +7126,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8parse" version = "0.2.2" diff --git a/Cargo.toml b/Cargo.toml index 4a3804e8..1a71fcf8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,8 +20,8 @@ members = [ [workspace.dependencies] sans-io-runtime = { version = "0.3", default-features = false } -atm0s-sdn = { version = "0.2", default-features = false } 
-atm0s-sdn-network = { version = "0.6", default-features = false } +atm0s-sdn = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", branch = "feat-router-dump", default-features = false } +atm0s-sdn-network = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", branch = "feat-router-dump", default-features = false } tokio = "1.37" tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] } convert-enum = "0.1" diff --git a/bin/Cargo.toml b/bin/Cargo.toml index c87a53e7..f25a5872 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -28,6 +28,7 @@ media-server-multi-tenancy = { path = "../packages/multi_tenancy", optional = tr rtpengine-ngcontrol = { path = "../packages/rtpengine_ngcontrol", optional = true } local-ip-address = "0.6" serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0" } quinn = { version = "0.11", optional = true } rustls = { version = "0.23", optional = true } convert-enum = { workspace = true } @@ -38,6 +39,7 @@ maxminddb = { version = "0.24", optional = true } sysinfo = { version = "0.31", optional = true } hex = { version = "0.4", optional = true } mime_guess = { version = "2.0", optional = true } +reqwest = { version = "0.12", features = ["json"]} sentry = "0.34" [features] diff --git a/bin/src/http.rs b/bin/src/http.rs index db43ad8c..692267a0 100644 --- a/bin/src/http.rs +++ b/bin/src/http.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::sync::Arc; +pub use api_node::NodeApiCtx; use media_server_protocol::endpoint::ClusterConnId; #[cfg(feature = "console")] use media_server_protocol::protobuf::cluster_connector::MediaConnectorServiceClient; @@ -14,6 +15,7 @@ use poem::{listener::TcpListener, middleware::Cors, EndpointExt, Route, Server}; use poem_openapi::types::{ToJSON, Type}; use poem_openapi::OpenApiService; use poem_openapi::{types::ParseFromJSON, Object}; +use serde::Deserialize; use tokio::sync::mpsc::Sender; #[cfg(feature = "embed_static")] use 
utils::EmbeddedFilesEndpoint; @@ -21,6 +23,7 @@ use utils::EmbeddedFilesEndpoint; mod api_console; mod api_media; mod api_metrics; +mod api_node; mod api_token; mod utils; @@ -34,13 +37,13 @@ pub struct PublicMediaFiles; #[folder = "public/console"] pub struct PublicConsoleFiles; -#[derive(Debug, Default, Object)] +#[derive(Debug, Default, Object, Deserialize)] pub struct Pagination { pub total: usize, pub current: usize, } -#[derive(Debug, Object)] +#[derive(Debug, Object, Deserialize)] pub struct Response { pub status: bool, #[oai(skip_serializing_if = "Option::is_none")] @@ -65,25 +68,31 @@ impl Default for Response { #[cfg(feature = "console")] pub async fn run_console_http_server( port: u16, + node: NodeApiCtx, secure: media_server_secure::jwt::MediaConsoleSecureJwt, storage: crate::server::console_storage::StorageShared, connector: MediaConnectorServiceClient, ) -> Result<(), Box> { use poem::middleware::Tracing; - let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); + let node_api = api_node::Apis::new(node); + let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/"); + let node_ui = node_service.swagger_ui(); + let node_spec = node_service.spec(); + + let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); let metrics_ui = metrics_service.swagger_ui(); let metrics_spec = metrics_service.spec(); - let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "Console User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/"); + let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/"); let user_ui = user_service.swagger_ui(); let user_spec = user_service.spec(); - let cluster_service: 
OpenApiService<_, ()> = OpenApiService::new(api_console::cluster::Apis, "Console Cluster APIs", env!("CARGO_PKG_VERSION")).server("/api/cluster/"); + let cluster_service: OpenApiService<_, ()> = OpenApiService::new(api_console::cluster::Apis, "Cluster APIs", env!("CARGO_PKG_VERSION")).server("/api/cluster/"); let cluster_ui = cluster_service.swagger_ui(); let cluster_spec = cluster_service.spec(); - let connector_service: OpenApiService<_, ()> = OpenApiService::new(api_console::connector::Apis, "Console Connector APIs", env!("CARGO_PKG_VERSION")).server("/api/connector/"); + let connector_service: OpenApiService<_, ()> = OpenApiService::new(api_console::connector::Apis, "Connector APIs", env!("CARGO_PKG_VERSION")).server("/api/connector/"); let connector_ui = connector_service.swagger_ui(); let connector_spec = connector_service.spec(); @@ -96,6 +105,10 @@ pub async fn run_console_http_server( let route = Route::new() .nest("/", console_panel) + //node + .nest("/api/node/", node_service) + .nest("/api/node/ui", node_ui) + .at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone())) //metrics .nest("/api/metrics/", metrics_service) .nest("/api/metrics/ui", metrics_ui) @@ -122,6 +135,7 @@ pub async fn run_console_http_server( #[cfg(feature = "gateway")] pub async fn run_gateway_http_server( port: u16, + node: NodeApiCtx, sender: Sender, RpcRes>>, edge_secure: Arc, gateway_secure: Arc, @@ -130,7 +144,12 @@ pub async fn run_gateway_http_server = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); + let node_api = api_node::Apis::new(node); + let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/"); + let node_ui = node_service.swagger_ui(); + let node_spec = node_service.spec(); + + let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); let 
metrics_ui = metrics_service.swagger_ui(); let metrics_spec = metrics_service.spec(); @@ -177,6 +196,10 @@ pub async fn run_gateway_http_server Result<(), Box> { + use poem::middleware::Tracing; + + let node_api = api_node::Apis::new(node); + let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/"); + let node_ui = node_service.swagger_ui(); + let node_spec = node_service.spec(); + + let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); + let metrics_ui = metrics_service.swagger_ui(); + let metrics_spec = metrics_service.spec(); + + let route = Route::new() + //node + .nest("/api/node/", node_service) + .nest("/api/node/ui", node_ui) + .at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone())) + //metrics + .nest("/api/metrics/", metrics_service) + .nest("/api/metrics/ui", metrics_ui) + .at("/api/metrics/spec", poem::endpoint::make_sync(move |_| metrics_spec.clone())) + .with(Cors::new()) + .with(Tracing); + + Server::new(TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), port))).run(route).await?; + Ok(()) +} + #[cfg(feature = "media")] pub async fn run_media_http_server( port: u16, + node: NodeApiCtx, sender: Sender, RpcRes>>, edge_secure: Arc, gateway_secure: Option>, ) -> Result<(), Box> { let mut route = Route::new(); - let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); + let node_api = api_node::Apis::new(node); + let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/"); + let node_ui = node_service.swagger_ui(); + let node_spec = node_service.spec(); + + let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); let metrics_ui = 
metrics_service.swagger_ui(); let metrics_spec = metrics_service.spec(); @@ -273,6 +331,10 @@ pub async fn run_media_http_server, node_type: Query, Data(ctx): Data<&ConsoleApisCtx>) -> Json> { + log::info!("seeds_for zone_id: {}, node_type: {:?}", zone_id.0, node_type.0); + match node_type.0 { + NodeType::Console => Json(ctx.storage.consoles().iter().map(|node| node.addr.clone()).collect()), + NodeType::Gateway => { + let consoles = ctx.storage.consoles().into_iter().map(|node| node.addr.clone()); + let zone = ctx.storage.zone(ZoneId(zone_id.0)); + let same_zone_gateways = zone.iter().flat_map(|n| n.gateways.iter()).map(|n| n.addr.clone()); + Json(consoles.chain(same_zone_gateways).collect()) + } + NodeType::Connector => Json(ctx.storage.zone(ZoneId(zone_id.0)).unwrap().gateways.iter().map(|node| node.addr.to_string()).collect()), + NodeType::Media => Json(ctx.storage.zone(ZoneId(zone_id.0)).unwrap().gateways.iter().map(|node| node.addr.to_string()).collect()), + } + } + /// get consoles from all zones #[oai(path = "/consoles", method = "get")] async fn consoles(&self, _auth: ConsoleAuthorization, Data(ctx): Data<&ConsoleApisCtx>) -> Json>> { diff --git a/bin/src/http/api_node.rs b/bin/src/http/api_node.rs new file mode 100644 index 00000000..370de099 --- /dev/null +++ b/bin/src/http/api_node.rs @@ -0,0 +1,51 @@ +use std::time::Duration; + +use atm0s_sdn::NodeAddr; +use poem_openapi::{ + payload::{Json, PlainText}, + OpenApi, +}; +use tokio::sync::{mpsc::Sender, oneshot}; + +pub struct NodeApiCtx { + pub address: NodeAddr, + pub dump_tx: Sender>, +} + +pub struct Apis { + ctx: NodeApiCtx, +} + +impl Apis { + pub fn new(ctx: NodeApiCtx) -> Self { + Self { ctx } + } +} + +#[OpenApi] +impl Apis { + #[oai(path = "/address", method = "get")] + async fn get_address(&self) -> PlainText { + PlainText(self.ctx.address.to_string()) + } + + #[oai(path = "/router_dump", method = "get")] + async fn get_router_dump(&self) -> Json { + let (tx, rx) = oneshot::channel(); + 
self.ctx.dump_tx.send(tx).await.expect("should send"); + match tokio::time::timeout(Duration::from_millis(1000), rx).await { + Ok(Ok(v)) => Json(serde_json::json!({ + "status": true, + "data": v + })), + Ok(Err(e)) => Json(serde_json::json!({ + "status": false, + "error": e.to_string() + })), + Err(_e) => Json(serde_json::json!({ + "status": false, + "error": "timeout" + })), + } + } +} diff --git a/bin/src/lib.rs b/bin/src/lib.rs index 6eb8492e..bb18ee35 100644 --- a/bin/src/lib.rs +++ b/bin/src/lib.rs @@ -1,6 +1,10 @@ -use std::net::SocketAddr; +use std::{ + net::{IpAddr, SocketAddr}, + str::FromStr, +}; use atm0s_sdn::{NodeAddr, NodeId}; +use clap::ValueEnum; use media_server_protocol::cluster::ZoneId; mod errors; @@ -22,3 +26,50 @@ pub struct NodeConfig { pub zone: ZoneId, pub bind_addrs_alt: Vec, } + +/// Fetch node addrs from the given url. +/// The url should return a list of node addrs in JSON format or a single node addr. +pub async fn fetch_node_addrs_from_api(url: &str) -> Result, String> { + let resp = reqwest::get(url).await.map_err(|e| e.to_string())?; + let content = resp.text().await.map_err(|e| e.to_string())?; + if content.starts_with('[') { + let node_addrs: Vec = serde_json::from_str(&content).map_err(|e| e.to_string())?; + Ok(node_addrs.into_iter().flat_map(|addr| NodeAddr::from_str(&addr)).collect()) + } else { + Ok(vec![NodeAddr::from_str(&content).map_err(|e| e.to_string())?]) + } +} + +#[derive(Debug, Clone, ValueEnum)] +pub enum CloudProvider { + Aws, + Gcp, + Azure, + Other, +} + +pub async fn fetch_node_ip_alt_from_cloud(cloud: CloudProvider) -> Result { + match cloud { + CloudProvider::Aws => { + let resp = reqwest::get("http://169.254.169.254/latest/meta-data/public-ipv4").await.map_err(|e| e.to_string())?; + let ip = resp.text().await.map_err(|e| e.to_string())?; + IpAddr::from_str(ip.trim()).map_err(|e| e.to_string()) + } + CloudProvider::Gcp => { + let client = reqwest::Client::new(); + let resp = client + 
.get("http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip") + .header("Metadata-Flavor", "Google") + .send() + .await + .map_err(|e| e.to_string())?; + let ip = resp.text().await.map_err(|e| e.to_string())?; + IpAddr::from_str(ip.trim()).map_err(|e| e.to_string()) + } + CloudProvider::Azure | CloudProvider::Other => { + let resp = reqwest::get("http://ipv4.icanhazip.com").await.map_err(|e| e.to_string())?; + let ip = resp.text().await.map_err(|e| e.to_string())?; + IpAddr::from_str(ip.trim()).map_err(|e| e.to_string()) + } + } +} diff --git a/bin/src/main.rs b/bin/src/main.rs index 9249741c..eed0ab6f 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -1,6 +1,7 @@ use std::net::{IpAddr, SocketAddr}; -use atm0s_media_server::{server, NodeConfig}; +use atm0s_media_server::{fetch_node_addrs_from_api, server, NodeConfig}; +use atm0s_media_server::{fetch_node_ip_alt_from_cloud, CloudProvider}; use atm0s_sdn::NodeAddr; use clap::Parser; use media_server_protocol::cluster::ZoneId; @@ -32,6 +33,11 @@ struct Args { #[arg(env, long, default_value_t = 0)] sdn_zone_node_id: u8, + /// Auto generate node_id from last 8 bits of local_ip which match prefix + /// Example: 192.168.1, or 10.10.10. + #[arg(env, long)] + sdn_zone_node_id_from_ip_prefix: Option, + /// Manually specify the IP address of the node. This disables IP autodetection. #[arg(env, long)] node_ip: Option, @@ -40,6 +46,10 @@ struct Args { #[arg(env, long)] node_ip_alt: Vec, + /// Auto detect node_ip_alt with some common cloud provider metadata. + #[arg(env, long)] + node_ip_alt_cloud: Option, + /// Enable private IP addresses for the node. #[arg(env, long)] enable_private_ip: bool, @@ -56,6 +66,12 @@ struct Args { #[arg(env, long)] seeds: Vec, + /// Seeds from API, this is used for auto-discovery of seeds. + /// Currently all of nodes expose /api/node/address endpoint, so we can get seeds from there. 
+ /// Or we can get from console api /api/cluster/seeds?zone_id=xxx&node_type=xxx + #[arg(env, long)] + seeds_from_url: Option, + /// Number of worker threads to spawn. #[arg(env, long, default_value_t = 1)] workers: usize, @@ -103,6 +119,27 @@ async fn main() { let workers = args.workers; + let mut auto_generated_node_id = None; + if let Some(ip_prefix) = args.sdn_zone_node_id_from_ip_prefix { + for (name, ip) in local_ip_address::list_afinet_netifas().expect("Should have list interfaces") { + if let IpAddr::V4(ipv4) = ip { + if ipv4.to_string().starts_with(&ip_prefix) { + auto_generated_node_id = Some(ipv4.octets()[3]); + log::info!("Found ip prefix {ip_prefix} on {name} with ip {ip} => auto generate sdn_zone_node_id with {}", ipv4.octets()[3]); + break; + } + } + } + } + + let mut node_ip_alt_cloud = vec![]; + if let Some(cloud) = args.node_ip_alt_cloud { + log::info!("Fetch public ip from cloud provider {:?}", cloud); + let public_ip = fetch_node_ip_alt_from_cloud(cloud).await.expect("should get node ip alt"); + log::info!("Fetched public ip {:?}", public_ip); + node_ip_alt_cloud.push(public_ip); + } + let bind_addrs = if let Some(ip) = args.node_ip { vec![SocketAddr::new(ip, sdn_port)] } else { @@ -119,17 +156,28 @@ async fn main() { .map(|(_name, ip)| SocketAddr::new(ip, sdn_port)) .collect::>() }; - let node = NodeConfig { - node_id: ZoneId(args.sdn_zone_id).to_node_id(args.sdn_zone_node_id), + let mut node = NodeConfig { + node_id: ZoneId(args.sdn_zone_id).to_node_id(auto_generated_node_id.unwrap_or(args.sdn_zone_node_id)), secret: args.secret, seeds: args.seeds, bind_addrs, zone: ZoneId(args.sdn_zone_id), - bind_addrs_alt: args.node_ip_alt.into_iter().map(|ip| SocketAddr::new(ip, sdn_port)).collect::>(), + bind_addrs_alt: node_ip_alt_cloud + .into_iter() + .chain(args.node_ip_alt.into_iter()) + .map(|ip| SocketAddr::new(ip, sdn_port)) + .collect::>(), }; log::info!("Bind addrs {:?}, bind addrs alt {:?}", node.bind_addrs, node.bind_addrs_alt); + if let 
Some(url) = args.seeds_from_url { + log::info!("Generate seeds from node_api {}", url); + let addrs = fetch_node_addrs_from_api(&url).await.expect("should get seeds"); + log::info!("Generated seeds {:?}", addrs); + node.seeds = addrs; + } + let local = tokio::task::LocalSet::new(); local .run_until(async move { @@ -139,7 +187,7 @@ async fn main() { #[cfg(feature = "gateway")] server::ServerType::Gateway(args) => server::run_media_gateway(workers, http_port, node, args).await, #[cfg(feature = "connector")] - server::ServerType::Connector(args) => server::run_media_connector(workers, node, args).await, + server::ServerType::Connector(args) => server::run_media_connector(workers, http_port, node, args).await, #[cfg(feature = "media")] server::ServerType::Media(args) => server::run_media_server(workers, http_port, node, args).await, #[cfg(feature = "cert_utils")] diff --git a/bin/src/server/connector.rs b/bin/src/server/connector.rs index d2f0f1ec..aea72857 100644 --- a/bin/src/server/connector.rs +++ b/bin/src/server/connector.rs @@ -1,6 +1,11 @@ use std::{sync::Arc, time::Duration}; -use atm0s_sdn::{features::FeaturesEvent, secure::StaticKeyAuthorization, services::visualization, SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner}; +use atm0s_sdn::{ + features::{router_sync, FeaturesEvent}, + secure::StaticKeyAuthorization, + services::visualization, + SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner, +}; use clap::Parser; use media_server_connector::{ handler_service::{self, ConnectorHandlerServiceBuilder}, @@ -10,6 +15,7 @@ use media_server_multi_tenancy::{MultiTenancyStorage, MultiTenancySync}; use media_server_protocol::{ cluster::{ClusterNodeGenericInfo, ClusterNodeInfo}, connector::CONNECTOR_RPC_PORT, + gateway::generate_gateway_zone_tag, protobuf::cluster_connector::{connector_response, MediaConnectorServiceServer}, rpc::quinn::QuinnServer, }; @@ -18,6 +24,7 @@ use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use tokio::sync::mpsc::channel; use 
crate::{ + http::{run_connector_http_server, NodeApiCtx}, node_metrics::NodeMetricsCollector, quinn::{make_quinn_server, VirtualNetwork}, NodeConfig, @@ -81,7 +88,7 @@ pub struct Args { pub multi_tenancy_sync_interval_ms: u64, } -pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { +pub async fn run_media_connector(workers: usize, http_port: Option, node: NodeConfig, args: Args) { let app_storage = if let Some(url) = args.multi_tenancy_sync { let app_storage = Arc::new(MultiTenancyStorage::new()); let mut app_sync = MultiTenancySync::new(app_storage.clone(), url, Duration::from_millis(args.multi_tenancy_sync_interval_ms)); @@ -124,7 +131,7 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { }); builder.set_authorization(StaticKeyAuthorization::new(&node.secret)); - builder.set_manual_discovery(vec!["connector".to_string()], vec!["gateway".to_string()]); + builder.set_manual_discovery(vec!["connector".to_string()], vec![generate_gateway_zone_tag(node.zone)]); builder.add_service(Arc::new(ConnectorHandlerServiceBuilder::new())); for seed in node.seeds { @@ -200,6 +207,18 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { } }); + // Start http server + let (dump_tx, mut dump_rx) = channel(10); + if let Some(http_port) = http_port { + let node_ctx = NodeApiCtx { address: node_addr.clone(), dump_tx }; + tokio::spawn(async move { + if let Err(e) = run_connector_http_server(http_port, node_ctx).await { + log::error!("HTTP Error: {}", e); + } + }); + } + let mut wait_dump_router = vec![]; + loop { if controller.process().is_none() { break; @@ -223,6 +242,11 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { controller.service_control(HANDLER_SERVICE_ID.into(), (), control.into()); } + while let Ok(v) = dump_rx.try_recv() { + controller.feature_control((), router_sync::Control::DumpRouter.into()); + wait_dump_router.push(v); + } + while let Some(out) 
= controller.pop_event() { match out { SdnExtOut::ServicesEvent(_, _, SE::Connector(event)) => match event { @@ -237,6 +261,14 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { log::error!("[MediaConnector] forward Sdn SocketEvent error {:?}", e); } } + SdnExtOut::FeaturesEvent(_, FeaturesEvent::RouterSync(event)) => match event { + router_sync::Event::DumpRouter(dump) => { + let json = serde_json::to_value(dump).expect("should convert json"); + while let Some(v) = wait_dump_router.pop() { + let _ = v.send(json.clone()); + } + } + }, _ => {} } } diff --git a/bin/src/server/console.rs b/bin/src/server/console.rs index 8174dbce..9eae74bc 100644 --- a/bin/src/server/console.rs +++ b/bin/src/server/console.rs @@ -1,6 +1,11 @@ use std::time::{Duration, Instant}; -use atm0s_sdn::{features::FeaturesEvent, secure::StaticKeyAuthorization, services::visualization, SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner}; +use atm0s_sdn::{ + features::{router_sync, FeaturesEvent}, + secure::StaticKeyAuthorization, + services::visualization, + SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner, +}; use clap::Parser; use media_server_protocol::{ cluster::{ClusterNodeGenericInfo, ClusterNodeInfo}, @@ -9,9 +14,10 @@ use media_server_protocol::{ }; use media_server_secure::jwt::MediaConsoleSecureJwt; use storage::StorageShared; +use tokio::sync::mpsc::channel; use crate::{ - http::run_console_http_server, + http::{run_console_http_server, NodeApiCtx}, node_metrics::NodeMetricsCollector, quinn::{make_quinn_client, VirtualNetwork}, NodeConfig, @@ -67,17 +73,20 @@ pub async fn run_console_server(workers: usize, http_port: Option, node: No tokio::task::spawn_local(async move { while vnet.recv().await.is_some() {} }); + let (dump_tx, mut dump_rx) = channel(10); if let Some(http_port) = http_port { let secure = MediaConsoleSecureJwt::from(node.secret.as_bytes()); let storage = storage.clone(); + let node_ctx = NodeApiCtx { address: node_addr.clone(), 
dump_tx }; tokio::spawn(async move { - if let Err(e) = run_console_http_server(http_port, secure, storage, connector_rpc_client).await { + if let Err(e) = run_console_http_server(http_port, node_ctx, secure, storage, connector_rpc_client).await { log::error!("HTTP Error: {}", e); } }); } let mut node_metrics_collector = NodeMetricsCollector::default(); + let mut wait_dump_router = vec![]; loop { if controller.process().is_none() { @@ -99,6 +108,11 @@ pub async fn run_console_server(workers: usize, http_port: Option, node: No storage.on_tick(started_at.elapsed().as_millis() as u64); } + while let Ok(v) = dump_rx.try_recv() { + controller.feature_control((), router_sync::Control::DumpRouter.into()); + wait_dump_router.push(v); + } + while let Some(out) = controller.pop_event() { match out { SdnExtOut::ServicesEvent(_service, (), SE::Visual(event)) => match event { @@ -118,6 +132,14 @@ pub async fn run_console_server(workers: usize, http_port: Option, node: No log::error!("forward sdn SocketEvent error {:?}", e); } } + SdnExtOut::FeaturesEvent(_, FeaturesEvent::RouterSync(event)) => match event { + router_sync::Event::DumpRouter(dump) => { + let json = serde_json::to_value(dump).expect("should convert json"); + while let Some(v) = wait_dump_router.pop() { + let _ = v.send(json.clone()); + } + } + }, _ => {} } } diff --git a/bin/src/server/gateway.rs b/bin/src/server/gateway.rs index 1423de98..55738ae5 100644 --- a/bin/src/server/gateway.rs +++ b/bin/src/server/gateway.rs @@ -1,6 +1,11 @@ use std::{sync::Arc, time::Duration}; -use atm0s_sdn::{features::FeaturesEvent, secure::StaticKeyAuthorization, services::visualization, SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner}; +use atm0s_sdn::{ + features::{router_sync, FeaturesEvent}, + secure::StaticKeyAuthorization, + services::visualization, + SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner, +}; use clap::Parser; use media_server_connector::agent_service::ConnectorAgentServiceBuilder; use 
media_server_gateway::{store_service::GatewayStoreServiceBuilder, STORE_SERVICE_ID}; @@ -15,9 +20,10 @@ use media_server_secure::jwt::{MediaEdgeSecureJwt, MediaGatewaySecureJwt}; use rtpengine_ngcontrol::NgUdpTransport; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use std::net::SocketAddr; +use tokio::sync::mpsc::channel; use crate::{ - http::run_gateway_http_server, + http::{run_gateway_http_server, NodeApiCtx}, ng_controller::NgControllerServer, node_metrics::NodeMetricsCollector, quinn::{make_quinn_client, make_quinn_server, VirtualNetwork}, @@ -114,17 +120,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let gateway_secure = Arc::new(gateway_secure); let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024); - if let Some(http_port) = http_port { - let req_tx = req_tx.clone(); - let secure2 = edge_secure.clone(); - tokio::spawn(async move { - if let Err(e) = run_gateway_http_server(http_port, req_tx, secure2, gateway_secure).await { - log::error!("HTTP Error: {}", e); - } - }); - } - - //Running ng controller for Voip + // Running ng controller for Voip if let Some(ngproto_addr) = args.rtpengine_cmd_addr { let req_tx = req_tx.clone(); let rtpengine_udp = NgUdpTransport::new(ngproto_addr).await; @@ -137,8 +133,8 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod }); } + // Setup Sdn let node_id = node.node_id; - let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.bind_addrs_alt); let node_addr = builder.node_addr(); let node_info = ClusterNodeInfo::Gateway( @@ -168,6 +164,19 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let mut controller = builder.build::>(workers, node_info); let (selector, mut requester) = build_dest_selector(); + // Setup HTTP server + let (dump_tx, mut dump_rx) = channel(10); + if let Some(http_port) = http_port { + let req_tx = req_tx.clone(); + let secure2 = 
edge_secure.clone(); + let node_ctx = NodeApiCtx { address: node_addr.clone(), dump_tx }; + tokio::spawn(async move { + if let Err(e) = run_gateway_http_server(http_port, node_ctx, req_tx, secure2, gateway_secure).await { + log::error!("HTTP Error: {}", e); + } + }); + } + // Ip location for routing client to closest gateway let ip2location = Arc::new(Ip2Location::new(&args.geo_db)); @@ -210,6 +219,9 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod // Subscribe ConnectorHandler service controller.service_control(media_server_connector::AGENT_SERVICE_ID.into(), (), media_server_connector::agent_service::Control::Sub.into()); + // List all waiting router dump requests + let mut wait_dump_router = vec![]; + loop { if controller.process().is_none() { break; @@ -255,6 +267,11 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod controller.service_control(media_server_connector::AGENT_SERVICE_ID.into(), (), control.into()); } + while let Ok(v) = dump_rx.try_recv() { + controller.feature_control((), router_sync::Control::DumpRouter.into()); + wait_dump_router.push(v); + } + while let Some(out) = controller.pop_event() { match out { SdnExtOut::ServicesEvent(_, _, SE::Gateway(event)) => match event { @@ -274,6 +291,14 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod log::error!("[MediaGateway] forward Sdn SocketEvent error {:?}", e); } } + SdnExtOut::FeaturesEvent(_, FeaturesEvent::RouterSync(event)) => match event { + router_sync::Event::DumpRouter(dump) => { + let json = serde_json::to_value(dump).expect("should convert json"); + while let Some(v) = wait_dump_router.pop() { + let _ = v.send(json.clone()); + } + } + }, _ => {} } } diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index 09d8c98e..5b5a15be 100644 --- a/bin/src/server/media.rs +++ b/bin/src/server/media.rs @@ -5,7 +5,10 @@ use std::{ time::{Duration, Instant}, }; -use atm0s_sdn::{features::FeaturesEvent, 
SdnExtIn, SdnExtOut, TimePivot, TimeTicker}; +use atm0s_sdn::{ + features::{router_sync, FeaturesEvent}, + generate_node_addr, SdnExtIn, SdnExtOut, TimePivot, TimeTicker, +}; use clap::Parser; use media_server_gateway::ServiceKind; use media_server_multi_tenancy::MultiTenancyStorage; @@ -25,9 +28,10 @@ use rand::random; use rtpengine_ngcontrol::NgUdpTransport; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use sans_io_runtime::{backend::PollingBackend, Controller}; +use tokio::sync::mpsc::channel; use crate::{ - http::run_media_http_server, + http::{run_media_http_server, NodeApiCtx}, ng_controller::NgControllerServer, node_metrics::NodeMetricsCollector, quinn::{make_quinn_server, VirtualNetwork}, @@ -63,7 +67,7 @@ pub struct Args { /// The IP address for RTPengine RTP listening. /// Default: 127.0.0.1 #[arg(env, long, default_value = "127.0.0.1")] - pub rtpengine_rtp_ip: IpAddr, + pub rtpengine_listen_ip: IpAddr, /// Maximum concurrent connections per CPU core. #[arg(env, long, default_value_t = 200)] @@ -98,6 +102,8 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node let secure = Arc::new(MediaEdgeSecureJwt::from(node.secret.as_bytes())); let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024); + let node_addr = generate_node_addr(node.node_id, &node.bind_addrs, node.bind_addrs_alt.clone()); + let (dump_tx, mut dump_rx) = channel(10); if let Some(http_port) = http_port { let secure_gateway = args.enable_token_api.then(|| { let app_storage = Arc::new(MultiTenancyStorage::new_with_single(&node.secret, None)); @@ -105,8 +111,9 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node }); let req_tx = req_tx.clone(); let secure_edge = secure.clone(); + let node_ctx = NodeApiCtx { address: node_addr.clone(), dump_tx }; tokio::spawn(async move { - if let Err(e) = run_media_http_server(http_port, req_tx, secure_edge, secure_gateway).await { + if let Err(e) = run_media_http_server(http_port, node_ctx, 
req_tx, secure_edge, secure_gateway).await { log::error!("HTTP Error: {}", e); } }); @@ -139,6 +146,15 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node }; let webrtc_addrs = node.bind_addrs.iter().map(|addr| SocketAddr::new(addr.ip(), webrtc_port)).collect::>(); let webrtc_addrs_alt = node.bind_addrs_alt.iter().map(|addr| SocketAddr::new(addr.ip(), webrtc_port)).collect::>(); + let rtpengine_public_ip = webrtc_addrs + .iter() + .chain(webrtc_addrs_alt.iter()) + .find(|addr| match addr.ip() { + IpAddr::V4(ipv4) => !ipv4.is_unspecified() && !ipv4.is_multicast() && !ipv4.is_loopback() && !ipv4.is_broadcast() && !ipv4.is_private(), + IpAddr::V6(ipv6) => !ipv6.is_unspecified() && !ipv6.is_multicast() && !ipv6.is_loopback(), + }) + .map(|addr| addr.ip()) + .unwrap_or(args.rtpengine_listen_ip); println!("Running media server worker {i} with addrs: {:?}, ice-lite: {}", webrtc_addrs, args.ice_lite); @@ -149,7 +165,8 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node media: MediaConfig { webrtc_addrs, webrtc_addrs_alt, - rtpengine_rtp_ip: args.rtpengine_rtp_ip, + rtpengine_listen_ip: args.rtpengine_listen_ip, + rtpengine_public_ip, ice_lite: args.ice_lite, secure: secure.clone(), max_live: HashMap::from([(ServiceKind::Webrtc, workers as u32 * args.ccu_per_core), (ServiceKind::RtpEngine, workers as u32 * args.ccu_per_core)]), @@ -195,6 +212,9 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node let timer = TimePivot::build(); let mut ticker = TimeTicker::build(1000); + // List all waiting router dump requests + let mut wait_dump_router = vec![]; + loop { if controller.process().is_none() { break; @@ -255,6 +275,14 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node } } + while let Ok(v) = dump_rx.try_recv() { + controller.send_to( + 0, + ExtIn::Sdn(SdnExtIn::FeaturesControl(media_server_runner::UserData::Cluster, router_sync::Control::DumpRouter.into()), true), + ); 
+ wait_dump_router.push(v); + } + while let Some(out) = controller.pop_event() { match out { ExtOut::Rpc(req_id, worker, res) => { @@ -271,6 +299,14 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node log::error!("[MediaEdge] forward Sdn SocketEvent error {:?}", e); } } + ExtOut::Sdn(SdnExtOut::FeaturesEvent(_, FeaturesEvent::RouterSync(event))) => match event { + router_sync::Event::DumpRouter(dump) => { + let json = serde_json::to_value(dump).expect("should convert json"); + while let Some(v) = wait_dump_router.pop() { + let _ = v.send(json.clone()); + } + } + }, ExtOut::Sdn(SdnExtOut::ServicesEvent(_service, userdata, SE::Connector(event))) => { match event { media_server_connector::agent_service::Event::Response(res) => { diff --git a/bin/src/server/standalone.rs b/bin/src/server/standalone.rs index f41bf784..0d04bf26 100644 --- a/bin/src/server/standalone.rs +++ b/bin/src/server/standalone.rs @@ -86,7 +86,7 @@ pub struct Args { /// The IP address for RTPengine RTP listening. 
/// Default: 127.0.0.1 #[arg(env, long, default_value = "127.0.0.1")] - pub rtpengine_rtp_ip: IpAddr, + pub rtpengine_listen_ip: IpAddr, /// Media instance count #[arg(env, long, default_value_t = 2)] @@ -176,6 +176,7 @@ pub async fn run_standalone(workers: usize, node: NodeConfig, args: Args) { tokio::task::spawn_local(async move { super::run_media_connector( workers, + None, NodeConfig { node_id: 30, secret, @@ -208,7 +209,7 @@ pub async fn run_standalone(workers: usize, node: NodeConfig, args: Args) { let record_cache = args.record_cache.clone(); let record_mem_max_size = args.record_mem_max_size; let record_upload_worker = args.record_upload_worker; - let rtpengine_rtp_ip = args.rtpengine_rtp_ip; + let rtpengine_listen_ip = args.rtpengine_listen_ip; tokio::task::spawn_local(async move { super::run_media_server( workers, @@ -226,7 +227,7 @@ pub async fn run_standalone(workers: usize, node: NodeConfig, args: Args) { ice_lite: false, webrtc_port_seed: 0, rtpengine_cmd_addr: None, - rtpengine_rtp_ip, + rtpengine_listen_ip, ccu_per_core: 200, record_cache, record_mem_max_size, diff --git a/bin/z0_connector_n4.sh b/bin/z0_connector_n4.sh index 262308fa..685c1fb5 100644 --- a/bin/z0_connector_n4.sh +++ b/bin/z0_connector_n4.sh @@ -3,6 +3,6 @@ RUST_BACKTRACE=1 \ cargo run -- \ --sdn-zone-id 0 \ --sdn-zone-node-id 4 \ - --seeds 1@/ip4/127.0.0.1/udp/10001 \ + --seeds-from-url "http://localhost:3000/api/node/address" \ connector \ --s3-uri "http://minioadmin:minioadmin@127.0.0.1:9000/record" diff --git a/bin/z0_gate_n1.sh b/bin/z0_gate_n1.sh index 22ad93c2..565dc731 100644 --- a/bin/z0_gate_n1.sh +++ b/bin/z0_gate_n1.sh @@ -6,7 +6,7 @@ cargo run -- \ --sdn-port 10001 \ --sdn-zone-id 0 \ --sdn-zone-node-id 1 \ - --seeds 0@/ip4/127.0.0.1/udp/10000 \ + --seeds-from-url "http://localhost:8080/api/cluster/seeds?zone_id=0&node_type=Gateway" \ --workers 2 \ gateway \ --lat 10 \ diff --git a/bin/z0_media_n2.sh b/bin/z0_media_n2.sh index 566b4fdb..40866cc0 100644 --- 
a/bin/z0_media_n2.sh +++ b/bin/z0_media_n2.sh @@ -4,7 +4,6 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 0 \ --sdn-zone-node-id 2 \ - --seeds 1@/ip4/127.0.0.1/udp/10001 \ + --seeds-from-url "http://localhost:3000/api/node/address" \ --workers 2 \ - media \ - --enable-token-api + media diff --git a/bin/z0_media_n3.sh b/bin/z0_media_n3.sh index bdda16fa..9b4105ad 100644 --- a/bin/z0_media_n3.sh +++ b/bin/z0_media_n3.sh @@ -4,7 +4,6 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 0 \ --sdn-zone-node-id 3 \ - --seeds 1@/ip4/127.0.0.1/udp/10001 \ + --seeds-from-url "http://localhost:3000/api/node/address" \ --workers 2 \ - media \ - --enable-token-api + media diff --git a/bin/z1_gate_n1.sh b/bin/z1_gate_n1.sh index c4056239..fb9b8946 100644 --- a/bin/z1_gate_n1.sh +++ b/bin/z1_gate_n1.sh @@ -6,7 +6,7 @@ cargo run -- \ --sdn-zone-id 1 \ --sdn-zone-node-id 1 \ --sdn-port 11000 \ - --seeds 0@/ip4/127.0.0.1/udp/10000 \ + --seeds-from-url "http://localhost:8080/api/cluster/seeds?zone_id=1&node_type=Gateway" \ --workers 2 \ gateway \ --lat 20 \ diff --git a/bin/z1_media_n2.sh b/bin/z1_media_n2.sh index b2f736d4..28ba6016 100644 --- a/bin/z1_media_n2.sh +++ b/bin/z1_media_n2.sh @@ -4,7 +4,7 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 1 \ --sdn-zone-node-id 2 \ - --seeds 257@/ip4/127.0.0.1/udp/11000 \ + --seeds-from-url "http://localhost:4000/api/node/address" \ --workers 2 \ media \ --webrtc-port-seed 11200 \ diff --git a/bin/z1_media_n3.sh b/bin/z1_media_n3.sh index 75957c30..e34ea4e0 100644 --- a/bin/z1_media_n3.sh +++ b/bin/z1_media_n3.sh @@ -4,7 +4,7 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 1 \ --sdn-zone-node-id 3 \ - --seeds 257@/ip4/127.0.0.1/udp/11000 \ + --seeds-from-url "http://localhost:4000/api/node/address" \ --workers 2 \ media \ --webrtc-port-seed 11300 \ diff --git a/docs/getting-started/installation/README.md b/docs/getting-started/installation/README.md index 25a28e0e..918b92d8 100644 --- 
a/docs/getting-started/installation/README.md +++ b/docs/getting-started/installation/README.md @@ -36,3 +36,6 @@ Or you can use some tools to deploy atm0s-media-server: - [Kubernetes](./kubernetes.md) - [Docker Compose](./docker-compose.md) + +About network discovery, please refer to [Network Discovery](./network-discovery.md) for more details on your own use-case. +About mechanisms to generate node_id without manually specifying it, please refer to [Auto generate node_id](./auto-generate-node-id.md) for more details. diff --git a/docs/getting-started/installation/auto-generate-node-id.md new file mode 100644 index 00000000..ab0699d4 --- /dev/null +++ b/docs/getting-started/installation/auto-generate-node-id.md @@ -0,0 +1,22 @@ +# Auto generate node_id + +We rarely deploy new zones, so the zone-id can be manually specified. However, for nodes inside a zone, we usually use cloud or docker for easy management. This leads to the problem of having to manually specify node_ids inside a zone, which is inconvenient and error-prone. + +So we have implemented several mechanisms to auto-generate node_ids from machine information. + +## Auto generate node_id from local_ip + +The idea is simple: most cloud providers or bare-metal servers will assign a unique private IP to each machine, typically in the form of 10.10.10.x, 192.168.1.x, etc. + +We can use the last octet of the IP as the node_id. + +If the subnet is larger than /24, we still use the last 8 bits of the IP as the node_id, though this carries some risk of collision. In such cases, we recommend switching to a /24 subnet or using the NodeId pool. 
+ +Example: +``` +cargo run -- --sdn-zone-id 0 --sdn-zone-node-id-from-ip-prefix "10.10.10" console +``` + +## NodeId pool + +Status: in progress \ No newline at end of file diff --git a/docs/getting-started/installation/nat-traversal.md b/docs/getting-started/installation/nat-traversal.md new file mode 100644 index 00000000..5e3c5a37 --- /dev/null +++ b/docs/getting-started/installation/nat-traversal.md @@ -0,0 +1,20 @@ +# NAT Traversal + +Some cloud providers (like AWS) route all traffic through a NAT gateway, which means that we don't have a public IP address on the VM. + +To work around this, we can use the `node_ip_alt` and `node_ip_alt_cloud` options to specify alternative IP addresses for the node. + +## Using node_ip_alt + +The `node_ip_alt` option takes a list of IP addresses as input, and the node will bind to each of them. + +## Using node_ip_alt_cloud + +The `node_ip_alt_cloud` option takes a cloud provider as input, and will automatically fetch the alternative IP address for the node. + +| Cloud Provider | Fetch URL | +| -------------- | ----------------------------------------------------------------------------------------------- | +| AWS | `http://169.254.169.254/latest/meta-data/local-ipv4` | +| GCP | `http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip` | +| Azure | `http://ipv4.icanhazip.com` | +| Other | `http://ipv4.icanhazip.com` | diff --git a/docs/getting-started/installation/network-discovery.md b/docs/getting-started/installation/network-discovery.md new file mode 100644 index 00000000..1cdced45 --- /dev/null +++ b/docs/getting-started/installation/network-discovery.md @@ -0,0 +1,48 @@ +# Network Discovery + +We have two ways to discover other nodes in the network: + +- Manually specify the seeds +- Query the node API + +## Manually specify the seeds + +Each time we start the node, we can manually specify the seeds. 
+ +``` +cargo run -- --sdn-zone-id 0 --sdn-zone-node-id 1 --seeds 1@/ip4/127.0.0.1/udp/10001 media +``` + +This way is simple and easy to understand, but it's not flexible and is inconvenient to manage. + +## Query the node API + +We can use the node API to get the addresses of other nodes and then start the node with those addresses. + +``` +cargo run -- --sdn-zone-id 0 --sdn-zone-node-id 1 --seeds-from-node-api "http://localhost:3000" media +``` + +This way is flexible and convenient to manage, and we can also use it to dynamically get the addresses of other nodes. +A common use case is when deploying with docker-compose or kubernetes - we only need to set up the loadbalancer to point to the HTTP API of nodes, then use the API to provide addresses to other nodes. + +For example, we might have a loadbalancer config like this: + +| Zone | Node Type | Address | +| ---- | --------- | -------------------------------- | +| 0 | Console | http://console.atm0s.cloud | +| 0 | Gateway | http://gateway.zone0.atm0s.cloud | +| 1 | Gateway | http://gateway.zone1.atm0s.cloud | +| 2 | Gateway | http://gateway.zone2.atm0s.cloud | + +Then we can start nodes with config like this: + +| Zone | Node Type | Seeds From API | +| ---- | --------- | -------------------------------- | +| 0 | Gateway | http://console.atm0s.cloud | +| 0 | Media | http://gateway.zone0.atm0s.cloud | +| 1 | Gateway | http://console.atm0s.cloud | +| 1 | Media | http://gateway.zone1.atm0s.cloud | +| 2 | Gateway | http://console.atm0s.cloud | +| 2 | Media | http://gateway.zone2.atm0s.cloud | + diff --git a/packages/media_connector/Cargo.toml b/packages/media_connector/Cargo.toml index 0faced60..c743a992 100644 --- a/packages/media_connector/Cargo.toml +++ b/packages/media_connector/Cargo.toml @@ -24,7 +24,7 @@ sea-orm = { version = "1.1.0-rc.1", features = [ ] } sea-query = "0.32.0-rc.1" serde_json = "1.0" -s3-presign = "0.0.2" +s3-presign = "0.0.3" uuid = {version = "1.10", features = ["fast-rng", "v7"]} reqwest = 
{ version = "0.12", features = ["json"]} diff --git a/packages/media_connector/src/sql_storage.rs b/packages/media_connector/src/sql_storage.rs index 4e0a082f..4c0088c8 100644 --- a/packages/media_connector/src/sql_storage.rs +++ b/packages/media_connector/src/sql_storage.rs @@ -52,23 +52,27 @@ impl ConnectorSqlStorage { .acquire_timeout(Duration::from_secs(8)) .idle_timeout(Duration::from_secs(8)) .max_lifetime(Duration::from_secs(8)) - .sqlx_logging(false) - .sqlx_logging_level(log::LevelFilter::Info); // Setting default PostgreSQL schema + .sqlx_logging(true) + .sqlx_logging_level(log::LevelFilter::Info); let db = Database::connect(opt).await.expect("Should connect to sql server"); migration::Migrator::up(&db, None).await.expect("Should run migration success"); let s3_endpoint = CustomUri::::try_from(cfg.s3_uri.as_str()).expect("should parse s3"); - let mut s3 = Presigner::new( - Credentials::new(s3_endpoint.username.expect("Should have s3 accesskey"), s3_endpoint.password.expect("Should have s3 secretkey"), None), + let mut s3 = Presigner::new_with_root( + Credentials::new( + s3_endpoint.username.as_deref().expect("Should have s3 accesskey"), + s3_endpoint.password.as_deref().expect("Should have s3 secretkey"), + None, + ), s3_endpoint.path.first().as_ref().expect("Should have bucket name"), s3_endpoint.query.region.as_ref().unwrap_or(&"".to_string()), + s3_endpoint.host.as_str(), ); - s3.endpoint(s3_endpoint.endpoint.as_str()); if s3_endpoint.query.path_style == Some(true) { + log::info!("[ConnectorSqlStorage] use path style"); s3.use_path_style(); } - let s3_sub_folder = s3_endpoint.path[1..].join("/"); Self { diff --git a/packages/media_record/bin/convert_record_worker.rs b/packages/media_record/bin/convert_record_worker.rs index 4a036db3..b3cde50c 100644 --- a/packages/media_record/bin/convert_record_worker.rs +++ b/packages/media_record/bin/convert_record_worker.rs @@ -27,7 +27,7 @@ use poem_openapi::{ use rusty_s3::S3Action; use tracing_subscriber::{fmt, 
layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; -macro_rules! try_opt { +macro_rules! try_validate_app { ($self:ident, $token:ident, $err:expr) => { match $self.apps.validate_app(&$token.token) { Some(app) => app, @@ -42,6 +42,21 @@ macro_rules! try_opt { }; } +macro_rules! try_opt { + ($opt:expr, $err:expr) => { + match $opt { + Some(o) => o, + None => { + return Json(Response { + status: false, + error: Some($err.to_owned()), + data: None, + }); + } + } + }; +} + #[derive(SecurityScheme)] #[oai(rename = "Token Authorization", ty = "bearer", key_in = "header", key_name = "Authorization")] pub struct TokenAuthorization(pub Bearer); @@ -187,14 +202,29 @@ struct HttpApis { impl HttpApis { #[oai(path = "/convert/job", method = "post")] async fn create_job(&self, TokenAuthorization(token): TokenAuthorization, Json(body): Json) -> Json> { - let app = try_opt!(self, token, "Invalid token"); + let app = try_validate_app!(self, token, "Invalid token"); let job_id = rand::random::().to_string(); - let input_s3 = format!("{}/{}/?path_style=true", self.input_s3_uri, body.record_path); - let transmux_s3_uri = self.transmux_s3_uri.clone(); + let input_s3 = try_opt!(concat_s3_uri_path(&self.input_s3_uri, &body.record_path), "Invalid input_s3_uri"); let compose_s3_uri = self.compose_s3_uri.clone(); let job_id_c = job_id.clone(); let hook = self.hook.clone(); + // get yyyy/mm/dd with chrono + let current_date_path = chrono::Utc::now().format("%Y/%m/%d").to_string(); + let transmux = if let Some(t) = body.transmux { + if let Some(custom_s3) = t.custom_s3 { + Some(RecordConvertOutputLocation::S3(custom_s3)) + } else { + let s3 = try_opt!( + concat_s3_uri_path(&self.transmux_s3_uri, &format!("{}/transmux/{current_date_path}/{job_id_c}", app.app)), + "Invalid transmux_s3_uri" + ); + Some(RecordConvertOutputLocation::S3(s3)) + } + } else { + None + }; + tokio::spawn(async move { log::info!("Convert job {job_id_c} started"); hook.on_event( @@ -210,16 +240,9 @@ impl HttpApis { 
}, ); - // get yyyy/mm/dd with chrono - let current_date_path = chrono::Utc::now().format("%Y/%m/%d").to_string(); let converter = RecordConvert::new(RecordConvertConfig { in_s3: input_s3, - transmux: body.transmux.map(|t| { - let uri = t - .custom_s3 - .unwrap_or_else(|| format!("{transmux_s3_uri}/{}/transmux/{current_date_path}/{job_id_c}?path_style=true", app.app)); - RecordConvertOutputLocation::S3(uri) - }), + transmux, compose: body.compose.map(|c| { let (uri, relative) = c .custom_s3 @@ -228,7 +251,6 @@ impl HttpApis { (u, relative) }) .unwrap_or_else(|| { - let compose_s3_uri = format!("{compose_s3_uri}?path_style=true"); let (s3, credentials, s3_sub_folder) = convert_s3_uri(&compose_s3_uri).expect("should convert compose_s3_uri"); let relative = format!("{}/compose/{current_date_path}/{job_id_c}.webm", app.app); let path = PathBuf::from(s3_sub_folder).join(&relative); @@ -281,3 +303,26 @@ impl HttpApis { }) } } + +fn concat_s3_uri_path(s3_uri: &str, path: &str) -> Option { + fn ensure_last_slash(s: String) -> String { + if s.ends_with('/') { + s + } else { + s + "/" + } + } + + let parts = s3_uri.split('?').collect::>(); + if parts.len() == 2 { + let first = PathBuf::from(parts[0]).join(path).to_str()?.to_string(); + let first = ensure_last_slash(first); + log::info!("first: {}", first); + Some(first + "?" 
+ parts[1]) + } else { + let first = PathBuf::from(s3_uri).join(path).to_str()?.to_string(); + let first = ensure_last_slash(first); + log::info!("first: {}", first); + Some(first) + } +} diff --git a/packages/media_record/bin/run_worker.sh b/packages/media_record/bin/run_worker.sh deleted file mode 100644 index f714bb03..00000000 --- a/packages/media_record/bin/run_worker.sh +++ /dev/null @@ -1,4 +0,0 @@ -cargo run --bin convert_record_worker -- \ - --input-s3-uri http://ows-storage:tzraxj7wptoh@storage.dev.owslab.io/atm0s-record \ - --multi-tenancy-sync https://atm0s.wiremockapi.cloud/cp/apps \ - --secret zsz94nsrj3xvmbu555nmu25hwqo6shiq \ No newline at end of file diff --git a/packages/media_record/src/convert/composer.rs b/packages/media_record/src/convert/composer.rs index 864bbd22..76e4b761 100644 --- a/packages/media_record/src/convert/composer.rs +++ b/packages/media_record/src/convert/composer.rs @@ -122,9 +122,11 @@ impl RecordComposer { let room_reader = RoomReader::new(s3, credentials, &s3_sub_folder); let peers = room_reader.peers().await.map_err(|e| e.to_string())?; + log::info!("check room peers {:?}", peers.iter().map(|p| p.peer()).collect::>()); //we use channel to wait all sessions for peer in peers { let sessions = peer.sessions().await.map_err(|e| e.to_string())?; + log::info!("check peer {} sessions {:?}", peer.peer(), sessions.iter().map(|s| s.id()).collect::>()); for mut session in sessions { session.connect().await.map_err(|e| e.to_string())?; let id = session.id(); diff --git a/packages/media_runner/src/worker.rs b/packages/media_runner/src/worker.rs index d2707a10..3687e50a 100644 --- a/packages/media_runner/src/worker.rs +++ b/packages/media_runner/src/worker.rs @@ -49,7 +49,8 @@ pub struct MediaConfig { pub ice_lite: bool, pub webrtc_addrs: Vec, pub webrtc_addrs_alt: Vec, - pub rtpengine_rtp_ip: IpAddr, + pub rtpengine_listen_ip: IpAddr, + pub rtpengine_public_ip: IpAddr, pub secure: Arc, pub max_live: HashMap, pub enable_gateway_agent: 
bool, @@ -220,7 +221,7 @@ impl MediaServerWorker { MediaWorkerWebrtc::new(media.webrtc_addrs, media.webrtc_addrs_alt, media.ice_lite, media.secure.clone()), TaskType::MediaWebrtc, ), - media_rtpengine: TaskSwitcherBranch::new(MediaWorkerRtpEngine::new(media.rtpengine_rtp_ip), TaskType::MediaRtpEngine), + media_rtpengine: TaskSwitcherBranch::new(MediaWorkerRtpEngine::new(media.rtpengine_listen_ip, media.rtpengine_public_ip), TaskType::MediaRtpEngine), media_max_live, switcher: TaskSwitcher::new(4), queue, diff --git a/packages/media_utils/Cargo.toml b/packages/media_utils/Cargo.toml index 1e89bc0a..68c11852 100644 --- a/packages/media_utils/Cargo.toml +++ b/packages/media_utils/Cargo.toml @@ -15,6 +15,7 @@ serde-querystring = "0.2" pin-project-lite = "0.2" spin = { workspace = true } once_cell = "1.20" +urlencoding = "2.1" derive_more = { version = "1.0", features = ["full"] } [dev-dependencies] @@ -22,4 +23,4 @@ criterion = { version = "0.5", features = ["html_reports"] } [[bench]] name = "map_bench" -harness = false \ No newline at end of file +harness = false diff --git a/packages/media_utils/src/uri.rs b/packages/media_utils/src/uri.rs index 27ec37d7..3fc98fdf 100644 --- a/packages/media_utils/src/uri.rs +++ b/packages/media_utils/src/uri.rs @@ -1,10 +1,12 @@ use serde::de::DeserializeOwned; use serde_querystring::DuplicateQS; +#[derive(Debug, Clone)] pub struct CustomUri { pub username: Option, pub password: Option, pub endpoint: String, + pub host: String, pub path: Vec, pub query: Q, } @@ -29,10 +31,14 @@ impl TryFrom<&str> for CustomUri { (false, Some(port)) => format!("http://{}:{}", host, port), }; + let username = uri.username().and_then(|u| urlencoding::decode(u.as_ref()).map(|u| u.to_string()).ok()); + let password = uri.password().and_then(|u| urlencoding::decode(u.as_ref()).map(|u| u.to_string()).ok()); + Ok(Self { - username: uri.username().map(|u| u.to_string()), - password: uri.password().map(|u| u.to_string()), + username: username.map(|u| 
u.to_string()), + password: password.map(|u| u.to_string()), endpoint, + host: host.to_string(), path, query, }) diff --git a/packages/transport_rtpengine/src/transport.rs b/packages/transport_rtpengine/src/transport.rs index 2eff1fc8..5fa4a166 100644 --- a/packages/transport_rtpengine/src/transport.rs +++ b/packages/transport_rtpengine/src/transport.rs @@ -66,10 +66,10 @@ pub struct TransportRtpEngine { } impl TransportRtpEngine { - pub fn new_offer(room: RoomId, peer: PeerId, ip: IpAddr) -> Result<(Self, String), String> { - let socket = std::net::UdpSocket::bind(SocketAddr::new(ip, 0)).map_err(|e| e.to_string())?; + pub fn new_offer(room: RoomId, peer: PeerId, public_ip: IpAddr, listen_ip: IpAddr) -> Result<(Self, String), String> { + let socket = std::net::UdpSocket::bind(SocketAddr::new(listen_ip, 0)).map_err(|e| e.to_string())?; let port = socket.local_addr().map_err(|e| e.to_string())?.port(); - let answer = sdp_builder(ip, port); + let answer = sdp_builder(public_ip, port); Ok(( Self { @@ -85,7 +85,7 @@ impl TransportRtpEngine { last_send_rtp: None, queue: DynamicDeque::from([ TransportOutput::Net(BackendOutgoing::UdpListen { - addr: SocketAddr::new(ip, port), + addr: SocketAddr::new(listen_ip, port), reuse: false, }), TransportOutput::Event(TransportEvent::State(TransportState::New)), @@ -99,7 +99,7 @@ impl TransportRtpEngine { )) } - pub fn new_answer(room: RoomId, peer: PeerId, ip: IpAddr, offer: &str) -> Result<(Self, String), String> { + pub fn new_answer(room: RoomId, peer: PeerId, public_ip: IpAddr, listen_ip: IpAddr, offer: &str) -> Result<(Self, String), String> { let mut offer = SessionDescription::try_from(offer.to_string()).map_err(|e| e.to_string())?; let dest_ip: IpAddr = if let Some(conn) = offer.connection { conn.connection_address.base @@ -111,9 +111,9 @@ impl TransportRtpEngine { log::info!("[TransportRtpEngine] on create answer => set remote to {remote}"); - let socket = std::net::UdpSocket::bind(SocketAddr::new(ip, 0)).map_err(|e| 
e.to_string())?; + let socket = std::net::UdpSocket::bind(SocketAddr::new(listen_ip, 0)).map_err(|e| e.to_string())?; let port = socket.local_addr().map_err(|e| e.to_string())?.port(); - let answer = sdp_builder(ip, port); + let answer = sdp_builder(public_ip, port); Ok(( Self { @@ -129,7 +129,7 @@ impl TransportRtpEngine { last_send_rtp: None, queue: DynamicDeque::from([ TransportOutput::Net(BackendOutgoing::UdpListen { - addr: SocketAddr::new(ip, port), + addr: SocketAddr::new(listen_ip, port), reuse: false, }), TransportOutput::Event(TransportEvent::State(TransportState::Connecting(dest_ip))), diff --git a/packages/transport_rtpengine/src/worker.rs b/packages/transport_rtpengine/src/worker.rs index 53ab5d1f..015614cf 100644 --- a/packages/transport_rtpengine/src/worker.rs +++ b/packages/transport_rtpengine/src/worker.rs @@ -40,16 +40,18 @@ pub enum GroupOutput { #[allow(clippy::type_complexity)] pub struct MediaWorkerRtpEngine { - ip: IpAddr, + listen_ip: IpAddr, + public_ip: IpAddr, endpoints: TaskGroup, EndpointOutput, Endpoint, 16>, queue: VecDeque, shutdown: bool, } impl MediaWorkerRtpEngine { - pub fn new(ip: IpAddr) -> Self { + pub fn new(listen_ip: IpAddr, public_ip: IpAddr) -> Self { Self { - ip, + listen_ip, + public_ip, endpoints: TaskGroup::default(), queue: VecDeque::new(), shutdown: false, @@ -58,9 +60,9 @@ impl MediaWorkerRtpEngine { pub fn spawn(&mut self, app: AppContext, room: RoomId, peer: PeerId, record: bool, session_id: u64, offer: Option<&str>) -> RpcResult<(usize, String)> { let (tran, answer) = if let Some(offer) = offer { - TransportRtpEngine::new_answer(room, peer, self.ip, offer).map_err(|e| RpcError::new(1000_u32, &e))? + TransportRtpEngine::new_answer(room, peer, self.public_ip, self.listen_ip, offer).map_err(|e| RpcError::new(1000_u32, &e))? } else { - TransportRtpEngine::new_offer(room, peer, self.ip).map_err(|e| RpcError::new(1000_u32, &e))? 
+ TransportRtpEngine::new_offer(room, peer, self.public_ip, self.listen_ip).map_err(|e| RpcError::new(1000_u32, &e))? }; let cfg = EndpointCfg { app,