From 61ef5dbdebf22a803d5d7a93d044e506e759497e Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Mon, 18 Nov 2024 12:01:39 +0700 Subject: [PATCH 01/14] feat: SDN discovery over node http-api --- Cargo.lock | 1 + bin/Cargo.toml | 1 + bin/src/http.rs | 49 ++++++++++++++++++++++++++++++++------- bin/src/http/api_node.rs | 29 +++++++++++++++++++++++ bin/src/lib.rs | 13 ++++++++++- bin/src/main.rs | 17 ++++++++++++-- bin/src/server/console.rs | 5 ++-- bin/src/server/gateway.rs | 28 +++++++++++----------- bin/src/server/media.rs | 8 ++++--- bin/z0_connector_n4.sh | 2 +- bin/z0_gate_n1.sh | 2 +- bin/z0_media_n2.sh | 5 ++-- bin/z0_media_n3.sh | 5 ++-- 13 files changed, 128 insertions(+), 37 deletions(-) create mode 100644 bin/src/http/api_node.rs diff --git a/Cargo.lock b/Cargo.lock index aab95f4d..4c9a753f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -489,6 +489,7 @@ dependencies = [ "quinn", "rand 0.8.5", "rcgen", + "reqwest", "rtpengine-ngcontrol", "rust-embed", "rustls", diff --git a/bin/Cargo.toml b/bin/Cargo.toml index fb75bcbc..3a35de4e 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -38,6 +38,7 @@ maxminddb = { version = "0.24", optional = true } sysinfo = { version = "0.31", optional = true } hex = { version = "0.4", optional = true } mime_guess = { version = "2.0", optional = true } +reqwest = { version = "0.12", features = ["json"]} sentry = "0.34" [features] diff --git a/bin/src/http.rs b/bin/src/http.rs index db43ad8c..f8f7d488 100644 --- a/bin/src/http.rs +++ b/bin/src/http.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::sync::Arc; +pub use api_node::NodeApiCtx; use media_server_protocol::endpoint::ClusterConnId; #[cfg(feature = "console")] use media_server_protocol::protobuf::cluster_connector::MediaConnectorServiceClient; @@ -14,6 +15,7 @@ use poem::{listener::TcpListener, middleware::Cors, EndpointExt, Route, Server}; use poem_openapi::types::{ToJSON, Type}; use poem_openapi::OpenApiService; use poem_openapi::{types::ParseFromJSON, Object}; +use serde::Deserialize; use tokio::sync::mpsc::Sender; #[cfg(feature = "embed_static")] use utils::EmbeddedFilesEndpoint; @@ -21,6 +23,7 @@ use utils::EmbeddedFilesEndpoint; mod api_console; mod api_media; mod api_metrics; +mod api_node; mod api_token; mod utils; @@ -34,13 +37,13 @@ pub struct PublicMediaFiles; #[folder = "public/console"] pub struct PublicConsoleFiles; -#[derive(Debug, Default, Object)] +#[derive(Debug, Default, Object, Deserialize)] pub struct Pagination { pub total: usize, pub current: usize, } -#[derive(Debug, Object)] +#[derive(Debug, Object, Deserialize)] pub struct Response { pub status: bool, #[oai(skip_serializing_if = "Option::is_none")] @@ -65,25 +68,31 @@ impl Default for Response { #[cfg(feature = "console")] pub async fn run_console_http_server( port: u16, + node: NodeApiCtx, secure: media_server_secure::jwt::MediaConsoleSecureJwt, storage: crate::server::console_storage::StorageShared, connector: MediaConnectorServiceClient, ) -> Result<(), Box> { use poem::middleware::Tracing; - let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); + let node_api = api_node::Apis::new(node); + let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/"); + let node_ui = node_service.swagger_ui(); + let node_spec = node_service.spec(); + + let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", 
env!("CARGO_PKG_VERSION")).server("/api/metrics/"); let metrics_ui = metrics_service.swagger_ui(); let metrics_spec = metrics_service.spec(); - let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "Console User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/"); + let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/"); let user_ui = user_service.swagger_ui(); let user_spec = user_service.spec(); - let cluster_service: OpenApiService<_, ()> = OpenApiService::new(api_console::cluster::Apis, "Console Cluster APIs", env!("CARGO_PKG_VERSION")).server("/api/cluster/"); + let cluster_service: OpenApiService<_, ()> = OpenApiService::new(api_console::cluster::Apis, "Cluster APIs", env!("CARGO_PKG_VERSION")).server("/api/cluster/"); let cluster_ui = cluster_service.swagger_ui(); let cluster_spec = cluster_service.spec(); - let connector_service: OpenApiService<_, ()> = OpenApiService::new(api_console::connector::Apis, "Console Connector APIs", env!("CARGO_PKG_VERSION")).server("/api/connector/"); + let connector_service: OpenApiService<_, ()> = OpenApiService::new(api_console::connector::Apis, "Connector APIs", env!("CARGO_PKG_VERSION")).server("/api/connector/"); let connector_ui = connector_service.swagger_ui(); let connector_spec = connector_service.spec(); @@ -96,6 +105,10 @@ pub async fn run_console_http_server( let route = Route::new() .nest("/", console_panel) + //node + .nest("/api/node/", node_service) + .nest("/api/node/ui", node_ui) + .at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone())) //metrics .nest("/api/metrics/", metrics_service) .nest("/api/metrics/ui", metrics_ui) @@ -122,6 +135,7 @@ pub async fn run_console_http_server( #[cfg(feature = "gateway")] pub async fn run_gateway_http_server( port: u16, + node: NodeApiCtx, sender: Sender, RpcRes>>, edge_secure: Arc, gateway_secure: Arc, @@ -130,7 +144,12 @@ pub async fn run_gateway_http_server = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); + let node_api = api_node::Apis::new(node); + let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/"); + let node_ui = node_service.swagger_ui(); + let node_spec = node_service.spec(); + + let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); let metrics_ui = metrics_service.swagger_ui(); let metrics_spec = metrics_service.spec(); @@ -177,6 +196,10 @@ pub async fn run_gateway_http_server( port: u16, + node: NodeApiCtx, sender: Sender, RpcRes>>, edge_secure: Arc, gateway_secure: Option>, ) -> Result<(), Box> { let mut route = Route::new(); - let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); + let node_api = api_node::Apis::new(node); + let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/"); + let node_ui = node_service.swagger_ui(); + let node_spec = node_service.spec(); + + let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); let metrics_ui = metrics_service.swagger_ui(); let metrics_spec = metrics_service.spec(); @@ -273,6 +302,10 @@ pub async fn run_media_http_server Self 
{ + Self { ctx } + } +} + +#[OpenApi] +impl Apis { + #[oai(path = "/address", method = "get")] + async fn get_address(&self) -> Json> { + Json(Response { + status: true, + data: Some(self.ctx.address.clone()), + ..Default::default() + }) + } +} diff --git a/bin/src/lib.rs b/bin/src/lib.rs index 6eb8492e..f77ffa04 100644 --- a/bin/src/lib.rs +++ b/bin/src/lib.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::{net::SocketAddr, str::FromStr}; use atm0s_sdn::{NodeAddr, NodeId}; use media_server_protocol::cluster::ZoneId; @@ -22,3 +22,14 @@ pub struct NodeConfig { pub zone: ZoneId, pub bind_addrs_alt: Vec, } + +pub async fn fetch_node_addr_from_api(url: &str) -> Result { + let resp = reqwest::get(url).await.map_err(|e| e.to_string())?; + let node_addr = resp + .json::>() + .await + .map_err(|e| e.to_string())? + .data + .ok_or(format!("No data in response from {}", url))?; + NodeAddr::from_str(&node_addr).map_err(|e| e.to_string()) +} diff --git a/bin/src/main.rs b/bin/src/main.rs index 215ca325..dd211b82 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -1,6 +1,6 @@ use std::net::{IpAddr, SocketAddr}; -use atm0s_media_server::{server, NodeConfig}; +use atm0s_media_server::{fetch_node_addr_from_api, server, NodeConfig}; use atm0s_sdn::NodeAddr; use clap::Parser; use media_server_protocol::cluster::ZoneId; @@ -56,6 +56,12 @@ struct Args { #[arg(env, long)] seeds: Vec, + /// Seeds from API, this is used for auto-discovery of seeds. + /// It is very useful for cloud deployment. + /// Currently all of nodes expose /api/node/address endpoint, so we can get seeds from there. + #[arg(env, long)] + seeds_from_node_api: Option, + /// Number of worker threads to spawn. #[arg(env, long, default_value_t = 1)] workers: usize, @@ -118,7 +124,7 @@ async fn main() { .map(|(_name, ip)| SocketAddr::new(ip, sdn_port)) .collect::>() }; - let node = NodeConfig { + let mut node = NodeConfig { node_id: ZoneId(args.sdn_zone_id).to_node_id(args.sdn_zone_node_id), secret: args.secret, seeds: args.seeds, @@ -129,6 +135,13 @@ async fn main() { log::info!("Bind addrs {:?}, bind addrs alt {:?}", node.bind_addrs, node.bind_addrs_alt); + if let Some(seeds_from_node_api) = args.seeds_from_node_api { + log::info!("Generate seeds from node_api {}", seeds_from_node_api); + let addr = fetch_node_addr_from_api(&seeds_from_node_api).await.expect("should get seed"); + log::info!("Generated seed {:?}", addr); + node.seeds = vec![addr]; + } + let local = tokio::task::LocalSet::new(); local .run_until(async move { diff --git a/bin/src/server/console.rs b/bin/src/server/console.rs index ab9c7096..8d66ec85 100644 --- a/bin/src/server/console.rs +++ b/bin/src/server/console.rs @@ -11,7 +11,7 @@ use media_server_secure::jwt::MediaConsoleSecureJwt; use storage::StorageShared; use crate::{ - http::run_console_http_server, + http::{run_console_http_server, NodeApiCtx}, node_metrics::NodeMetricsCollector, quinn::{make_quinn_client, VirtualNetwork}, NodeConfig, @@ -72,8 +72,9 @@ pub async fn run_console_server(workers: usize, http_port: Option, node: No if let Some(http_port) = http_port { let secure = MediaConsoleSecureJwt::from(node.secret.as_bytes()); let storage = storage.clone(); + let node_ctx = NodeApiCtx { address: node_addr.to_string() }; tokio::spawn(async move { - if let Err(e) = run_console_http_server(http_port, secure, storage, connector_rpc_client).await { + if let Err(e) = run_console_http_server(http_port, node_ctx, secure, storage, connector_rpc_client).await { log::error!("HTTP Error: {}", e); } }); diff --git 
a/bin/src/server/gateway.rs b/bin/src/server/gateway.rs index f2f25207..08eae977 100644 --- a/bin/src/server/gateway.rs +++ b/bin/src/server/gateway.rs @@ -17,7 +17,7 @@ use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use std::net::SocketAddr; use crate::{ - http::run_gateway_http_server, + http::{run_gateway_http_server, NodeApiCtx}, ng_controller::NgControllerServer, node_metrics::NodeMetricsCollector, quinn::{make_quinn_client, make_quinn_server, VirtualNetwork}, @@ -114,17 +114,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let gateway_secure = Arc::new(gateway_secure); let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024); - if let Some(http_port) = http_port { - let req_tx = req_tx.clone(); - let secure2 = edge_secure.clone(); - tokio::spawn(async move { - if let Err(e) = run_gateway_http_server(http_port, req_tx, secure2, gateway_secure).await { - log::error!("HTTP Error: {}", e); - } - }); - } - - //Running ng controller for Voip + // Running ng controller for Voip if let Some(ngproto_addr) = args.rtpengine_cmd_addr { let req_tx = req_tx.clone(); let rtpengine_udp = NgUdpTransport::new(ngproto_addr).await; @@ -137,8 +127,8 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod }); } + // Setup Sdn let node_id = node.node_id; - let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.bind_addrs_alt); let node_addr = builder.node_addr(); let node_info = ClusterNodeInfo::Gateway( @@ -168,6 +158,18 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let mut controller = builder.build::>(workers, node_info); let (selector, mut requester) = build_dest_selector(); + // Setup HTTP server + if let Some(http_port) = http_port { + let req_tx = req_tx.clone(); + let secure2 = edge_secure.clone(); + let node_ctx = NodeApiCtx { address: node_addr.to_string() }; + tokio::spawn(async move { + if let Err(e) = run_gateway_http_server(http_port, node_ctx, req_tx, secure2, gateway_secure).await { + log::error!("HTTP Error: {}", e); + } + }); + } + // Ip location for routing client to closest gateway let ip2location = Arc::new(Ip2Location::new(&args.geo_db)); diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index 4a92322c..86f22eee 100644 --- a/bin/src/server/media.rs +++ b/bin/src/server/media.rs @@ -5,7 +5,7 @@ use std::{ time::{Duration, Instant}, }; -use atm0s_sdn::{features::FeaturesEvent, SdnExtIn, SdnExtOut, TimePivot, TimeTicker}; +use atm0s_sdn::{features::FeaturesEvent, generate_node_addr, SdnExtIn, SdnExtOut, TimePivot, TimeTicker}; use clap::Parser; use media_server_gateway::ServiceKind; use media_server_multi_tenancy::MultiTenancyStorage; @@ -27,7 +27,7 @@ use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use sans_io_runtime::{backend::PollingBackend, Controller}; use crate::{ - http::run_media_http_server, + http::{run_media_http_server, NodeApiCtx}, ng_controller::NgControllerServer, node_metrics::NodeMetricsCollector, quinn::{make_quinn_server, VirtualNetwork}, @@ -92,6 +92,7 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node let secure = Arc::new(MediaEdgeSecureJwt::from(node.secret.as_bytes())); let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024); + let node_addr = generate_node_addr(node.node_id, &node.bind_addrs, node.bind_addrs_alt.clone()); if let Some(http_port) = http_port { let secure_gateway = args.enable_token_api.then(|| { let app_storage = 
Arc::new(MultiTenancyStorage::new_with_single(&node.secret, None)); @@ -99,8 +100,9 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node }); let req_tx = req_tx.clone(); let secure_edge = secure.clone(); + let node_ctx = NodeApiCtx { address: node_addr.to_string() }; tokio::spawn(async move { - if let Err(e) = run_media_http_server(http_port, req_tx, secure_edge, secure_gateway).await { + if let Err(e) = run_media_http_server(http_port, node_ctx, req_tx, secure_edge, secure_gateway).await { log::error!("HTTP Error: {}", e); } }); diff --git a/bin/z0_connector_n4.sh b/bin/z0_connector_n4.sh index 262308fa..b8001989 100644 --- a/bin/z0_connector_n4.sh +++ b/bin/z0_connector_n4.sh @@ -3,6 +3,6 @@ RUST_BACKTRACE=1 \ cargo run -- \ --sdn-zone-id 0 \ --sdn-zone-node-id 4 \ - --seeds 1@/ip4/127.0.0.1/udp/10001 \ + --seeds-from-node-api "http://localhost:3000/api/node/address" \ connector \ --s3-uri "http://minioadmin:minioadmin@127.0.0.1:9000/record" diff --git a/bin/z0_gate_n1.sh b/bin/z0_gate_n1.sh index 22ad93c2..2fec78f8 100644 --- a/bin/z0_gate_n1.sh +++ b/bin/z0_gate_n1.sh @@ -6,7 +6,7 @@ cargo run -- \ --sdn-port 10001 \ --sdn-zone-id 0 \ --sdn-zone-node-id 1 \ - --seeds 0@/ip4/127.0.0.1/udp/10000 \ + --seeds-from-node-api "http://localhost:8080/api/node/address" \ --workers 2 \ gateway \ --lat 10 \ diff --git a/bin/z0_media_n2.sh b/bin/z0_media_n2.sh index 566b4fdb..18db3e71 100644 --- a/bin/z0_media_n2.sh +++ b/bin/z0_media_n2.sh @@ -4,7 +4,6 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 0 \ --sdn-zone-node-id 2 \ - --seeds 1@/ip4/127.0.0.1/udp/10001 \ + --seeds-from-node-api "http://localhost:3000/api/node/address" \ --workers 2 \ - media \ - --enable-token-api + media diff --git a/bin/z0_media_n3.sh b/bin/z0_media_n3.sh index bdda16fa..499a8a67 100644 --- a/bin/z0_media_n3.sh +++ b/bin/z0_media_n3.sh @@ -4,7 +4,6 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 0 \ --sdn-zone-node-id 3 \ - --seeds 1@/ip4/127.0.0.1/udp/10001 \ + --seeds-from-node-api "http://localhost:3000/api/node/address" \ --workers 2 \ - media \ - --enable-token-api + media From 4d7b0d0dee82dcd6fb68dbc9c507a8abef142d14 Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Mon, 18 Nov 2024 12:11:50 +0700 Subject: [PATCH 02/14] added docs --- docs/getting-started/installation/README.md | 2 + .../installation/network-discovery.md | 54 +++++++++++++++++++ 2 files changed, 56 insertions(+) create mode 100644 docs/getting-started/installation/network-discovery.md diff --git a/docs/getting-started/installation/README.md b/docs/getting-started/installation/README.md index 6d3d01e7..72cf4f1f 100644 --- a/docs/getting-started/installation/README.md +++ b/docs/getting-started/installation/README.md @@ -35,3 +35,5 @@ Or you can use some tools to deploy atm0s-media-server: - [Kubernetes](./kubernetes.md) - [Docker Compose](./docker-compose.md) + +About network discovery, please refer to [Network Discovery](./network-discovery.md) for more details with your own use-case. diff --git a/docs/getting-started/installation/network-discovery.md b/docs/getting-started/installation/network-discovery.md new file mode 100644 index 00000000..db26aa87 --- /dev/null +++ b/docs/getting-started/installation/network-discovery.md @@ -0,0 +1,54 @@ +# Network Discovery + +We have two ways to discover other nodes in the network: + +- Manually specify the seeds +- Query the node API + +## Manually specify the seeds + +Each time we start the node, we can manually specify the seeds. 
+ +``` +cargo run -- --sdn-zone-id 0 --sdn-zone-node-id 1 --seeds 1@/ip4/127.0.0.1/udp/10001 media +``` + +This way is simple and easy to understand, but it's not flexible and is inconvenient to manage. + +## Query the node API + +We can use the node API to get the addresses of other nodes and then start the node with those addresses. + +``` +cargo run -- --sdn-zone-id 0 --sdn-zone-node-id 1 --seeds-from-node-api "http://localhost:3000/api/node/address" media +``` + +This way is flexible and convenient to manage, and we can also use it to dynamically get the addresses of other nodes. +A common use case is when deploying with docker-compose or kubernetes - we only need to set up the loadbalancer to point to the HTTP API of nodes, then use the API to provide addresses to other nodes. + +For example, we might have a loadbalancer config like this: + +| Zone | Node Type | Address | +| ---- | --------- | -------------------------------- | +| 0 | Console | http://console.atm0s.cloud | +| 0 | Gateway | http://gateway.zone0.atm0s.cloud | +| 1 | Gateway | http://gateway.zone1.atm0s.cloud | +| 2 | Gateway | http://gateway.zone2.atm0s.cloud | + +``` +http://console.atm0s.cloud/api/node/address +http://gateway.zone1.atm0s.cloud/api/node/address +http://gateway.zone2.atm0s.cloud/api/node/address +``` + +Then we can start nodes with config like this: + +| Zone | Node Type | Seeds From API | +| ---- | --------- | ------------------------------------------------- | +| 0 | Gateway | http://console.atm0s.cloud/api/node/address | +| 0 | Media | http://gateway.zone0.atm0s.cloud/api/node/address | +| 1 | Gateway | http://console.atm0s.cloud/api/node/address | +| 1 | Media | http://gateway.zone1.atm0s.cloud/api/node/address | +| 2 | Gateway | http://console.atm0s.cloud/api/node/address | +| 2 | Media | http://gateway.zone2.atm0s.cloud/api/node/address | + From ec6b19cdfdde8f04e3f7a13fae1e8ccd68796f3c Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Mon, 18 Nov 2024 13:09:59 +0700 Subject: [PATCH 03/14] make it simpler --- bin/src/lib.rs | 2 +- bin/src/main.rs | 1 - bin/z0_connector_n4.sh | 2 +- bin/z0_gate_n1.sh | 2 +- bin/z0_media_n2.sh | 2 +- bin/z0_media_n3.sh | 2 +- .../installation/network-discovery.md | 24 +++++++------------ 7 files changed, 14 insertions(+), 21 deletions(-) diff --git a/bin/src/lib.rs b/bin/src/lib.rs index f77ffa04..02dcac4e 100644 --- a/bin/src/lib.rs +++ b/bin/src/lib.rs @@ -24,7 +24,7 @@ pub struct NodeConfig { } pub async fn fetch_node_addr_from_api(url: &str) -> Result { - let resp = reqwest::get(url).await.map_err(|e| e.to_string())?; + let resp = reqwest::get(format!("{}/api/node/address", url)).await.map_err(|e| e.to_string())?; let node_addr = resp .json::>() .await diff --git a/bin/src/main.rs b/bin/src/main.rs index dd211b82..a1586112 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -57,7 +57,6 @@ struct Args { seeds: Vec, /// Seeds from API, this is used for auto-discovery of seeds. - /// It is very useful for cloud deployment. /// Currently all of nodes expose /api/node/address endpoint, so we can get seeds from there. 
#[arg(env, long)] seeds_from_node_api: Option, diff --git a/bin/z0_connector_n4.sh b/bin/z0_connector_n4.sh index b8001989..ab670b1b 100644 --- a/bin/z0_connector_n4.sh +++ b/bin/z0_connector_n4.sh @@ -3,6 +3,6 @@ RUST_BACKTRACE=1 \ cargo run -- \ --sdn-zone-id 0 \ --sdn-zone-node-id 4 \ - --seeds-from-node-api "http://localhost:3000/api/node/address" \ + --seeds-from-node-api "http://localhost:3000" \ connector \ --s3-uri "http://minioadmin:minioadmin@127.0.0.1:9000/record" diff --git a/bin/z0_gate_n1.sh b/bin/z0_gate_n1.sh index 2fec78f8..2e82a2b7 100644 --- a/bin/z0_gate_n1.sh +++ b/bin/z0_gate_n1.sh @@ -6,7 +6,7 @@ cargo run -- \ --sdn-port 10001 \ --sdn-zone-id 0 \ --sdn-zone-node-id 1 \ - --seeds-from-node-api "http://localhost:8080/api/node/address" \ + --seeds-from-node-api "http://localhost:8080" \ --workers 2 \ gateway \ --lat 10 \ diff --git a/bin/z0_media_n2.sh b/bin/z0_media_n2.sh index 18db3e71..86235d1b 100644 --- a/bin/z0_media_n2.sh +++ b/bin/z0_media_n2.sh @@ -4,6 +4,6 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 0 \ --sdn-zone-node-id 2 \ - --seeds-from-node-api "http://localhost:3000/api/node/address" \ + --seeds-from-node-api "http://localhost:3000" \ --workers 2 \ media diff --git a/bin/z0_media_n3.sh b/bin/z0_media_n3.sh index 499a8a67..426a45ca 100644 --- a/bin/z0_media_n3.sh +++ b/bin/z0_media_n3.sh @@ -4,6 +4,6 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 0 \ --sdn-zone-node-id 3 \ - --seeds-from-node-api "http://localhost:3000/api/node/address" \ + --seeds-from-node-api "http://localhost:3000" \ --workers 2 \ media diff --git a/docs/getting-started/installation/network-discovery.md b/docs/getting-started/installation/network-discovery.md index db26aa87..1cdced45 100644 --- a/docs/getting-started/installation/network-discovery.md +++ b/docs/getting-started/installation/network-discovery.md @@ -20,7 +20,7 @@ This way is simple and easy to understand, but it's not flexible and is inconven We can use the node API to get the addresses of other nodes and then start the node with those addresses. ``` -cargo run -- --sdn-zone-id 0 --sdn-zone-node-id 1 --seeds-from-node-api "http://localhost:3000/api/node/address" media +cargo run -- --sdn-zone-id 0 --sdn-zone-node-id 1 --seeds-from-node-api "http://localhost:3000" media ``` This way is flexible and convenient to manage, and we can also use it to dynamically get the addresses of other nodes. 
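To make the discovery call concrete, here is an illustrative check against a node's HTTP API; the port and the returned address are example values taken from the `bin/z0_*.sh` scripts in this series, and at this stage the endpoint returns the address wrapped in the JSON `Response` envelope (a later patch switches it to plain text):

```
curl http://localhost:3000/api/node/address
# => {"status":true,"data":"1@/ip4/127.0.0.1/udp/10001"}
```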
@@ -35,20 +35,14 @@ For example, we might have a loadbalancer config like this: | 1 | Gateway | http://gateway.zone1.atm0s.cloud | | 2 | Gateway | http://gateway.zone2.atm0s.cloud | -``` -http://console.atm0s.cloud/api/node/address -http://gateway.zone1.atm0s.cloud/api/node/address -http://gateway.zone2.atm0s.cloud/api/node/address -``` - Then we can start nodes with config like this: -| Zone | Node Type | Seeds From API | -| ---- | --------- | ------------------------------------------------- | -| 0 | Gateway | http://console.atm0s.cloud/api/node/address | -| 0 | Media | http://gateway.zone0.atm0s.cloud/api/node/address | -| 1 | Gateway | http://console.atm0s.cloud/api/node/address | -| 1 | Media | http://gateway.zone1.atm0s.cloud/api/node/address | -| 2 | Gateway | http://console.atm0s.cloud/api/node/address | -| 2 | Media | http://gateway.zone2.atm0s.cloud/api/node/address | +| Zone | Node Type | Seeds From API | +| ---- | --------- | -------------------------------- | +| 0 | Gateway | http://console.atm0s.cloud | +| 0 | Media | http://gateway.zone0.atm0s.cloud | +| 1 | Gateway | http://console.atm0s.cloud | +| 1 | Media | http://gateway.zone1.atm0s.cloud | +| 2 | Gateway | http://console.atm0s.cloud | +| 2 | Media | http://gateway.zone2.atm0s.cloud | From 9f6bdca54f34d957f72b7d2b050167389e200702 Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Mon, 18 Nov 2024 14:03:42 +0700 Subject: [PATCH 04/14] feat: generate node-id from local_ip --- bin/src/main.rs | 20 ++++++++++++++++- docs/getting-started/installation/README.md | 1 + .../installation/auto-generate-node-id.md | 22 +++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) create mode 100644 docs/getting-started/installation/auto-generate-node-id.md diff --git a/bin/src/main.rs b/bin/src/main.rs index a1586112..fd027a4c 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -32,6 +32,11 @@ struct Args { #[arg(env, long, default_value_t = 0)] sdn_zone_node_id: u8, + /// Auto-generate node_id from the last 8 bits of the local IP on the interface that matches this prefix. + /// Example: 192.168.1 or 10.10.10. + #[arg(env, long)] + sdn_zone_node_id_from_ip_prefix: Option, + /// Manually specify the IP address of the node. This disables IP autodetection. 
#[arg(env, long)] node_ip: Option, @@ -107,6 +112,19 @@ async fn main() { let workers = args.workers; + let mut auto_generated_node_id = None; + if let Some(ip_prefix) = args.sdn_zone_node_id_from_ip_prefix { + for (name, ip) in local_ip_address::list_afinet_netifas().expect("Should have list interfaces") { + if let IpAddr::V4(ipv4) = ip { + if ipv4.to_string().starts_with(&ip_prefix) { + auto_generated_node_id = Some(ipv4.octets()[3]); + log::info!("Found ip prefix {ip_prefix} on {name} with ip {ip} => auto generate sdn_zone_node_id with {}", ipv4.octets()[3]); + break; + } + } + } + } + let bind_addrs = if let Some(ip) = args.node_ip { vec![SocketAddr::new(ip, sdn_port)] } else { @@ -124,7 +142,7 @@ async fn main() { .map(|(_name, ip)| SocketAddr::new(ip, sdn_port)) .collect::>() }; let mut node = NodeConfig { - node_id: ZoneId(args.sdn_zone_id).to_node_id(args.sdn_zone_node_id), + node_id: ZoneId(args.sdn_zone_id).to_node_id(auto_generated_node_id.unwrap_or(args.sdn_zone_node_id)), secret: args.secret, seeds: args.seeds, bind_addrs, diff --git a/docs/getting-started/installation/README.md b/docs/getting-started/installation/README.md index 72cf4f1f..054003e9 100644 --- a/docs/getting-started/installation/README.md +++ b/docs/getting-started/installation/README.md @@ -37,3 +37,4 @@ Or you can use some tools to deploy atm0s-media-server: - [Docker Compose](./docker-compose.md) About network discovery, please refer to [Network Discovery](./network-discovery.md) for more details with your own use-case. +About auto-generating the node_id without specifying it manually, please refer to [Auto generate node_id](./auto-generate-node-id.md) for more details. diff --git a/docs/getting-started/installation/auto-generate-node-id.md b/docs/getting-started/installation/auto-generate-node-id.md new file mode 100644 index 00000000..ab0699d4 --- /dev/null +++ b/docs/getting-started/installation/auto-generate-node-id.md @@ -0,0 +1,22 @@ +# Auto generate node_id + +In practice we rarely deploy new zones, so the zone-id can be manually specified. However, for nodes inside a zone, we usually use cloud or docker for easy management. This leads to the problem of having to manually specify node_ids inside a zone, which is inconvenient and error-prone. + +So we have implemented several mechanisms to auto-generate node_ids from machine information. + +## Auto generate node_id from local_ip + +The idea is simple: most cloud providers or bare-metal servers will assign a unique private IP to each machine, typically in the form of 10.10.10.x, 192.168.1.x, etc. + +We can use the last octet of the IP as the node_id. + +If the subnet is larger than /24, we still use the last 8 bits of the IP as the node_id, though this carries some risk of collision. In such cases, we recommend switching to a /24 subnet or using the NodeId pool. 
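As a minimal sketch of the derivation (this mirrors the prefix-match-then-last-octet logic added to `main.rs` in this patch; the helper name and the sample values are only illustrative):

```rust
use std::net::Ipv4Addr;

/// Derive a zone-local node_id from an IPv4 address when it matches the configured prefix.
fn node_id_from_ip(ip: Ipv4Addr, prefix: &str) -> Option<u8> {
    // Textual prefix match, then take the last octet, as in main.rs.
    ip.to_string().starts_with(prefix).then(|| ip.octets()[3])
}

fn main() {
    // 10.10.10.42 with prefix "10.10.10" => node_id 42
    assert_eq!(node_id_from_ip(Ipv4Addr::new(10, 10, 10, 42), "10.10.10"), Some(42));
    // A non-matching interface is skipped.
    assert_eq!(node_id_from_ip(Ipv4Addr::new(192, 168, 1, 7), "10.10.10"), None);
}
```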
+ +Example: +``` +cargo run -- --sdn-zone-id 0 --sdn-zone-node-id-from-ip-prefix "10.10.10" console +``` + +## NodeId pool + +Status: in progress \ No newline at end of file From f16a6e07584a2d792e9077952519a4a6213cb1fa Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Mon, 18 Nov 2024 14:40:43 +0700 Subject: [PATCH 05/14] feat: nat-traversal by detect public_ip from cloud metadata --- bin/src/lib.rs | 40 ++++++++++++++++++- bin/src/main.rs | 16 +++++++- .../installation/nat-traversal.md | 20 ++++++++++ 3 files changed, 74 insertions(+), 2 deletions(-) create mode 100644 docs/getting-started/installation/nat-traversal.md diff --git a/bin/src/lib.rs b/bin/src/lib.rs index 02dcac4e..c0d7acdf 100644 --- a/bin/src/lib.rs +++ b/bin/src/lib.rs @@ -1,6 +1,10 @@ -use std::{net::SocketAddr, str::FromStr}; +use std::{ + net::{IpAddr, SocketAddr}, + str::FromStr, +}; use atm0s_sdn::{NodeAddr, NodeId}; +use clap::ValueEnum; use media_server_protocol::cluster::ZoneId; mod errors; @@ -33,3 +37,37 @@ pub async fn fetch_node_addr_from_api(url: &str) -> Result { .ok_or(format!("No data in response from {}", url))?; NodeAddr::from_str(&node_addr).map_err(|e| e.to_string()) } + +#[derive(Debug, Clone, ValueEnum)] +pub enum CloudProvider { + Aws, + Gcp, + Azure, + Other, +} + +pub async fn fetch_node_ip_alt_from_cloud(cloud: CloudProvider) -> Result { + match cloud { + CloudProvider::Aws => { + let resp = reqwest::get("http://169.254.169.254/latest/meta-data/local-ipv4").await.map_err(|e| e.to_string())?; + let ip = resp.text().await.map_err(|e| e.to_string())?; + IpAddr::from_str(&ip).map_err(|e| e.to_string()) + } + CloudProvider::Gcp => { + let client = reqwest::Client::new(); + let resp = client + .get("http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip") + .header("Metadata-Flavor", "Google") + .send() + .await + .map_err(|e| e.to_string())?; + let ip = resp.text().await.map_err(|e| e.to_string())?; + IpAddr::from_str(&ip).map_err(|e| e.to_string()) + } + CloudProvider::Azure | CloudProvider::Other => { + let resp = reqwest::get("http://ipv4.icanhazip.com").await.map_err(|e| e.to_string())?; + let ip = resp.text().await.map_err(|e| e.to_string())?; + IpAddr::from_str(&ip).map_err(|e| e.to_string()) + } + } +} diff --git a/bin/src/main.rs b/bin/src/main.rs index fd027a4c..5350c4e4 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -1,6 +1,7 @@ use std::net::{IpAddr, SocketAddr}; use atm0s_media_server::{fetch_node_addr_from_api, server, NodeConfig}; +use atm0s_media_server::{fetch_node_ip_alt_from_cloud, CloudProvider}; use atm0s_sdn::NodeAddr; use clap::Parser; use media_server_protocol::cluster::ZoneId; @@ -45,6 +46,10 @@ struct Args { #[arg(env, long)] node_ip_alt: Vec, + /// Auto detect node_ip_alt with some common cloud provider metadata. + #[arg(env, long)] + node_ip_alt_cloud: Option, + /// Enable private IP addresses for the node. 
#[arg(env, long)] enable_private_ip: bool, @@ -125,6 +130,11 @@ async fn main() { } } + let mut node_ip_alt_cloud = vec![]; + if let Some(cloud) = args.node_ip_alt_cloud { + node_ip_alt_cloud.push(fetch_node_ip_alt_from_cloud(cloud).await.expect("should get node ip alt")); + } + let bind_addrs = if let Some(ip) = args.node_ip { vec![SocketAddr::new(ip, sdn_port)] } else { @@ -147,7 +157,11 @@ async fn main() { seeds: args.seeds, bind_addrs, zone: ZoneId(args.sdn_zone_id), - bind_addrs_alt: args.node_ip_alt.into_iter().map(|ip| SocketAddr::new(ip, sdn_port)).collect::>(), + bind_addrs_alt: node_ip_alt_cloud + .into_iter() + .chain(args.node_ip_alt.into_iter()) + .map(|ip| SocketAddr::new(ip, sdn_port)) + .collect::>(), }; log::info!("Bind addrs {:?}, bind addrs alt {:?}", node.bind_addrs, node.bind_addrs_alt); diff --git a/docs/getting-started/installation/nat-traversal.md b/docs/getting-started/installation/nat-traversal.md new file mode 100644 index 00000000..5e3c5a37 --- /dev/null +++ b/docs/getting-started/installation/nat-traversal.md @@ -0,0 +1,20 @@ +# NAT Traversal + +Some cloud providers (like AWS) route all traffic through a NAT gateway, which means that we don't have a public IP address on the VM. + +To work around this, we can use the `node_ip_alt` and `node_ip_alt_cloud` options to specify alternative IP addresses for the node. + +## Using node_ip_alt + +The `node_ip_alt` option takes a list of IP addresses as input, and the node will bind to each of them. + +## Using node_ip_alt_cloud + +The `node_ip_alt_cloud` option takes a cloud provider as input, and will automatically fetch the alternative IP address for the node. + +| Cloud Provider | Fetch URL | +| -------------- | ----------------------------------------------------------------------------------------------- | +| AWS | `http://169.254.169.254/latest/meta-data/local-ipv4` | +| GCP | `http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip` | +| Azure | `http://ipv4.icanhazip.com` | +| Other | `http://ipv4.icanhazip.com` | From 58b6a8d4e816d0d7464683593a335429163ceba6 Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Mon, 18 Nov 2024 15:19:25 +0700 Subject: [PATCH 06/14] fix aws public-ip --- bin/src/lib.rs | 2 +- bin/src/main.rs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/bin/src/lib.rs b/bin/src/lib.rs index c0d7acdf..cf1f652b 100644 --- a/bin/src/lib.rs +++ b/bin/src/lib.rs @@ -49,7 +49,7 @@ pub enum CloudProvider { pub async fn fetch_node_ip_alt_from_cloud(cloud: CloudProvider) -> Result { match cloud { CloudProvider::Aws => { - let resp = reqwest::get("http://169.254.169.254/latest/meta-data/local-ipv4").await.map_err(|e| e.to_string())?; + let resp = reqwest::get("http://169.254.169.254/latest/meta-data/public-ipv4").await.map_err(|e| e.to_string())?; let ip = resp.text().await.map_err(|e| e.to_string())?; IpAddr::from_str(&ip).map_err(|e| e.to_string()) } diff --git a/bin/src/main.rs b/bin/src/main.rs index 5350c4e4..773588c1 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -132,7 +132,10 @@ async fn main() { let mut node_ip_alt_cloud = vec![]; if let Some(cloud) = args.node_ip_alt_cloud { - node_ip_alt_cloud.push(fetch_node_ip_alt_from_cloud(cloud).await.expect("should get node ip alt")); + log::info!("Fetch public ip from cloud provider {:?}", cloud); + let public_ip = fetch_node_ip_alt_from_cloud(cloud).await.expect("should get node ip alt"); + log::info!("Fetched public ip {:?}", public_ip); + node_ip_alt_cloud.push(public_ip); } let 
bind_addrs = if let Some(ip) = args.node_ip { From 55bb969ea13c02316ca2980a984663e69ba9e26a Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Mon, 18 Nov 2024 15:21:04 +0700 Subject: [PATCH 07/14] fix ip parse --- bin/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/src/lib.rs b/bin/src/lib.rs index cf1f652b..63b17f78 100644 --- a/bin/src/lib.rs +++ b/bin/src/lib.rs @@ -51,7 +51,7 @@ pub async fn fetch_node_ip_alt_from_cloud(cloud: CloudProvider) -> Result { let resp = reqwest::get("http://169.254.169.254/latest/meta-data/public-ipv4").await.map_err(|e| e.to_string())?; let ip = resp.text().await.map_err(|e| e.to_string())?; - IpAddr::from_str(&ip).map_err(|e| e.to_string()) + IpAddr::from_str(ip.trim()).map_err(|e| e.to_string()) } CloudProvider::Gcp => { let client = reqwest::Client::new(); @@ -62,12 +62,12 @@ pub async fn fetch_node_ip_alt_from_cloud(cloud: CloudProvider) -> Result { let resp = reqwest::get("http://ipv4.icanhazip.com").await.map_err(|e| e.to_string())?; let ip = resp.text().await.map_err(|e| e.to_string())?; - IpAddr::from_str(&ip).map_err(|e| e.to_string()) + IpAddr::from_str(ip.trim()).map_err(|e| e.to_string()) } } } From 05fcbd0b398a46b47a225f0a971619b8f8d4780d Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Mon, 18 Nov 2024 17:57:50 +0700 Subject: [PATCH 08/14] setting rtpengine public-ip --- bin/src/server/media.rs | 14 ++++++++++++-- bin/src/server/standalone.rs | 6 +++--- packages/media_runner/src/worker.rs | 5 +++-- packages/transport_rtpengine/src/transport.rs | 16 ++++++++-------- packages/transport_rtpengine/src/worker.rs | 12 +++++++----- 5 files changed, 33 insertions(+), 20 deletions(-) diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index bb17bb2e..392ea39e 100644 --- a/bin/src/server/media.rs +++ b/bin/src/server/media.rs @@ -63,7 +63,7 @@ pub struct Args { /// The IP address for RTPengine RTP listening. /// Default: 127.0.0.1 #[arg(env, long, default_value = "127.0.0.1")] - pub rtpengine_rtp_ip: IpAddr, + pub rtpengine_listen_ip: IpAddr, /// Maximum concurrent connections per CPU core. 
#[arg(env, long, default_value_t = 200)] @@ -141,6 +141,15 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node }; let webrtc_addrs = node.bind_addrs.iter().map(|addr| SocketAddr::new(addr.ip(), webrtc_port)).collect::>(); let webrtc_addrs_alt = node.bind_addrs_alt.iter().map(|addr| SocketAddr::new(addr.ip(), webrtc_port)).collect::>(); + let rtpengine_public_ip = webrtc_addrs + .iter() + .chain(webrtc_addrs_alt.iter()) + .find(|addr| match addr.ip() { + IpAddr::V4(ipv4) => !ipv4.is_unspecified() && !ipv4.is_multicast() && !ipv4.is_loopback() && !ipv4.is_broadcast() && !ipv4.is_private(), + IpAddr::V6(ipv6) => !ipv6.is_unspecified() && !ipv6.is_multicast() && !ipv6.is_loopback(), + }) + .map(|addr| addr.ip()) + .unwrap_or(args.rtpengine_listen_ip); println!("Running media server worker {i} with addrs: {:?}, ice-lite: {}", webrtc_addrs, args.ice_lite); @@ -151,7 +160,8 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node media: MediaConfig { webrtc_addrs, webrtc_addrs_alt, - rtpengine_rtp_ip: args.rtpengine_rtp_ip, + rtpengine_listen_ip: args.rtpengine_listen_ip, + rtpengine_public_ip: rtpengine_public_ip, ice_lite: args.ice_lite, secure: secure.clone(), max_live: HashMap::from([(ServiceKind::Webrtc, workers as u32 * args.ccu_per_core), (ServiceKind::RtpEngine, workers as u32 * args.ccu_per_core)]), diff --git a/bin/src/server/standalone.rs b/bin/src/server/standalone.rs index f41bf784..9850527a 100644 --- a/bin/src/server/standalone.rs +++ b/bin/src/server/standalone.rs @@ -86,7 +86,7 @@ pub struct Args { /// The IP address for RTPengine RTP listening. /// Default: 127.0.0.1 #[arg(env, long, default_value = "127.0.0.1")] - pub rtpengine_rtp_ip: IpAddr, + pub rtpengine_listen_ip: IpAddr, /// Media instance count #[arg(env, long, default_value_t = 2)] @@ -208,7 +208,7 @@ pub async fn run_standalone(workers: usize, node: NodeConfig, args: Args) { let record_cache = args.record_cache.clone(); let record_mem_max_size = args.record_mem_max_size; let record_upload_worker = args.record_upload_worker; - let rtpengine_rtp_ip = args.rtpengine_rtp_ip; + let rtpengine_listen_ip = args.rtpengine_listen_ip; tokio::task::spawn_local(async move { super::run_media_server( workers, @@ -226,7 +226,7 @@ pub async fn run_standalone(workers: usize, node: NodeConfig, args: Args) { ice_lite: false, webrtc_port_seed: 0, rtpengine_cmd_addr: None, - rtpengine_rtp_ip, + rtpengine_listen_ip, ccu_per_core: 200, record_cache, record_mem_max_size, diff --git a/packages/media_runner/src/worker.rs b/packages/media_runner/src/worker.rs index 66ce12a1..db246722 100644 --- a/packages/media_runner/src/worker.rs +++ b/packages/media_runner/src/worker.rs @@ -48,7 +48,8 @@ pub struct MediaConfig { pub ice_lite: bool, pub webrtc_addrs: Vec, pub webrtc_addrs_alt: Vec, - pub rtpengine_rtp_ip: IpAddr, + pub rtpengine_listen_ip: IpAddr, + pub rtpengine_public_ip: IpAddr, pub secure: Arc, pub max_live: HashMap, pub enable_gateway_agent: bool, @@ -219,7 +220,7 @@ impl MediaServerWorker { MediaWorkerWebrtc::new(media.webrtc_addrs, media.webrtc_addrs_alt, media.ice_lite, media.secure.clone()), TaskType::MediaWebrtc, ), - media_rtpengine: TaskSwitcherBranch::new(MediaWorkerRtpEngine::new(media.rtpengine_rtp_ip), TaskType::MediaRtpEngine), + media_rtpengine: TaskSwitcherBranch::new(MediaWorkerRtpEngine::new(media.rtpengine_listen_ip, media.rtpengine_public_ip), TaskType::MediaRtpEngine), media_max_live, switcher: TaskSwitcher::new(4), queue, diff --git 
a/packages/transport_rtpengine/src/transport.rs b/packages/transport_rtpengine/src/transport.rs index 2eff1fc8..5fa4a166 100644 --- a/packages/transport_rtpengine/src/transport.rs +++ b/packages/transport_rtpengine/src/transport.rs @@ -66,10 +66,10 @@ pub struct TransportRtpEngine { } impl TransportRtpEngine { - pub fn new_offer(room: RoomId, peer: PeerId, ip: IpAddr) -> Result<(Self, String), String> { - let socket = std::net::UdpSocket::bind(SocketAddr::new(ip, 0)).map_err(|e| e.to_string())?; + pub fn new_offer(room: RoomId, peer: PeerId, public_ip: IpAddr, listen_ip: IpAddr) -> Result<(Self, String), String> { + let socket = std::net::UdpSocket::bind(SocketAddr::new(listen_ip, 0)).map_err(|e| e.to_string())?; let port = socket.local_addr().map_err(|e| e.to_string())?.port(); - let answer = sdp_builder(ip, port); + let answer = sdp_builder(public_ip, port); Ok(( Self { @@ -85,7 +85,7 @@ impl TransportRtpEngine { last_send_rtp: None, queue: DynamicDeque::from([ TransportOutput::Net(BackendOutgoing::UdpListen { - addr: SocketAddr::new(ip, port), + addr: SocketAddr::new(listen_ip, port), reuse: false, }), TransportOutput::Event(TransportEvent::State(TransportState::New)), @@ -99,7 +99,7 @@ impl TransportRtpEngine { )) } - pub fn new_answer(room: RoomId, peer: PeerId, ip: IpAddr, offer: &str) -> Result<(Self, String), String> { + pub fn new_answer(room: RoomId, peer: PeerId, public_ip: IpAddr, listen_ip: IpAddr, offer: &str) -> Result<(Self, String), String> { let mut offer = SessionDescription::try_from(offer.to_string()).map_err(|e| e.to_string())?; let dest_ip: IpAddr = if let Some(conn) = offer.connection { conn.connection_address.base @@ -111,9 +111,9 @@ impl TransportRtpEngine { log::info!("[TransportRtpEngine] on create answer => set remote to {remote}"); - let socket = std::net::UdpSocket::bind(SocketAddr::new(ip, 0)).map_err(|e| e.to_string())?; + let socket = std::net::UdpSocket::bind(SocketAddr::new(listen_ip, 0)).map_err(|e| e.to_string())?; let port = socket.local_addr().map_err(|e| e.to_string())?.port(); - let answer = sdp_builder(ip, port); + let answer = sdp_builder(public_ip, port); Ok(( Self { @@ -129,7 +129,7 @@ impl TransportRtpEngine { last_send_rtp: None, queue: DynamicDeque::from([ TransportOutput::Net(BackendOutgoing::UdpListen { - addr: SocketAddr::new(ip, port), + addr: SocketAddr::new(listen_ip, port), reuse: false, }), TransportOutput::Event(TransportEvent::State(TransportState::Connecting(dest_ip))), diff --git a/packages/transport_rtpengine/src/worker.rs b/packages/transport_rtpengine/src/worker.rs index 53ab5d1f..015614cf 100644 --- a/packages/transport_rtpengine/src/worker.rs +++ b/packages/transport_rtpengine/src/worker.rs @@ -40,16 +40,18 @@ pub enum GroupOutput { #[allow(clippy::type_complexity)] pub struct MediaWorkerRtpEngine { - ip: IpAddr, + listen_ip: IpAddr, + public_ip: IpAddr, endpoints: TaskGroup, EndpointOutput, Endpoint, 16>, queue: VecDeque, shutdown: bool, } impl MediaWorkerRtpEngine { - pub fn new(ip: IpAddr) -> Self { + pub fn new(listen_ip: IpAddr, public_ip: IpAddr) -> Self { Self { - ip, + listen_ip, + public_ip, endpoints: TaskGroup::default(), queue: VecDeque::new(), shutdown: false, @@ -58,9 +60,9 @@ impl MediaWorkerRtpEngine { pub fn spawn(&mut self, app: AppContext, room: RoomId, peer: PeerId, record: bool, session_id: u64, offer: Option<&str>) -> RpcResult<(usize, String)> { let (tran, answer) = if let Some(offer) = offer { - TransportRtpEngine::new_answer(room, peer, self.ip, offer).map_err(|e| RpcError::new(1000_u32, &e))? 
+ TransportRtpEngine::new_answer(room, peer, self.public_ip, self.listen_ip, offer).map_err(|e| RpcError::new(1000_u32, &e))? } else { - TransportRtpEngine::new_offer(room, peer, self.ip).map_err(|e| RpcError::new(1000_u32, &e))? + TransportRtpEngine::new_offer(room, peer, self.public_ip, self.listen_ip).map_err(|e| RpcError::new(1000_u32, &e))? }; let cfg = EndpointCfg { app, From e4c7dddf548e461da2cc0e97c646bfb122863b93 Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Mon, 18 Nov 2024 22:51:56 +0700 Subject: [PATCH 09/14] accept both single addr or array addrs with seeds-from-url --- Cargo.lock | 1 + bin/Cargo.toml | 1 + bin/src/http/api_console/cluster.rs | 35 ++++++++++++++++++++++++++++- bin/src/http/api_node.rs | 15 +++++-------- bin/src/lib.rs | 20 +++++++++-------- bin/src/main.rs | 15 +++++++------ bin/src/server/console.rs | 2 +- bin/src/server/gateway.rs | 2 +- bin/src/server/media.rs | 2 +- bin/z0_connector_n4.sh | 2 +- bin/z0_gate_n1.sh | 2 +- bin/z0_media_n2.sh | 2 +- bin/z0_media_n3.sh | 2 +- bin/z1_gate_n1.sh | 2 +- bin/z1_media_n2.sh | 2 +- bin/z1_media_n3.sh | 2 +- 16 files changed, 70 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c9a753f..30793b15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -496,6 +496,7 @@ dependencies = [ "sans-io-runtime", "sentry", "serde", + "serde_json", "sysinfo", "tokio", "tracing-subscriber", diff --git a/bin/Cargo.toml b/bin/Cargo.toml index 2e5bf73e..f25a5872 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -28,6 +28,7 @@ media-server-multi-tenancy = { path = "../packages/multi_tenancy", optional = tr rtpengine-ngcontrol = { path = "../packages/rtpengine_ngcontrol", optional = true } local-ip-address = "0.6" serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0" } quinn = { version = "0.11", optional = true } rustls = { version = "0.23", optional = true } convert-enum = { workspace = true } diff --git a/bin/src/http/api_console/cluster.rs b/bin/src/http/api_console/cluster.rs index f2477962..4f1cf322 100644 --- a/bin/src/http/api_console/cluster.rs +++ b/bin/src/http/api_console/cluster.rs @@ -3,12 +3,45 @@ use crate::server::console_storage::{ConsoleNode, Zone, ZoneDetails}; use super::{super::Response, ConsoleApisCtx, ConsoleAuthorization}; use media_server_protocol::cluster::ZoneId; use poem::web::Data; -use poem_openapi::{param::Path, payload::Json, OpenApi}; +use poem_openapi::{ + param::{Path, Query}, + payload::Json, + Enum, OpenApi, +}; pub struct Apis; +#[derive(Debug, Clone, Enum)] +enum NodeType { + Console, + Gateway, + Connector, + Media, +} + #[OpenApi] impl Apis { + /// Get seed nodes for a zone and node type + /// With console node type, it will return all consoles. + /// With gateway node type, it will return all consoles and gateways in the same zone. + /// With connector node type, it will return all gateways in the zone. + /// With media node type, it will return all gateways in the zone. 
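    // Illustrative request/response (values borrowed from bin/z0_gate_n1.sh; the actual addrs are
    // whatever the cluster currently knows about): a zone-0 gateway can bootstrap itself with
    //   GET /api/cluster/seeds?zone_id=0&node_type=Gateway
    // and receives a JSON array of node addr strings, e.g. ["0@/ip4/127.0.0.1/udp/10000"].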
+ #[oai(path = "/seeds", method = "get")] + async fn seeds_for(&self, zone_id: Query, node_type: Query, Data(ctx): Data<&ConsoleApisCtx>) -> Json> { + log::info!("seeds_for zone_id: {}, node_type: {:?}", zone_id.0, node_type.0); + match node_type.0 { + NodeType::Console => Json(ctx.storage.consoles().iter().map(|node| node.addr.clone()).collect()), + NodeType::Gateway => { + let consoles = ctx.storage.consoles().into_iter().map(|node| node.addr.clone()); + let zone = ctx.storage.zone(ZoneId(zone_id.0)); + let same_zone_gateways = zone.iter().map(|n| n.gateways.iter()).flatten().map(|n| n.addr.clone()); + Json(consoles.chain(same_zone_gateways).collect()) + } + NodeType::Connector => Json(ctx.storage.zone(ZoneId(zone_id.0)).unwrap().gateways.iter().map(|node| node.addr.to_string()).collect()), + NodeType::Media => Json(ctx.storage.zone(ZoneId(zone_id.0)).unwrap().gateways.iter().map(|node| node.addr.to_string()).collect()), + } + } + /// get consoles from all zones #[oai(path = "/consoles", method = "get")] async fn consoles(&self, _auth: ConsoleAuthorization, Data(ctx): Data<&ConsoleApisCtx>) -> Json>> { diff --git a/bin/src/http/api_node.rs b/bin/src/http/api_node.rs index 85bd3198..b5126ac1 100644 --- a/bin/src/http/api_node.rs +++ b/bin/src/http/api_node.rs @@ -1,9 +1,8 @@ -use poem_openapi::{payload::Json, OpenApi}; - -use super::Response; +use atm0s_sdn::NodeAddr; +use poem_openapi::{payload::PlainText, OpenApi}; pub struct NodeApiCtx { - pub address: String, + pub address: NodeAddr, } pub struct Apis { @@ -19,11 +18,7 @@ impl Apis { #[OpenApi] impl Apis { #[oai(path = "/address", method = "get")] - async fn get_address(&self) -> Json> { - Json(Response { - status: true, - data: Some(self.ctx.address.clone()), - ..Default::default() - }) + async fn get_address(&self) -> PlainText { + PlainText(self.ctx.address.to_string()) } } diff --git a/bin/src/lib.rs b/bin/src/lib.rs index 63b17f78..d951022e 100644 --- a/bin/src/lib.rs +++ b/bin/src/lib.rs @@ -27,15 +27,17 @@ pub struct NodeConfig { pub bind_addrs_alt: Vec, } -pub async fn fetch_node_addr_from_api(url: &str) -> Result { - let resp = reqwest::get(format!("{}/api/node/address", url)).await.map_err(|e| e.to_string())?; - let node_addr = resp - .json::>() - .await - .map_err(|e| e.to_string())? - .data - .ok_or(format!("No data in response from {}", url))?; - NodeAddr::from_str(&node_addr).map_err(|e| e.to_string()) +/// Fetch node addrs from the given url. +/// The url should return a list of node addrs in JSON format or a single node addr. 
+pub async fn fetch_node_addrs_from_api(url: &str) -> Result, String> { + let resp = reqwest::get(url).await.map_err(|e| e.to_string())?; + let content = resp.text().await.map_err(|e| e.to_string())?; + if content.starts_with("[") { + let node_addrs: Vec = serde_json::from_str(&content).map_err(|e| e.to_string())?; + Ok(node_addrs.into_iter().map(|addr| NodeAddr::from_str(&addr)).flatten().collect()) + } else { + Ok(vec![NodeAddr::from_str(&content).map_err(|e| e.to_string())?]) + } } #[derive(Debug, Clone, ValueEnum)] diff --git a/bin/src/main.rs b/bin/src/main.rs index f02eded7..5b04b816 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -1,6 +1,6 @@ use std::net::{IpAddr, SocketAddr}; -use atm0s_media_server::{fetch_node_addr_from_api, server, NodeConfig}; +use atm0s_media_server::{fetch_node_addrs_from_api, server, NodeConfig}; use atm0s_media_server::{fetch_node_ip_alt_from_cloud, CloudProvider}; use atm0s_sdn::NodeAddr; use clap::Parser; @@ -68,8 +68,9 @@ struct Args { /// Seeds from API, this is used for auto-discovery of seeds. /// Currently all of nodes expose /api/node/address endpoint, so we can get seeds from there. + /// Or we can get from console api /api/cluster/seeds?zone_id=xxx&node_type=xxx #[arg(env, long)] - seeds_from_node_api: Option, + seeds_from_url: Option, /// Number of worker threads to spawn. #[arg(env, long, default_value_t = 1)] @@ -170,11 +171,11 @@ async fn main() { log::info!("Bind addrs {:?}, bind addrs alt {:?}", node.bind_addrs, node.bind_addrs_alt); - if let Some(seeds_from_node_api) = args.seeds_from_node_api { - log::info!("Generate seeds from node_api {}", seeds_from_node_api); - let addr = fetch_node_addr_from_api(&seeds_from_node_api).await.expect("should get seed"); - log::info!("Generated seed {:?}", addr); - node.seeds = vec![addr]; + if let Some(url) = args.seeds_from_url { + log::info!("Generate seeds from node_api {}", url); + let addrs = fetch_node_addrs_from_api(&url).await.expect("should get seeds"); + log::info!("Generated seeds {:?}", addrs); + node.seeds = addrs; } let local = tokio::task::LocalSet::new(); diff --git a/bin/src/server/console.rs b/bin/src/server/console.rs index 62919b64..cf2b415b 100644 --- a/bin/src/server/console.rs +++ b/bin/src/server/console.rs @@ -70,7 +70,7 @@ pub async fn run_console_server(workers: usize, http_port: Option, node: No if let Some(http_port) = http_port { let secure = MediaConsoleSecureJwt::from(node.secret.as_bytes()); let storage = storage.clone(); - let node_ctx = NodeApiCtx { address: node_addr.to_string() }; + let node_ctx = NodeApiCtx { address: node_addr.clone() }; tokio::spawn(async move { if let Err(e) = run_console_http_server(http_port, node_ctx, secure, storage, connector_rpc_client).await { log::error!("HTTP Error: {}", e); diff --git a/bin/src/server/gateway.rs b/bin/src/server/gateway.rs index 1dcc91d9..944816c7 100644 --- a/bin/src/server/gateway.rs +++ b/bin/src/server/gateway.rs @@ -162,7 +162,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod if let Some(http_port) = http_port { let req_tx = req_tx.clone(); let secure2 = edge_secure.clone(); - let node_ctx = NodeApiCtx { address: node_addr.to_string() }; + let node_ctx = NodeApiCtx { address: node_addr.clone() }; tokio::spawn(async move { if let Err(e) = run_gateway_http_server(http_port, node_ctx, req_tx, secure2, gateway_secure).await { log::error!("HTTP Error: {}", e); diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index 392ea39e..5ba8fbd9 100644 --- a/bin/src/server/media.rs 
+++ b/bin/src/server/media.rs @@ -106,7 +106,7 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node }); let req_tx = req_tx.clone(); let secure_edge = secure.clone(); - let node_ctx = NodeApiCtx { address: node_addr.to_string() }; + let node_ctx = NodeApiCtx { address: node_addr.clone() }; tokio::spawn(async move { if let Err(e) = run_media_http_server(http_port, node_ctx, req_tx, secure_edge, secure_gateway).await { log::error!("HTTP Error: {}", e); diff --git a/bin/z0_connector_n4.sh b/bin/z0_connector_n4.sh index ab670b1b..685c1fb5 100644 --- a/bin/z0_connector_n4.sh +++ b/bin/z0_connector_n4.sh @@ -3,6 +3,6 @@ RUST_BACKTRACE=1 \ cargo run -- \ --sdn-zone-id 0 \ --sdn-zone-node-id 4 \ - --seeds-from-node-api "http://localhost:3000" \ + --seeds-from-url "http://localhost:3000/api/node/address" \ connector \ --s3-uri "http://minioadmin:minioadmin@127.0.0.1:9000/record" diff --git a/bin/z0_gate_n1.sh b/bin/z0_gate_n1.sh index 2e82a2b7..565dc731 100644 --- a/bin/z0_gate_n1.sh +++ b/bin/z0_gate_n1.sh @@ -6,7 +6,7 @@ cargo run -- \ --sdn-port 10001 \ --sdn-zone-id 0 \ --sdn-zone-node-id 1 \ - --seeds-from-node-api "http://localhost:8080" \ + --seeds-from-url "http://localhost:8080/api/cluster/seeds?zone_id=0&node_type=Gateway" \ --workers 2 \ gateway \ --lat 10 \ diff --git a/bin/z0_media_n2.sh b/bin/z0_media_n2.sh index 86235d1b..40866cc0 100644 --- a/bin/z0_media_n2.sh +++ b/bin/z0_media_n2.sh @@ -4,6 +4,6 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 0 \ --sdn-zone-node-id 2 \ - --seeds-from-node-api "http://localhost:3000" \ + --seeds-from-url "http://localhost:3000/api/node/address" \ --workers 2 \ media diff --git a/bin/z0_media_n3.sh b/bin/z0_media_n3.sh index 426a45ca..9b4105ad 100644 --- a/bin/z0_media_n3.sh +++ b/bin/z0_media_n3.sh @@ -4,6 +4,6 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 0 \ --sdn-zone-node-id 3 \ - --seeds-from-node-api "http://localhost:3000" \ + --seeds-from-url "http://localhost:3000/api/node/address" \ --workers 2 \ media diff --git a/bin/z1_gate_n1.sh b/bin/z1_gate_n1.sh index c4056239..fb9b8946 100644 --- a/bin/z1_gate_n1.sh +++ b/bin/z1_gate_n1.sh @@ -6,7 +6,7 @@ cargo run -- \ --sdn-zone-id 1 \ --sdn-zone-node-id 1 \ --sdn-port 11000 \ - --seeds 0@/ip4/127.0.0.1/udp/10000 \ + --seeds-from-url "http://localhost:8080/api/cluster/seeds?zone_id=1&node_type=Gateway" \ --workers 2 \ gateway \ --lat 20 \ diff --git a/bin/z1_media_n2.sh b/bin/z1_media_n2.sh index b2f736d4..28ba6016 100644 --- a/bin/z1_media_n2.sh +++ b/bin/z1_media_n2.sh @@ -4,7 +4,7 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 1 \ --sdn-zone-node-id 2 \ - --seeds 257@/ip4/127.0.0.1/udp/11000 \ + --seeds-from-url "http://localhost:4000/api/node/address" \ --workers 2 \ media \ --webrtc-port-seed 11200 \ diff --git a/bin/z1_media_n3.sh b/bin/z1_media_n3.sh index 75957c30..e34ea4e0 100644 --- a/bin/z1_media_n3.sh +++ b/bin/z1_media_n3.sh @@ -4,7 +4,7 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 1 \ --sdn-zone-node-id 3 \ - --seeds 257@/ip4/127.0.0.1/udp/11000 \ + --seeds-from-url "http://localhost:4000/api/node/address" \ --workers 2 \ media \ --webrtc-port-seed 11300 \ From 8d539b3bd8b559f28238e1d2b32d3b779885a098 Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Mon, 18 Nov 2024 23:41:52 +0700 Subject: [PATCH 10/14] fix: connector only need to connect to same-zone gateway --- bin/src/server/connector.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bin/src/server/connector.rs b/bin/src/server/connector.rs index 
d2f0f1ec..460bf724 100644 --- a/bin/src/server/connector.rs +++ b/bin/src/server/connector.rs @@ -10,6 +10,7 @@ use media_server_multi_tenancy::{MultiTenancyStorage, MultiTenancySync}; use media_server_protocol::{ cluster::{ClusterNodeGenericInfo, ClusterNodeInfo}, connector::CONNECTOR_RPC_PORT, + gateway::generate_gateway_zone_tag, protobuf::cluster_connector::{connector_response, MediaConnectorServiceServer}, rpc::quinn::QuinnServer, }; @@ -124,7 +125,7 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { }); builder.set_authorization(StaticKeyAuthorization::new(&node.secret)); - builder.set_manual_discovery(vec!["connector".to_string()], vec!["gateway".to_string()]); + builder.set_manual_discovery(vec!["connector".to_string()], vec![generate_gateway_zone_tag(node.zone)]); builder.add_service(Arc::new(ConnectorHandlerServiceBuilder::new())); for seed in node.seeds { From 43634293e0743b6992b74b1dcd9a0f3889496685 Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Tue, 19 Nov 2024 02:16:24 +0700 Subject: [PATCH 11/14] fix record with aws-s3 --- packages/media_connector/Cargo.toml | 2 +- packages/media_connector/src/sql_storage.rs | 20 +++++++++++++++----- packages/media_utils/Cargo.toml | 1 + packages/media_utils/src/uri.rs | 9 +++++++-- 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/packages/media_connector/Cargo.toml b/packages/media_connector/Cargo.toml index 0faced60..c743a992 100644 --- a/packages/media_connector/Cargo.toml +++ b/packages/media_connector/Cargo.toml @@ -24,7 +24,7 @@ sea-orm = { version = "1.1.0-rc.1", features = [ ] } sea-query = "0.32.0-rc.1" serde_json = "1.0" -s3-presign = "0.0.2" +s3-presign = "0.0.3" uuid = {version = "1.10", features = ["fast-rng", "v7"]} reqwest = { version = "0.12", features = ["json"]} diff --git a/packages/media_connector/src/sql_storage.rs b/packages/media_connector/src/sql_storage.rs index 4e0a082f..6e2cee3a 100644 --- a/packages/media_connector/src/sql_storage.rs +++ b/packages/media_connector/src/sql_storage.rs @@ -52,23 +52,33 @@ impl ConnectorSqlStorage { .acquire_timeout(Duration::from_secs(8)) .idle_timeout(Duration::from_secs(8)) .max_lifetime(Duration::from_secs(8)) - .sqlx_logging(false) - .sqlx_logging_level(log::LevelFilter::Info); // Setting default PostgreSQL schema + .sqlx_logging(true) + .sqlx_logging_level(log::LevelFilter::Info); let db = Database::connect(opt).await.expect("Should connect to sql server"); migration::Migrator::up(&db, None).await.expect("Should run migration success"); let s3_endpoint = CustomUri::::try_from(cfg.s3_uri.as_str()).expect("should parse s3"); - let mut s3 = Presigner::new( - Credentials::new(s3_endpoint.username.expect("Should have s3 accesskey"), s3_endpoint.password.expect("Should have s3 secretkey"), None), + let mut s3 = Presigner::new_with_root( + Credentials::new( + s3_endpoint.username.as_deref().expect("Should have s3 accesskey"), + s3_endpoint.password.as_deref().expect("Should have s3 secretkey"), + None, + ), s3_endpoint.path.first().as_ref().expect("Should have bucket name"), s3_endpoint.query.region.as_ref().unwrap_or(&"".to_string()), + s3_endpoint.host.as_str(), ); - s3.endpoint(s3_endpoint.endpoint.as_str()); if s3_endpoint.query.path_style == Some(true) { + log::info!("[ConnectorSqlStorage] use path style"); s3.use_path_style(); } + let signed_url = s3.put("aaa.mp4", 10000).unwrap(); + log::info!("[ConnectorSqlStorage] signed_url: {:?}", signed_url); + let res = reqwest::Client::new().put(signed_url).body(vec![1; 
10000]).send().await.unwrap(); + assert_eq!(res.status().as_u16(), 200); + let s3_sub_folder = s3_endpoint.path[1..].join("/"); Self { diff --git a/packages/media_utils/Cargo.toml b/packages/media_utils/Cargo.toml index 74ba7783..7afbf415 100644 --- a/packages/media_utils/Cargo.toml +++ b/packages/media_utils/Cargo.toml @@ -16,3 +16,4 @@ pin-project-lite = "0.2" spin = { workspace = true } once_cell = "1.20" derive_more = "1.0" +urlencoding = "2.1" \ No newline at end of file diff --git a/packages/media_utils/src/uri.rs b/packages/media_utils/src/uri.rs index 27ec37d7..15278351 100644 --- a/packages/media_utils/src/uri.rs +++ b/packages/media_utils/src/uri.rs @@ -5,6 +5,7 @@ pub struct CustomUri { pub username: Option, pub password: Option, pub endpoint: String, + pub host: String, pub path: Vec, pub query: Q, } @@ -29,10 +30,14 @@ impl TryFrom<&str> for CustomUri { (false, Some(port)) => format!("http://{}:{}", host, port), }; + let username = uri.username().map(|u| urlencoding::decode(&u.to_string()).map(|u| u.to_string()).ok()).flatten(); + let password = uri.password().map(|u| urlencoding::decode(&u.to_string()).map(|u| u.to_string()).ok()).flatten(); + Ok(Self { - username: uri.username().map(|u| u.to_string()), - password: uri.password().map(|u| u.to_string()), + username: username.map(|u| u.to_string()), + password: password.map(|u| u.to_string()), endpoint, + host: host.to_string(), path, query, }) From 50802797d48ba92f970985c98776b91cce555465 Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Tue, 19 Nov 2024 09:23:22 +0700 Subject: [PATCH 12/14] fix record compose --- Cargo.lock | 11 ++- packages/media_connector/src/sql_storage.rs | 6 -- .../media_record/bin/convert_record_worker.rs | 71 +++++++++++++++---- packages/media_record/bin/run_worker.sh | 4 -- packages/media_record/src/convert/composer.rs | 2 + packages/media_utils/src/uri.rs | 1 + 6 files changed, 70 insertions(+), 25 deletions(-) delete mode 100644 packages/media_record/bin/run_worker.sh diff --git a/Cargo.lock b/Cargo.lock index 30793b15..0eb1e7e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3373,6 +3373,7 @@ dependencies = [ "sorted-vec", "spin", "uriparse", + "urlencoding", ] [[package]] @@ -5170,9 +5171,9 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "s3-presign" -version = "0.0.2" +version = "0.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "748e207ba5bb8317987e9a6db2e33c8e43de4eec7ad6c2e5faec49f6fdd46e92" +checksum = "87dd93104cb3e5cff33676215f1ea9296bd524d2b783cbaf24422955d6b50dd1" dependencies = [ "chrono", "hmac 0.12.1", @@ -6989,6 +6990,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8parse" version = "0.2.2" diff --git a/packages/media_connector/src/sql_storage.rs b/packages/media_connector/src/sql_storage.rs index 6e2cee3a..4c0088c8 100644 --- a/packages/media_connector/src/sql_storage.rs +++ b/packages/media_connector/src/sql_storage.rs @@ -73,12 +73,6 @@ impl ConnectorSqlStorage { log::info!("[ConnectorSqlStorage] use path style"); s3.use_path_style(); } - - let signed_url = s3.put("aaa.mp4", 10000).unwrap(); - log::info!("[ConnectorSqlStorage] signed_url: {:?}", signed_url); - let res = reqwest::Client::new().put(signed_url).body(vec![1; 10000]).send().await.unwrap(); - assert_eq!(res.status().as_u16(), 200); - 
let s3_sub_folder = s3_endpoint.path[1..].join("/"); Self { diff --git a/packages/media_record/bin/convert_record_worker.rs b/packages/media_record/bin/convert_record_worker.rs index 4a036db3..b3cde50c 100644 --- a/packages/media_record/bin/convert_record_worker.rs +++ b/packages/media_record/bin/convert_record_worker.rs @@ -27,7 +27,7 @@ use poem_openapi::{ use rusty_s3::S3Action; use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; -macro_rules! try_opt { +macro_rules! try_validate_app { ($self:ident, $token:ident, $err:expr) => { match $self.apps.validate_app(&$token.token) { Some(app) => app, @@ -42,6 +42,21 @@ macro_rules! try_opt { }; } +macro_rules! try_opt { + ($opt:expr, $err:expr) => { + match $opt { + Some(o) => o, + None => { + return Json(Response { + status: false, + error: Some($err.to_owned()), + data: None, + }); + } + } + }; +} + #[derive(SecurityScheme)] #[oai(rename = "Token Authorization", ty = "bearer", key_in = "header", key_name = "Authorization")] pub struct TokenAuthorization(pub Bearer); @@ -187,14 +202,29 @@ struct HttpApis { impl HttpApis { #[oai(path = "/convert/job", method = "post")] async fn create_job(&self, TokenAuthorization(token): TokenAuthorization, Json(body): Json) -> Json> { - let app = try_opt!(self, token, "Invalid token"); + let app = try_validate_app!(self, token, "Invalid token"); let job_id = rand::random::().to_string(); - let input_s3 = format!("{}/{}/?path_style=true", self.input_s3_uri, body.record_path); - let transmux_s3_uri = self.transmux_s3_uri.clone(); + let input_s3 = try_opt!(concat_s3_uri_path(&self.input_s3_uri, &body.record_path), "Invalid input_s3_uri"); let compose_s3_uri = self.compose_s3_uri.clone(); let job_id_c = job_id.clone(); let hook = self.hook.clone(); + // get yyyy/mm/dd with chrono + let current_date_path = chrono::Utc::now().format("%Y/%m/%d").to_string(); + let transmux = if let Some(t) = body.transmux { + if let Some(custom_s3) = t.custom_s3 { + Some(RecordConvertOutputLocation::S3(custom_s3)) + } else { + let s3 = try_opt!( + concat_s3_uri_path(&self.transmux_s3_uri, &format!("{}/transmux/{current_date_path}/{job_id_c}", app.app)), + "Invalid transmux_s3_uri" + ); + Some(RecordConvertOutputLocation::S3(s3)) + } + } else { + None + }; + tokio::spawn(async move { log::info!("Convert job {job_id_c} started"); hook.on_event( @@ -210,16 +240,9 @@ impl HttpApis { }, ); - // get yyyy/mm/dd with chrono - let current_date_path = chrono::Utc::now().format("%Y/%m/%d").to_string(); let converter = RecordConvert::new(RecordConvertConfig { in_s3: input_s3, - transmux: body.transmux.map(|t| { - let uri = t - .custom_s3 - .unwrap_or_else(|| format!("{transmux_s3_uri}/{}/transmux/{current_date_path}/{job_id_c}?path_style=true", app.app)); - RecordConvertOutputLocation::S3(uri) - }), + transmux, compose: body.compose.map(|c| { let (uri, relative) = c .custom_s3 @@ -228,7 +251,6 @@ impl HttpApis { (u, relative) }) .unwrap_or_else(|| { - let compose_s3_uri = format!("{compose_s3_uri}?path_style=true"); let (s3, credentials, s3_sub_folder) = convert_s3_uri(&compose_s3_uri).expect("should convert compose_s3_uri"); let relative = format!("{}/compose/{current_date_path}/{job_id_c}.webm", app.app); let path = PathBuf::from(s3_sub_folder).join(&relative); @@ -281,3 +303,26 @@ impl HttpApis { }) } } + +fn concat_s3_uri_path(s3_uri: &str, path: &str) -> Option { + fn ensure_last_slash(s: String) -> String { + if s.ends_with('/') { + s + } else { + s + "/" + } + } + + let parts = 
s3_uri.split('?').collect::>(); + if parts.len() == 2 { + let first = PathBuf::from(parts[0]).join(path).to_str()?.to_string(); + let first = ensure_last_slash(first); + log::info!("first: {}", first); + Some(first + "?" + parts[1]) + } else { + let first = PathBuf::from(s3_uri).join(path).to_str()?.to_string(); + let first = ensure_last_slash(first); + log::info!("first: {}", first); + Some(first) + } +} diff --git a/packages/media_record/bin/run_worker.sh b/packages/media_record/bin/run_worker.sh deleted file mode 100644 index f714bb03..00000000 --- a/packages/media_record/bin/run_worker.sh +++ /dev/null @@ -1,4 +0,0 @@ -cargo run --bin convert_record_worker -- \ - --input-s3-uri http://ows-storage:tzraxj7wptoh@storage.dev.owslab.io/atm0s-record \ - --multi-tenancy-sync https://atm0s.wiremockapi.cloud/cp/apps \ - --secret zsz94nsrj3xvmbu555nmu25hwqo6shiq \ No newline at end of file diff --git a/packages/media_record/src/convert/composer.rs b/packages/media_record/src/convert/composer.rs index 864bbd22..76e4b761 100644 --- a/packages/media_record/src/convert/composer.rs +++ b/packages/media_record/src/convert/composer.rs @@ -122,9 +122,11 @@ impl RecordComposer { let room_reader = RoomReader::new(s3, credentials, &s3_sub_folder); let peers = room_reader.peers().await.map_err(|e| e.to_string())?; + log::info!("check room peers {:?}", peers.iter().map(|p| p.peer()).collect::>()); //we use channel to wait all sessions for peer in peers { let sessions = peer.sessions().await.map_err(|e| e.to_string())?; + log::info!("check peer {} sessions {:?}", peer.peer(), sessions.iter().map(|s| s.id()).collect::>()); for mut session in sessions { session.connect().await.map_err(|e| e.to_string())?; let id = session.id(); diff --git a/packages/media_utils/src/uri.rs b/packages/media_utils/src/uri.rs index 15278351..690287ca 100644 --- a/packages/media_utils/src/uri.rs +++ b/packages/media_utils/src/uri.rs @@ -1,6 +1,7 @@ use serde::de::DeserializeOwned; use serde_querystring::DuplicateQS; +#[derive(Debug, Clone)] pub struct CustomUri { pub username: Option, pub password: Option, From eecf5ca2edc399b585e28ea074b6883c04aa5722 Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Tue, 19 Nov 2024 09:31:12 +0700 Subject: [PATCH 13/14] fix warns --- bin/src/http/api_console/cluster.rs | 2 +- bin/src/lib.rs | 4 ++-- bin/src/server/media.rs | 2 +- packages/media_utils/src/uri.rs | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/bin/src/http/api_console/cluster.rs b/bin/src/http/api_console/cluster.rs index 4f1cf322..6f8282f3 100644 --- a/bin/src/http/api_console/cluster.rs +++ b/bin/src/http/api_console/cluster.rs @@ -34,7 +34,7 @@ impl Apis { NodeType::Gateway => { let consoles = ctx.storage.consoles().into_iter().map(|node| node.addr.clone()); let zone = ctx.storage.zone(ZoneId(zone_id.0)); - let same_zone_gateways = zone.iter().map(|n| n.gateways.iter()).flatten().map(|n| n.addr.clone()); + let same_zone_gateways = zone.iter().flat_map(|n| n.gateways.iter()).map(|n| n.addr.clone()); Json(consoles.chain(same_zone_gateways).collect()) } NodeType::Connector => Json(ctx.storage.zone(ZoneId(zone_id.0)).unwrap().gateways.iter().map(|node| node.addr.to_string()).collect()), diff --git a/bin/src/lib.rs b/bin/src/lib.rs index d951022e..bb18ee35 100644 --- a/bin/src/lib.rs +++ b/bin/src/lib.rs @@ -32,9 +32,9 @@ pub struct NodeConfig { pub async fn fetch_node_addrs_from_api(url: &str) -> Result, String> { let resp = reqwest::get(url).await.map_err(|e| e.to_string())?; let content = 
resp.text().await.map_err(|e| e.to_string())?; - if content.starts_with("[") { + if content.starts_with('[') { let node_addrs: Vec = serde_json::from_str(&content).map_err(|e| e.to_string())?; - Ok(node_addrs.into_iter().map(|addr| NodeAddr::from_str(&addr)).flatten().collect()) + Ok(node_addrs.into_iter().flat_map(|addr| NodeAddr::from_str(&addr)).collect()) } else { Ok(vec![NodeAddr::from_str(&content).map_err(|e| e.to_string())?]) } diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index 5ba8fbd9..6fbc04d5 100644 --- a/bin/src/server/media.rs +++ b/bin/src/server/media.rs @@ -161,7 +161,7 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node webrtc_addrs, webrtc_addrs_alt, rtpengine_listen_ip: args.rtpengine_listen_ip, - rtpengine_public_ip: rtpengine_public_ip, + rtpengine_public_ip, ice_lite: args.ice_lite, secure: secure.clone(), max_live: HashMap::from([(ServiceKind::Webrtc, workers as u32 * args.ccu_per_core), (ServiceKind::RtpEngine, workers as u32 * args.ccu_per_core)]), diff --git a/packages/media_utils/src/uri.rs b/packages/media_utils/src/uri.rs index 690287ca..3fc98fdf 100644 --- a/packages/media_utils/src/uri.rs +++ b/packages/media_utils/src/uri.rs @@ -31,8 +31,8 @@ impl TryFrom<&str> for CustomUri { (false, Some(port)) => format!("http://{}:{}", host, port), }; - let username = uri.username().map(|u| urlencoding::decode(&u.to_string()).map(|u| u.to_string()).ok()).flatten(); - let password = uri.password().map(|u| urlencoding::decode(&u.to_string()).map(|u| u.to_string()).ok()).flatten(); + let username = uri.username().and_then(|u| urlencoding::decode(u.as_ref()).map(|u| u.to_string()).ok()); + let password = uri.password().and_then(|u| urlencoding::decode(u.as_ref()).map(|u| u.to_string()).ok()); Ok(Self { username: username.map(|u| u.to_string()), From 2360ca20059654be152f2871673127cf5e228311 Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Wed, 20 Nov 2024 10:06:58 +0700 Subject: [PATCH 14/14] added node router_dump --- Cargo.lock | 15 +++++---------- Cargo.toml | 4 ++-- bin/src/http.rs | 29 +++++++++++++++++++++++++++++ bin/src/http/api_node.rs | 29 ++++++++++++++++++++++++++++- bin/src/main.rs | 2 +- bin/src/server/connector.rs | 35 +++++++++++++++++++++++++++++++++-- bin/src/server/console.rs | 25 +++++++++++++++++++++++-- bin/src/server/gateway.rs | 27 +++++++++++++++++++++++++-- bin/src/server/media.rs | 28 ++++++++++++++++++++++++++-- bin/src/server/standalone.rs | 1 + 10 files changed, 173 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0eb1e7e7..c564ac59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -505,8 +505,7 @@ dependencies = [ [[package]] name = "atm0s-sdn" version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ae4bafdf42069f4419ca9b361a3a31d59d8bbdbf87b14a0eee7fdcd744d30db" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?branch=feat-router-dump#aa0a9f9ca0c74b559975ea3f3848ad66e8dbc32b" dependencies = [ "atm0s-sdn-identity", "atm0s-sdn-network", @@ -525,8 +524,7 @@ dependencies = [ [[package]] name = "atm0s-sdn-identity" version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ec96b78570f6f704f7b7074f2c496604c2729044d5374441fee7cf202ee7633" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?branch=feat-router-dump#aa0a9f9ca0c74b559975ea3f3848ad66e8dbc32b" dependencies = [ "multiaddr", "rand 0.8.5", @@ -536,8 +534,7 @@ dependencies = [ [[package]] name = "atm0s-sdn-network" 
version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68bb1728d2a0f5f517c6be60a6e32bd73767b8bf70bb9df5f006d188def58210" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?branch=feat-router-dump#aa0a9f9ca0c74b559975ea3f3848ad66e8dbc32b" dependencies = [ "aes-gcm 0.10.3", "atm0s-sdn-identity", @@ -564,8 +561,7 @@ dependencies = [ [[package]] name = "atm0s-sdn-router" version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6679fb3c1797ae958c247a0549c3e33224c7c0352e5b28bb984f8a7e90bd7bf7" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?branch=feat-router-dump#aa0a9f9ca0c74b559975ea3f3848ad66e8dbc32b" dependencies = [ "atm0s-sdn-identity", "atm0s-sdn-utils", @@ -577,8 +573,7 @@ dependencies = [ [[package]] name = "atm0s-sdn-utils" version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "437f8450fd7095986c5bce4abd7ad03e95f1651a214b8727976363c86d6b9cc9" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?branch=feat-router-dump#aa0a9f9ca0c74b559975ea3f3848ad66e8dbc32b" dependencies = [ "log", "serde", diff --git a/Cargo.toml b/Cargo.toml index 1486341a..5d380e8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,8 +20,8 @@ members = [ [workspace.dependencies] sans-io-runtime = { version = "0.3", default-features = false } -atm0s-sdn = { version = "0.2", default-features = false } -atm0s-sdn-network = { version = "0.6", default-features = false } +atm0s-sdn = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", branch = "feat-router-dump", default-features = false } +atm0s-sdn-network = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", branch = "feat-router-dump", default-features = false } tokio = "1.37" tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] } convert-enum = "0.1" diff --git a/bin/src/http.rs b/bin/src/http.rs index f8f7d488..692267a0 100644 --- a/bin/src/http.rs +++ b/bin/src/http.rs @@ -230,6 +230,35 @@ pub async fn run_gateway_http_server Result<(), Box> { + use poem::middleware::Tracing; + + let node_api = api_node::Apis::new(node); + let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/"); + let node_ui = node_service.swagger_ui(); + let node_spec = node_service.spec(); + + let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); + let metrics_ui = metrics_service.swagger_ui(); + let metrics_spec = metrics_service.spec(); + + let route = Route::new() + //node + .nest("/api/node/", node_service) + .nest("/api/node/ui", node_ui) + .at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone())) + //metrics + .nest("/api/metrics/", metrics_service) + .nest("/api/metrics/ui", metrics_ui) + .at("/api/metrics/spec", poem::endpoint::make_sync(move |_| metrics_spec.clone())) + .with(Cors::new()) + .with(Tracing); + + Server::new(TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), port))).run(route).await?; + Ok(()) +} + #[cfg(feature = "media")] pub async fn run_media_http_server( port: u16, diff --git a/bin/src/http/api_node.rs b/bin/src/http/api_node.rs index b5126ac1..370de099 100644 --- a/bin/src/http/api_node.rs +++ b/bin/src/http/api_node.rs @@ -1,8 +1,15 @@ +use std::time::Duration; + use atm0s_sdn::NodeAddr; -use poem_openapi::{payload::PlainText, OpenApi}; +use poem_openapi::{ + 
payload::{Json, PlainText}, + OpenApi, +}; +use tokio::sync::{mpsc::Sender, oneshot}; pub struct NodeApiCtx { pub address: NodeAddr, + pub dump_tx: Sender>, } pub struct Apis { @@ -21,4 +28,24 @@ impl Apis { async fn get_address(&self) -> PlainText { PlainText(self.ctx.address.to_string()) } + + #[oai(path = "/router_dump", method = "get")] + async fn get_router_dump(&self) -> Json { + let (tx, rx) = oneshot::channel(); + self.ctx.dump_tx.send(tx).await.expect("should send"); + match tokio::time::timeout(Duration::from_millis(1000), rx).await { + Ok(Ok(v)) => Json(serde_json::json!({ + "status": true, + "data": v + })), + Ok(Err(e)) => Json(serde_json::json!({ + "status": false, + "error": e.to_string() + })), + Err(_e) => Json(serde_json::json!({ + "status": false, + "error": "timeout" + })), + } + } } diff --git a/bin/src/main.rs b/bin/src/main.rs index 5b04b816..eed0ab6f 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -187,7 +187,7 @@ async fn main() { #[cfg(feature = "gateway")] server::ServerType::Gateway(args) => server::run_media_gateway(workers, http_port, node, args).await, #[cfg(feature = "connector")] - server::ServerType::Connector(args) => server::run_media_connector(workers, node, args).await, + server::ServerType::Connector(args) => server::run_media_connector(workers, http_port, node, args).await, #[cfg(feature = "media")] server::ServerType::Media(args) => server::run_media_server(workers, http_port, node, args).await, #[cfg(feature = "cert_utils")] diff --git a/bin/src/server/connector.rs b/bin/src/server/connector.rs index 460bf724..aea72857 100644 --- a/bin/src/server/connector.rs +++ b/bin/src/server/connector.rs @@ -1,6 +1,11 @@ use std::{sync::Arc, time::Duration}; -use atm0s_sdn::{features::FeaturesEvent, secure::StaticKeyAuthorization, services::visualization, SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner}; +use atm0s_sdn::{ + features::{router_sync, FeaturesEvent}, + secure::StaticKeyAuthorization, + services::visualization, + SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner, +}; use clap::Parser; use media_server_connector::{ handler_service::{self, ConnectorHandlerServiceBuilder}, @@ -19,6 +24,7 @@ use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use tokio::sync::mpsc::channel; use crate::{ + http::{run_connector_http_server, NodeApiCtx}, node_metrics::NodeMetricsCollector, quinn::{make_quinn_server, VirtualNetwork}, NodeConfig, @@ -82,7 +88,7 @@ pub struct Args { pub multi_tenancy_sync_interval_ms: u64, } -pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { +pub async fn run_media_connector(workers: usize, http_port: Option, node: NodeConfig, args: Args) { let app_storage = if let Some(url) = args.multi_tenancy_sync { let app_storage = Arc::new(MultiTenancyStorage::new()); let mut app_sync = MultiTenancySync::new(app_storage.clone(), url, Duration::from_millis(args.multi_tenancy_sync_interval_ms)); @@ -201,6 +207,18 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { } }); + // Start http server + let (dump_tx, mut dump_rx) = channel(10); + if let Some(http_port) = http_port { + let node_ctx = NodeApiCtx { address: node_addr.clone(), dump_tx }; + tokio::spawn(async move { + if let Err(e) = run_connector_http_server(http_port, node_ctx).await { + log::error!("HTTP Error: {}", e); + } + }); + } + let mut wait_dump_router = vec![]; + loop { if controller.process().is_none() { break; @@ -224,6 +242,11 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: 
Args) { controller.service_control(HANDLER_SERVICE_ID.into(), (), control.into()); } + while let Ok(v) = dump_rx.try_recv() { + controller.feature_control((), router_sync::Control::DumpRouter.into()); + wait_dump_router.push(v); + } + while let Some(out) = controller.pop_event() { match out { SdnExtOut::ServicesEvent(_, _, SE::Connector(event)) => match event { @@ -238,6 +261,14 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { log::error!("[MediaConnector] forward Sdn SocketEvent error {:?}", e); } } + SdnExtOut::FeaturesEvent(_, FeaturesEvent::RouterSync(event)) => match event { + router_sync::Event::DumpRouter(dump) => { + let json = serde_json::to_value(dump).expect("should convert json"); + while let Some(v) = wait_dump_router.pop() { + let _ = v.send(json.clone()); + } + } + }, _ => {} } } diff --git a/bin/src/server/console.rs b/bin/src/server/console.rs index cf2b415b..9eae74bc 100644 --- a/bin/src/server/console.rs +++ b/bin/src/server/console.rs @@ -1,6 +1,11 @@ use std::time::{Duration, Instant}; -use atm0s_sdn::{features::FeaturesEvent, secure::StaticKeyAuthorization, services::visualization, SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner}; +use atm0s_sdn::{ + features::{router_sync, FeaturesEvent}, + secure::StaticKeyAuthorization, + services::visualization, + SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner, +}; use clap::Parser; use media_server_protocol::{ cluster::{ClusterNodeGenericInfo, ClusterNodeInfo}, @@ -9,6 +14,7 @@ use media_server_protocol::{ }; use media_server_secure::jwt::MediaConsoleSecureJwt; use storage::StorageShared; +use tokio::sync::mpsc::channel; use crate::{ http::{run_console_http_server, NodeApiCtx}, @@ -67,10 +73,11 @@ pub async fn run_console_server(workers: usize, http_port: Option, node: No tokio::task::spawn_local(async move { while vnet.recv().await.is_some() {} }); + let (dump_tx, mut dump_rx) = channel(10); if let Some(http_port) = http_port { let secure = MediaConsoleSecureJwt::from(node.secret.as_bytes()); let storage = storage.clone(); - let node_ctx = NodeApiCtx { address: node_addr.clone() }; + let node_ctx = NodeApiCtx { address: node_addr.clone(), dump_tx }; tokio::spawn(async move { if let Err(e) = run_console_http_server(http_port, node_ctx, secure, storage, connector_rpc_client).await { log::error!("HTTP Error: {}", e); @@ -79,6 +86,7 @@ pub async fn run_console_server(workers: usize, http_port: Option, node: No } let mut node_metrics_collector = NodeMetricsCollector::default(); + let mut wait_dump_router = vec![]; loop { if controller.process().is_none() { @@ -100,6 +108,11 @@ pub async fn run_console_server(workers: usize, http_port: Option, node: No storage.on_tick(started_at.elapsed().as_millis() as u64); } + while let Ok(v) = dump_rx.try_recv() { + controller.feature_control((), router_sync::Control::DumpRouter.into()); + wait_dump_router.push(v); + } + while let Some(out) = controller.pop_event() { match out { SdnExtOut::ServicesEvent(_service, (), SE::Visual(event)) => match event { @@ -119,6 +132,14 @@ pub async fn run_console_server(workers: usize, http_port: Option, node: No log::error!("forward sdn SocketEvent error {:?}", e); } } + SdnExtOut::FeaturesEvent(_, FeaturesEvent::RouterSync(event)) => match event { + router_sync::Event::DumpRouter(dump) => { + let json = serde_json::to_value(dump).expect("should convert json"); + while let Some(v) = wait_dump_router.pop() { + let _ = v.send(json.clone()); + } + } + }, _ => {} } } diff --git a/bin/src/server/gateway.rs 
b/bin/src/server/gateway.rs index 944816c7..55738ae5 100644 --- a/bin/src/server/gateway.rs +++ b/bin/src/server/gateway.rs @@ -1,6 +1,11 @@ use std::{sync::Arc, time::Duration}; -use atm0s_sdn::{features::FeaturesEvent, secure::StaticKeyAuthorization, services::visualization, SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner}; +use atm0s_sdn::{ + features::{router_sync, FeaturesEvent}, + secure::StaticKeyAuthorization, + services::visualization, + SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner, +}; use clap::Parser; use media_server_connector::agent_service::ConnectorAgentServiceBuilder; use media_server_gateway::{store_service::GatewayStoreServiceBuilder, STORE_SERVICE_ID}; @@ -15,6 +20,7 @@ use media_server_secure::jwt::{MediaEdgeSecureJwt, MediaGatewaySecureJwt}; use rtpengine_ngcontrol::NgUdpTransport; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use std::net::SocketAddr; +use tokio::sync::mpsc::channel; use crate::{ http::{run_gateway_http_server, NodeApiCtx}, @@ -159,10 +165,11 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let (selector, mut requester) = build_dest_selector(); // Setup HTTP server + let (dump_tx, mut dump_rx) = channel(10); if let Some(http_port) = http_port { let req_tx = req_tx.clone(); let secure2 = edge_secure.clone(); - let node_ctx = NodeApiCtx { address: node_addr.clone() }; + let node_ctx = NodeApiCtx { address: node_addr.clone(), dump_tx }; tokio::spawn(async move { if let Err(e) = run_gateway_http_server(http_port, node_ctx, req_tx, secure2, gateway_secure).await { log::error!("HTTP Error: {}", e); @@ -212,6 +219,9 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod // Subscribe ConnectorHandler service controller.service_control(media_server_connector::AGENT_SERVICE_ID.into(), (), media_server_connector::agent_service::Control::Sub.into()); + // List all waiting router dump requests + let mut wait_dump_router = vec![]; + loop { if controller.process().is_none() { break; @@ -257,6 +267,11 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod controller.service_control(media_server_connector::AGENT_SERVICE_ID.into(), (), control.into()); } + while let Ok(v) = dump_rx.try_recv() { + controller.feature_control((), router_sync::Control::DumpRouter.into()); + wait_dump_router.push(v); + } + while let Some(out) = controller.pop_event() { match out { SdnExtOut::ServicesEvent(_, _, SE::Gateway(event)) => match event { @@ -276,6 +291,14 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod log::error!("[MediaGateway] forward Sdn SocketEvent error {:?}", e); } } + SdnExtOut::FeaturesEvent(_, FeaturesEvent::RouterSync(event)) => match event { + router_sync::Event::DumpRouter(dump) => { + let json = serde_json::to_value(dump).expect("should convert json"); + while let Some(v) = wait_dump_router.pop() { + let _ = v.send(json.clone()); + } + } + }, _ => {} } } diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index 6fbc04d5..5b5a15be 100644 --- a/bin/src/server/media.rs +++ b/bin/src/server/media.rs @@ -5,7 +5,10 @@ use std::{ time::{Duration, Instant}, }; -use atm0s_sdn::{features::FeaturesEvent, generate_node_addr, SdnExtIn, SdnExtOut, TimePivot, TimeTicker}; +use atm0s_sdn::{ + features::{router_sync, FeaturesEvent}, + generate_node_addr, SdnExtIn, SdnExtOut, TimePivot, TimeTicker, +}; use clap::Parser; use media_server_gateway::ServiceKind; use media_server_multi_tenancy::MultiTenancyStorage; @@ -25,6 +28,7 @@ use 
rand::random; use rtpengine_ngcontrol::NgUdpTransport; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use sans_io_runtime::{backend::PollingBackend, Controller}; +use tokio::sync::mpsc::channel; use crate::{ http::{run_media_http_server, NodeApiCtx}, @@ -99,6 +103,7 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node let secure = Arc::new(MediaEdgeSecureJwt::from(node.secret.as_bytes())); let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024); let node_addr = generate_node_addr(node.node_id, &node.bind_addrs, node.bind_addrs_alt.clone()); + let (dump_tx, mut dump_rx) = channel(10); if let Some(http_port) = http_port { let secure_gateway = args.enable_token_api.then(|| { let app_storage = Arc::new(MultiTenancyStorage::new_with_single(&node.secret, None)); @@ -106,7 +111,7 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node }); let req_tx = req_tx.clone(); let secure_edge = secure.clone(); - let node_ctx = NodeApiCtx { address: node_addr.clone() }; + let node_ctx = NodeApiCtx { address: node_addr.clone(), dump_tx }; tokio::spawn(async move { if let Err(e) = run_media_http_server(http_port, node_ctx, req_tx, secure_edge, secure_gateway).await { log::error!("HTTP Error: {}", e); @@ -207,6 +212,9 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node let timer = TimePivot::build(); let mut ticker = TimeTicker::build(1000); + // List all waiting router dump requests + let mut wait_dump_router = vec![]; + loop { if controller.process().is_none() { break; @@ -267,6 +275,14 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node } } + while let Ok(v) = dump_rx.try_recv() { + controller.send_to( + 0, + ExtIn::Sdn(SdnExtIn::FeaturesControl(media_server_runner::UserData::Cluster, router_sync::Control::DumpRouter.into()), true), + ); + wait_dump_router.push(v); + } + while let Some(out) = controller.pop_event() { match out { ExtOut::Rpc(req_id, worker, res) => { @@ -283,6 +299,14 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node log::error!("[MediaEdge] forward Sdn SocketEvent error {:?}", e); } } + ExtOut::Sdn(SdnExtOut::FeaturesEvent(_, FeaturesEvent::RouterSync(event))) => match event { + router_sync::Event::DumpRouter(dump) => { + let json = serde_json::to_value(dump).expect("should convert json"); + while let Some(v) = wait_dump_router.pop() { + let _ = v.send(json.clone()); + } + } + }, ExtOut::Sdn(SdnExtOut::ServicesEvent(_service, userdata, SE::Connector(event))) => { match event { media_server_connector::agent_service::Event::Response(res) => { diff --git a/bin/src/server/standalone.rs b/bin/src/server/standalone.rs index 9850527a..0d04bf26 100644 --- a/bin/src/server/standalone.rs +++ b/bin/src/server/standalone.rs @@ -176,6 +176,7 @@ pub async fn run_standalone(workers: usize, node: NodeConfig, args: Args) { tokio::task::spawn_local(async move { super::run_media_connector( workers, + None, NodeConfig { node_id: 30, secret,