From 61ef5dbdebf22a803d5d7a93d044e506e759497e Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Mon, 18 Nov 2024 12:01:39 +0700 Subject: [PATCH] feat: SDN discovery over node http-api --- Cargo.lock | 1 + bin/Cargo.toml | 1 + bin/src/http.rs | 49 ++++++++++++++++++++++++++++++++------- bin/src/http/api_node.rs | 29 +++++++++++++++++++++++ bin/src/lib.rs | 13 ++++++++++- bin/src/main.rs | 17 ++++++++++++-- bin/src/server/console.rs | 5 ++-- bin/src/server/gateway.rs | 28 +++++++++++----------- bin/src/server/media.rs | 8 ++++--- bin/z0_connector_n4.sh | 2 +- bin/z0_gate_n1.sh | 2 +- bin/z0_media_n2.sh | 5 ++-- bin/z0_media_n3.sh | 5 ++-- 13 files changed, 128 insertions(+), 37 deletions(-) create mode 100644 bin/src/http/api_node.rs diff --git a/Cargo.lock b/Cargo.lock index aab95f4d..4c9a753f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -489,6 +489,7 @@ dependencies = [ "quinn", "rand 0.8.5", "rcgen", + "reqwest", "rtpengine-ngcontrol", "rust-embed", "rustls", diff --git a/bin/Cargo.toml b/bin/Cargo.toml index fb75bcbc..3a35de4e 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -38,6 +38,7 @@ maxminddb = { version = "0.24", optional = true } sysinfo = { version = "0.31", optional = true } hex = { version = "0.4", optional = true } mime_guess = { version = "2.0", optional = true } +reqwest = { version = "0.12", features = ["json"]} sentry = "0.34" [features] diff --git a/bin/src/http.rs b/bin/src/http.rs index db43ad8c..f8f7d488 100644 --- a/bin/src/http.rs +++ b/bin/src/http.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::sync::Arc; +pub use api_node::NodeApiCtx; use media_server_protocol::endpoint::ClusterConnId; #[cfg(feature = "console")] use media_server_protocol::protobuf::cluster_connector::MediaConnectorServiceClient; @@ -14,6 +15,7 @@ use poem::{listener::TcpListener, middleware::Cors, EndpointExt, Route, Server}; use poem_openapi::types::{ToJSON, Type}; use poem_openapi::OpenApiService; use poem_openapi::{types::ParseFromJSON, Object}; +use serde::Deserialize; use tokio::sync::mpsc::Sender; #[cfg(feature = "embed_static")] use utils::EmbeddedFilesEndpoint; @@ -21,6 +23,7 @@ use utils::EmbeddedFilesEndpoint; mod api_console; mod api_media; mod api_metrics; +mod api_node; mod api_token; mod utils; @@ -34,13 +37,13 @@ pub struct PublicMediaFiles; #[folder = "public/console"] pub struct PublicConsoleFiles; -#[derive(Debug, Default, Object)] +#[derive(Debug, Default, Object, Deserialize)] pub struct Pagination { pub total: usize, pub current: usize, } -#[derive(Debug, Object)] +#[derive(Debug, Object, Deserialize)] pub struct Response { pub status: bool, #[oai(skip_serializing_if = "Option::is_none")] @@ -65,25 +68,31 @@ impl Default for Response { #[cfg(feature = "console")] pub async fn run_console_http_server( port: u16, + node: NodeApiCtx, secure: media_server_secure::jwt::MediaConsoleSecureJwt, storage: crate::server::console_storage::StorageShared, connector: MediaConnectorServiceClient, ) -> Result<(), Box> { use poem::middleware::Tracing; - let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); + let node_api = api_node::Apis::new(node); + let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/"); + let node_ui = node_service.swagger_ui(); + let node_spec = node_service.spec(); + + let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); let metrics_ui = metrics_service.swagger_ui(); let metrics_spec = metrics_service.spec(); - let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "Console User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/"); + let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/"); let user_ui = user_service.swagger_ui(); let user_spec = user_service.spec(); - let cluster_service: OpenApiService<_, ()> = OpenApiService::new(api_console::cluster::Apis, "Console Cluster APIs", env!("CARGO_PKG_VERSION")).server("/api/cluster/"); + let cluster_service: OpenApiService<_, ()> = OpenApiService::new(api_console::cluster::Apis, "Cluster APIs", env!("CARGO_PKG_VERSION")).server("/api/cluster/"); let cluster_ui = cluster_service.swagger_ui(); let cluster_spec = cluster_service.spec(); - let connector_service: OpenApiService<_, ()> = OpenApiService::new(api_console::connector::Apis, "Console Connector APIs", env!("CARGO_PKG_VERSION")).server("/api/connector/"); + let connector_service: OpenApiService<_, ()> = OpenApiService::new(api_console::connector::Apis, "Connector APIs", env!("CARGO_PKG_VERSION")).server("/api/connector/"); let connector_ui = connector_service.swagger_ui(); let connector_spec = connector_service.spec(); @@ -96,6 +105,10 @@ pub async fn run_console_http_server( let route = Route::new() .nest("/", console_panel) + //node + .nest("/api/node/", node_service) + .nest("/api/node/ui", node_ui) + .at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone())) //metrics .nest("/api/metrics/", metrics_service) .nest("/api/metrics/ui", metrics_ui) @@ -122,6 +135,7 @@ pub async fn run_console_http_server( #[cfg(feature = "gateway")] pub async fn run_gateway_http_server( port: u16, + node: NodeApiCtx, sender: Sender, RpcRes>>, edge_secure: Arc, gateway_secure: Arc, @@ -130,7 +144,12 @@ pub async fn run_gateway_http_server = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); + let node_api = api_node::Apis::new(node); + let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/"); + let node_ui = node_service.swagger_ui(); + let node_spec = node_service.spec(); + + let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); let metrics_ui = metrics_service.swagger_ui(); let metrics_spec = metrics_service.spec(); @@ -177,6 +196,10 @@ pub async fn run_gateway_http_server( port: u16, + node: NodeApiCtx, sender: Sender, RpcRes>>, edge_secure: Arc, gateway_secure: Option>, ) -> Result<(), Box> { let mut route = Route::new(); - let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); + let node_api = api_node::Apis::new(node); + let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/"); + let node_ui = node_service.swagger_ui(); + let node_spec = node_service.spec(); + + let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/"); let metrics_ui = metrics_service.swagger_ui(); let metrics_spec = metrics_service.spec(); @@ -273,6 +302,10 @@ pub async fn run_media_http_server Self { + Self { ctx } + } +} + +#[OpenApi] +impl Apis { + #[oai(path = "/address", method = "get")] + async fn get_address(&self) -> Json> { + Json(Response { + status: true, + data: Some(self.ctx.address.clone()), + ..Default::default() + }) + } +} diff --git a/bin/src/lib.rs b/bin/src/lib.rs index 6eb8492e..f77ffa04 100644 --- a/bin/src/lib.rs +++ b/bin/src/lib.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::{net::SocketAddr, str::FromStr}; use atm0s_sdn::{NodeAddr, NodeId}; use media_server_protocol::cluster::ZoneId; @@ -22,3 +22,14 @@ pub struct NodeConfig { pub zone: ZoneId, pub bind_addrs_alt: Vec, } + +pub async fn fetch_node_addr_from_api(url: &str) -> Result { + let resp = reqwest::get(url).await.map_err(|e| e.to_string())?; + let node_addr = resp + .json::>() + .await + .map_err(|e| e.to_string())? + .data + .ok_or(format!("No data in response from {}", url))?; + NodeAddr::from_str(&node_addr).map_err(|e| e.to_string()) +} diff --git a/bin/src/main.rs b/bin/src/main.rs index 215ca325..dd211b82 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -1,6 +1,6 @@ use std::net::{IpAddr, SocketAddr}; -use atm0s_media_server::{server, NodeConfig}; +use atm0s_media_server::{fetch_node_addr_from_api, server, NodeConfig}; use atm0s_sdn::NodeAddr; use clap::Parser; use media_server_protocol::cluster::ZoneId; @@ -56,6 +56,12 @@ struct Args { #[arg(env, long)] seeds: Vec, + /// Seeds from API, this is used for auto-discovery of seeds. + /// It is very useful for cloud deployment. + /// Currently all of nodes expose /api/node/address endpoint, so we can get seeds from there. + #[arg(env, long)] + seeds_from_node_api: Option, + /// Number of worker threads to spawn. #[arg(env, long, default_value_t = 1)] workers: usize, @@ -118,7 +124,7 @@ async fn main() { .map(|(_name, ip)| SocketAddr::new(ip, sdn_port)) .collect::>() }; - let node = NodeConfig { + let mut node = NodeConfig { node_id: ZoneId(args.sdn_zone_id).to_node_id(args.sdn_zone_node_id), secret: args.secret, seeds: args.seeds, @@ -129,6 +135,13 @@ async fn main() { log::info!("Bind addrs {:?}, bind addrs alt {:?}", node.bind_addrs, node.bind_addrs_alt); + if let Some(seeds_from_node_api) = args.seeds_from_node_api { + log::info!("Generate seeds from node_api {}", seeds_from_node_api); + let addr = fetch_node_addr_from_api(&seeds_from_node_api).await.expect("should get seed"); + log::info!("Generated seed {:?}", addr); + node.seeds = vec![addr]; + } + let local = tokio::task::LocalSet::new(); local .run_until(async move { diff --git a/bin/src/server/console.rs b/bin/src/server/console.rs index ab9c7096..8d66ec85 100644 --- a/bin/src/server/console.rs +++ b/bin/src/server/console.rs @@ -11,7 +11,7 @@ use media_server_secure::jwt::MediaConsoleSecureJwt; use storage::StorageShared; use crate::{ - http::run_console_http_server, + http::{run_console_http_server, NodeApiCtx}, node_metrics::NodeMetricsCollector, quinn::{make_quinn_client, VirtualNetwork}, NodeConfig, @@ -72,8 +72,9 @@ pub async fn run_console_server(workers: usize, http_port: Option, node: No if let Some(http_port) = http_port { let secure = MediaConsoleSecureJwt::from(node.secret.as_bytes()); let storage = storage.clone(); + let node_ctx = NodeApiCtx { address: node_addr.to_string() }; tokio::spawn(async move { - if let Err(e) = run_console_http_server(http_port, secure, storage, connector_rpc_client).await { + if let Err(e) = run_console_http_server(http_port, node_ctx, secure, storage, connector_rpc_client).await { log::error!("HTTP Error: {}", e); } }); diff --git a/bin/src/server/gateway.rs b/bin/src/server/gateway.rs index f2f25207..08eae977 100644 --- a/bin/src/server/gateway.rs +++ b/bin/src/server/gateway.rs @@ -17,7 +17,7 @@ use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use std::net::SocketAddr; use crate::{ - http::run_gateway_http_server, + http::{run_gateway_http_server, NodeApiCtx}, ng_controller::NgControllerServer, node_metrics::NodeMetricsCollector, quinn::{make_quinn_client, make_quinn_server, VirtualNetwork}, @@ -114,17 +114,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let gateway_secure = Arc::new(gateway_secure); let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024); - if let Some(http_port) = http_port { - let req_tx = req_tx.clone(); - let secure2 = edge_secure.clone(); - tokio::spawn(async move { - if let Err(e) = run_gateway_http_server(http_port, req_tx, secure2, gateway_secure).await { - log::error!("HTTP Error: {}", e); - } - }); - } - - //Running ng controller for Voip + // Running ng controller for Voip if let Some(ngproto_addr) = args.rtpengine_cmd_addr { let req_tx = req_tx.clone(); let rtpengine_udp = NgUdpTransport::new(ngproto_addr).await; @@ -137,8 +127,8 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod }); } + // Setup Sdn let node_id = node.node_id; - let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.bind_addrs_alt); let node_addr = builder.node_addr(); let node_info = ClusterNodeInfo::Gateway( @@ -168,6 +158,18 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let mut controller = builder.build::>(workers, node_info); let (selector, mut requester) = build_dest_selector(); + // Setup HTTP server + if let Some(http_port) = http_port { + let req_tx = req_tx.clone(); + let secure2 = edge_secure.clone(); + let node_ctx = NodeApiCtx { address: node_addr.to_string() }; + tokio::spawn(async move { + if let Err(e) = run_gateway_http_server(http_port, node_ctx, req_tx, secure2, gateway_secure).await { + log::error!("HTTP Error: {}", e); + } + }); + } + // Ip location for routing client to closest gateway let ip2location = Arc::new(Ip2Location::new(&args.geo_db)); diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index 4a92322c..86f22eee 100644 --- a/bin/src/server/media.rs +++ b/bin/src/server/media.rs @@ -5,7 +5,7 @@ use std::{ time::{Duration, Instant}, }; -use atm0s_sdn::{features::FeaturesEvent, SdnExtIn, SdnExtOut, TimePivot, TimeTicker}; +use atm0s_sdn::{features::FeaturesEvent, generate_node_addr, SdnExtIn, SdnExtOut, TimePivot, TimeTicker}; use clap::Parser; use media_server_gateway::ServiceKind; use media_server_multi_tenancy::MultiTenancyStorage; @@ -27,7 +27,7 @@ use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use sans_io_runtime::{backend::PollingBackend, Controller}; use crate::{ - http::run_media_http_server, + http::{run_media_http_server, NodeApiCtx}, ng_controller::NgControllerServer, node_metrics::NodeMetricsCollector, quinn::{make_quinn_server, VirtualNetwork}, @@ -92,6 +92,7 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node let secure = Arc::new(MediaEdgeSecureJwt::from(node.secret.as_bytes())); let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024); + let node_addr = generate_node_addr(node.node_id, &node.bind_addrs, node.bind_addrs_alt.clone()); if let Some(http_port) = http_port { let secure_gateway = args.enable_token_api.then(|| { let app_storage = Arc::new(MultiTenancyStorage::new_with_single(&node.secret, None)); @@ -99,8 +100,9 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node }); let req_tx = req_tx.clone(); let secure_edge = secure.clone(); + let node_ctx = NodeApiCtx { address: node_addr.to_string() }; tokio::spawn(async move { - if let Err(e) = run_media_http_server(http_port, req_tx, secure_edge, secure_gateway).await { + if let Err(e) = run_media_http_server(http_port, node_ctx, req_tx, secure_edge, secure_gateway).await { log::error!("HTTP Error: {}", e); } }); diff --git a/bin/z0_connector_n4.sh b/bin/z0_connector_n4.sh index 262308fa..b8001989 100644 --- a/bin/z0_connector_n4.sh +++ b/bin/z0_connector_n4.sh @@ -3,6 +3,6 @@ RUST_BACKTRACE=1 \ cargo run -- \ --sdn-zone-id 0 \ --sdn-zone-node-id 4 \ - --seeds 1@/ip4/127.0.0.1/udp/10001 \ + --seeds-from-node-api "http://localhost:3000/api/node/address" \ connector \ --s3-uri "http://minioadmin:minioadmin@127.0.0.1:9000/record" diff --git a/bin/z0_gate_n1.sh b/bin/z0_gate_n1.sh index 22ad93c2..2fec78f8 100644 --- a/bin/z0_gate_n1.sh +++ b/bin/z0_gate_n1.sh @@ -6,7 +6,7 @@ cargo run -- \ --sdn-port 10001 \ --sdn-zone-id 0 \ --sdn-zone-node-id 1 \ - --seeds 0@/ip4/127.0.0.1/udp/10000 \ + --seeds-from-node-api "http://localhost:8080/api/node/address" \ --workers 2 \ gateway \ --lat 10 \ diff --git a/bin/z0_media_n2.sh b/bin/z0_media_n2.sh index 566b4fdb..18db3e71 100644 --- a/bin/z0_media_n2.sh +++ b/bin/z0_media_n2.sh @@ -4,7 +4,6 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 0 \ --sdn-zone-node-id 2 \ - --seeds 1@/ip4/127.0.0.1/udp/10001 \ + --seeds-from-node-api "http://localhost:3000/api/node/address" \ --workers 2 \ - media \ - --enable-token-api + media diff --git a/bin/z0_media_n3.sh b/bin/z0_media_n3.sh index bdda16fa..499a8a67 100644 --- a/bin/z0_media_n3.sh +++ b/bin/z0_media_n3.sh @@ -4,7 +4,6 @@ cargo run -- \ --enable-private-ip \ --sdn-zone-id 0 \ --sdn-zone-node-id 3 \ - --seeds 1@/ip4/127.0.0.1/udp/10001 \ + --seeds-from-node-api "http://localhost:3000/api/node/address" \ --workers 2 \ - media \ - --enable-token-api + media