diff --git a/Cargo.lock b/Cargo.lock index 84eabb3b..1e8fc72b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -176,7 +176,7 @@ dependencies = [ [[package]] name = "atm0s-sdn" version = "0.1.10" -source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?rev=38517f26bbf843f2734f476fd0ac3d2e2053efbc#38517f26bbf843f2734f476fd0ac3d2e2053efbc" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?rev=2ad5d3a092b63f871a90a9600d2fcc8cb3027a24#2ad5d3a092b63f871a90a9600d2fcc8cb3027a24" dependencies = [ "atm0s-sdn-identity", "atm0s-sdn-network", @@ -196,7 +196,7 @@ dependencies = [ [[package]] name = "atm0s-sdn-identity" version = "0.2.0" -source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?rev=38517f26bbf843f2734f476fd0ac3d2e2053efbc#38517f26bbf843f2734f476fd0ac3d2e2053efbc" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?rev=2ad5d3a092b63f871a90a9600d2fcc8cb3027a24#2ad5d3a092b63f871a90a9600d2fcc8cb3027a24" dependencies = [ "multiaddr", "rand", @@ -206,7 +206,7 @@ dependencies = [ [[package]] name = "atm0s-sdn-network" version = "0.3.1" -source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?rev=38517f26bbf843f2734f476fd0ac3d2e2053efbc#38517f26bbf843f2734f476fd0ac3d2e2053efbc" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?rev=2ad5d3a092b63f871a90a9600d2fcc8cb3027a24#2ad5d3a092b63f871a90a9600d2fcc8cb3027a24" dependencies = [ "aes-gcm", "atm0s-sdn-identity", @@ -233,7 +233,7 @@ dependencies = [ [[package]] name = "atm0s-sdn-router" version = "0.1.4" -source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?rev=38517f26bbf843f2734f476fd0ac3d2e2053efbc#38517f26bbf843f2734f476fd0ac3d2e2053efbc" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?rev=2ad5d3a092b63f871a90a9600d2fcc8cb3027a24#2ad5d3a092b63f871a90a9600d2fcc8cb3027a24" dependencies = [ "atm0s-sdn-identity", "atm0s-sdn-utils", @@ -245,7 +245,7 @@ dependencies = [ [[package]] name = "atm0s-sdn-utils" version = "0.1.1" -source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?rev=38517f26bbf843f2734f476fd0ac3d2e2053efbc#38517f26bbf843f2734f476fd0ac3d2e2053efbc" +source = "git+https://github.com/giangndm/8xFF-decentralized-sdn.git?rev=2ad5d3a092b63f871a90a9600d2fcc8cb3027a24#2ad5d3a092b63f871a90a9600d2fcc8cb3027a24" dependencies = [ "log", "serde", @@ -272,9 +272,9 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" [[package]] name = "aws-lc-rs" -version = "1.7.2" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "474d7cec9d0a1126fad1b224b767fcbf351c23b0309bb21ec210bcfd379926a5" +checksum = "bf7d844e282b4b56750b2d4e893b2205581ded8709fddd2b6aa5418c150ca877" dependencies = [ "aws-lc-sys", "mirai-annotations", @@ -284,9 +284,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.17.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7505fc3cb7acbf42699a43a79dd9caa4ed9e99861dfbb837c5c0fb5a0a8d2980" +checksum = "c3a2c29203f6bf296d01141cc8bb9dbd5ecd4c27843f2ee0767bcd5985a927da" dependencies = [ "bindgen", "cc", @@ -299,9 +299,9 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.72" +version = "0.3.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17c6a35df3749d2e8bb1b7b21a976d82b15548788d2735b9d82f329268f71a11" +checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a" dependencies = [ "addr2line", "cc", @@ -538,9 +538,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.6" +version = "4.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9689a29b593160de5bc4aacab7b5d54fb52231de70122626c178e6a368994c7" +checksum = "5db83dced34638ad474f39f250d7fea9598bdd239eaced1bdf45d597da0f433f" dependencies = [ "clap_builder", "clap_derive", @@ -548,9 +548,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.6" +version = "4.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e5387378c84f6faa26890ebf9f0a92989f8873d4d380467bcd0d8d8620424df" +checksum = "f7e204572485eb3fbf28f871612191521df159bc3e15a9f5064c66dba3a8c05f" dependencies = [ "anstream", "anstyle", @@ -781,15 +781,14 @@ dependencies = [ [[package]] name = "curve25519-dalek" -version = "4.1.2" +version = "4.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" dependencies = [ "cfg-if", "cpufeatures", "curve25519-dalek-derive", "fiat-crypto", - "platforms", "rustc_version", "subtle", "zeroize", @@ -900,15 +899,15 @@ dependencies = [ [[package]] name = "derive_more" -version = "0.99.17" +version = "0.99.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" +checksum = "5f33878137e4dafd7fa914ad4e259e18a4e8e532b9617a2d0150262bf53abfce" dependencies = [ "convert_case", "proc-macro2", "quote", "rustc_version", - "syn 1.0.109", + "syn 2.0.66", ] [[package]] @@ -1204,8 +1203,8 @@ dependencies = [ "aho-corasick", "bstr", "log", - "regex-automata 0.4.6", - "regex-syntax 0.8.3", + "regex-automata 0.4.7", + "regex-syntax 0.8.4", ] [[package]] @@ -1310,6 +1309,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hermit-abi" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" + [[package]] name = "hkdf" version = "0.12.4" @@ -1384,12 +1389,12 @@ dependencies = [ [[package]] name = "http-body-util" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", - "futures-core", + "futures-util", "http", "http-body", "pin-project-lite", @@ -1397,9 +1402,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.8.0" +version = "1.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" +checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9" [[package]] name = "httpdate" @@ -1500,7 +1505,7 @@ dependencies = [ "globset", "log", "memchr", - "regex-automata 0.4.6", + "regex-automata 0.4.7", "same-file", "walkdir", "winapi-util", @@ -1672,9 +1677,9 @@ checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" [[package]] name = "libp2p-identity" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "999ec70441b2fb35355076726a6bc466c932e9bdc66f6a11c6c0aa17c7ab9be0" +checksum = "55cca1eb2bc1fd29f099f3daaab7effd01e1a54b7c577d0ed082521034d912e8" dependencies = [ "bs58", "hkdf", @@ -1824,9 +1829,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.2" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "mime" @@ -1852,9 +1857,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87dfd01fe195c66b572b37921ad8803d010623c0aca821bea2302239d155cdae" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" dependencies = [ "adler", ] @@ -2136,7 +2141,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi", + "hermit-abi 0.3.9", "libc", ] @@ -2163,9 +2168,9 @@ dependencies = [ [[package]] name = "object" -version = "0.35.0" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8ec7ab813848ba4522158d5517a6093db1ded27575b070f4177b8d12b41db5e" +checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434" dependencies = [ "memchr", ] @@ -2461,12 +2466,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" -[[package]] -name = "platforms" -version = "3.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7" - [[package]] name = "poem" version = "3.0.1" @@ -2567,13 +2566,13 @@ dependencies = [ [[package]] name = "polling" -version = "3.7.1" +version = "3.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e6a007746f34ed64099e88783b0ae369eaa3da6392868ba262e2af9b8fbaea1" +checksum = "a3ed00ed3fbf728b5816498ecd316d1716eecaced9c0c8d2c5a6740ca214985b" dependencies = [ "cfg-if", "concurrent-queue", - "hermit-abi", + "hermit-abi 0.4.0", "pin-project-lite", "rustix", "tracing", @@ -2861,23 +2860,23 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" +checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" dependencies = [ "bitflags", ] [[package]] name = "regex" -version = "1.10.4" +version = "1.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" +checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.6", - "regex-syntax 0.8.3", + "regex-automata 0.4.7", + "regex-syntax 0.8.4", ] [[package]] @@ -2891,13 +2890,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" +checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.3", + "regex-syntax 0.8.4", ] [[package]] @@ -2908,9 +2907,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "regex-syntax" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" +checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "rfc6979" @@ -3003,9 +3002,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.9" +version = "0.23.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a218f0f6d05669de4eabfb24f31ce802035c952429d037507b4a4a39f0e60c5b" +checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402" dependencies = [ "aws-lc-rs", "log", @@ -3899,9 +3898,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.0" +version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" dependencies = [ "form_urlencoded", "idna", @@ -4013,9 +4012,9 @@ checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" [[package]] name = "webpki-roots" -version = "0.26.2" +version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c452ad30530b54a4d8e71952716a212b08efd0f3562baa66c29a618b07da7c3" +checksum = "bd7c23921eeb1713a4e851530e9b9756e4fb0e89978582942612524cf09f01cd" dependencies = [ "rustls-pki-types", ] diff --git a/Cargo.toml b/Cargo.toml index e549030c..2eeeb7a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ members = [ [workspace.dependencies] sans-io-runtime = { git = "https://github.com/giangndm/sans-io-runtime.git", rev = "c781cef12b2a435b5e31a6ede69d301a23719452" } -atm0s-sdn = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", rev = "38517f26bbf843f2734f476fd0ac3d2e2053efbc" } +atm0s-sdn = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", rev = "2ad5d3a092b63f871a90a9600d2fcc8cb3027a24" } tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] } convert-enum = "0.1" num_enum = "0.7" diff --git a/bin/Cargo.toml b/bin/Cargo.toml index 3355c572..5c2f72a8 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -32,9 +32,10 @@ maxminddb = { version = "0.24", optional = true } sysinfo = { version = "0.30", optional = true } [features] -default = ["gateway", "media", "connector", "cert_utils"] +default = ["console", "gateway", "media", "connector", "cert_utils"] gateway = ["media-server-gateway", "quinn_vnet", "node_metrics", "maxminddb"] media = ["media-server-runner", "quinn_vnet", "node_metrics"] +console = [] connector = ["quinn_vnet"] cert_utils = ["rcgen", "rustls"] quinn_vnet = ["rustls", "quinn"] diff --git a/bin/console.sh b/bin/console.sh new file mode 100644 index 00000000..26d66e05 --- /dev/null +++ b/bin/console.sh @@ -0,0 +1,8 @@ +RUST_LOG=info \ +RUST_BACKTRACE=1 \ +cargo run -- \ + --http-port 8080 \ + --node-id 0 \ + --sdn-port 10000 \ + --sdn-zone 0 \ + console diff --git a/bin/gate_z0_n1.sh b/bin/gate_z0_n1.sh index b2bcbd77..c611ba31 100644 --- a/bin/gate_z0_n1.sh +++ b/bin/gate_z0_n1.sh @@ -2,9 +2,10 @@ RUST_LOG=info \ RUST_BACKTRACE=1 \ cargo run -- \ --http-port 3000 \ - --node-id 0 \ - --sdn-port 10000 \ + --node-id 1 \ + --sdn-port 10001 \ --sdn-zone 0 \ + --seeds 0@/ip4/127.0.0.1/udp/10000 \ gateway \ --lat 10 \ --lon 20 \ diff --git a/bin/media_z0_n2.sh b/bin/media_z0_n2.sh index 925da6f9..3689fce0 100644 --- a/bin/media_z0_n2.sh +++ b/bin/media_z0_n2.sh @@ -5,7 +5,7 @@ cargo run -- \ --node-id 2 \ --sdn-port 10002 \ --sdn-zone 0 \ - --seeds 0@/ip4/127.0.0.1/udp/10000 \ + --seeds 1@/ip4/127.0.0.1/udp/10001 \ media \ --allow-private-ip \ --enable-token-api diff --git a/bin/media_z0_n1.sh b/bin/media_z0_n3.sh similarity index 56% rename from bin/media_z0_n1.sh rename to bin/media_z0_n3.sh index 112380d9..b1be2d3f 100644 --- a/bin/media_z0_n1.sh +++ b/bin/media_z0_n3.sh @@ -1,11 +1,11 @@ RUST_LOG=info \ RUST_BACKTRACE=1 \ cargo run -- \ - --http-port 3001 \ - --node-id 1 \ - --sdn-port 10001 \ + --http-port 3003 \ + --node-id 3 \ + --sdn-port 10003 \ --sdn-zone 0 \ - --seeds 0@/ip4/127.0.0.1/udp/10000 \ + --seeds 1@/ip4/127.0.0.1/udp/10001 \ media \ --allow-private-ip \ --enable-token-api diff --git a/bin/src/http.rs b/bin/src/http.rs index 6829c8d2..e8790104 100644 --- a/bin/src/http.rs +++ b/bin/src/http.rs @@ -12,6 +12,7 @@ use poem_openapi::{types::ParseFromJSON, Object}; use tokio::sync::mpsc::Sender; mod api_connector; +mod api_console; mod api_media; mod api_token; mod utils; @@ -42,6 +43,40 @@ impl Rpc { } } +#[cfg(feature = "console")] +pub async fn run_console_http_server( + port: u16, + secure: media_server_secure::jwt::MediaConsoleSecureJwt, + storage: crate::server::console_storage::StorageShared, +) -> Result<(), Box> { + let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "Console User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/"); + let user_ui = user_service.swagger_ui(); + let user_spec = user_service.spec(); + + let cluster_service: OpenApiService<_, ()> = OpenApiService::new(api_console::cluster::Apis, "Console Cluster APIs", env!("CARGO_PKG_VERSION")).server("/api/cluster/"); + let cluster_ui = cluster_service.swagger_ui(); + let cluster_spec = cluster_service.spec(); + + let ctx = api_console::ConsoleApisCtx { secure, storage }; + + let route = Route::new() + //TODO build UI and embed to here + .nest("/", StaticFilesEndpoint::new("./public").index_file("index.html")) + //user + .nest("/api/user/", user_service.data(ctx.clone())) + .nest("/api/user/ui", user_ui) + .at("/api/user/spec", poem::endpoint::make_sync(move |_| user_spec.clone())) + //cluster + .nest("/api/cluster/", cluster_service.data(ctx.clone())) + .nest("/api/cluster/ui", cluster_ui) + .at("/api/cluster/spec", poem::endpoint::make_sync(move |_| cluster_spec.clone())) + .with(Cors::new()); + + Server::new(TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), port))).run(route).await?; + Ok(()) +} + +#[cfg(feature = "gateway")] pub async fn run_gateway_http_server( port: u16, sender: Sender, RpcRes>>, @@ -68,6 +103,7 @@ pub async fn run_gateway_http_server( port: u16, sender: Sender, RpcRes>>, diff --git a/bin/src/http/api_console.rs b/bin/src/http/api_console.rs new file mode 100644 index 00000000..d45722f8 --- /dev/null +++ b/bin/src/http/api_console.rs @@ -0,0 +1,24 @@ +use media_server_secure::{jwt::MediaConsoleSecureJwt, MediaConsoleSecure}; +use poem::Request; +use poem_openapi::{auth::ApiKey, SecurityScheme}; + +use crate::server::console_storage::StorageShared; + +pub mod cluster; +pub mod user; + +#[derive(Clone)] +pub struct ConsoleApisCtx { + pub secure: MediaConsoleSecureJwt, //TODO make it generic + pub storage: StorageShared, +} + +/// ApiKey authorization +#[derive(SecurityScheme)] +#[oai(ty = "api_key", key_name = "X-API-Key", key_in = "header", checker = "api_checker")] +struct ConsoleAuthorization(()); + +async fn api_checker(req: &Request, api_key: ApiKey) -> Option<()> { + let data = req.data::()?; + data.secure.validate_token(&api_key.key).then(|| ()) +} diff --git a/bin/src/http/api_console/cluster.rs b/bin/src/http/api_console/cluster.rs new file mode 100644 index 00000000..3469f11a --- /dev/null +++ b/bin/src/http/api_console/cluster.rs @@ -0,0 +1,38 @@ +use crate::server::console_storage::{Zone, ZoneDetails}; + +use super::{super::Response, ConsoleApisCtx, ConsoleAuthorization}; +use poem::web::Data; +use poem_openapi::{param::Path, payload::Json, OpenApi}; + +pub struct Apis; + +#[OpenApi] +impl Apis { + /// get zones + #[oai(path = "/zones", method = "get")] + async fn zones(&self, _auth: ConsoleAuthorization, Data(ctx): Data<&ConsoleApisCtx>) -> Json>> { + Json(Response { + status: true, + error: None, + data: Some(ctx.storage.zones()), + }) + } + + /// get zone + #[oai(path = "/zones/:zone_id", method = "get")] + async fn zone(&self, _auth: ConsoleAuthorization, zone_id: Path, Data(ctx): Data<&ConsoleApisCtx>) -> Json> { + if let Some(zone) = ctx.storage.zone(zone_id.0) { + Json(Response { + status: true, + error: None, + data: Some(zone), + }) + } else { + Json(Response { + status: true, + error: Some("ZONE_NOT_FOUND".to_string()), + data: None, + }) + } + } +} diff --git a/bin/src/http/api_console/user.rs b/bin/src/http/api_console/user.rs new file mode 100644 index 00000000..170b80f4 --- /dev/null +++ b/bin/src/http/api_console/user.rs @@ -0,0 +1,37 @@ +use super::{super::Response, ConsoleApisCtx}; +use media_server_secure::MediaConsoleSecure; +use poem::web::Data; +use poem_openapi::{payload::Json, OpenApi}; + +#[derive(poem_openapi::Object)] +pub struct UserLoginReq { + pub secret: String, +} + +#[derive(poem_openapi::Object)] +pub struct UserLoginRes { + pub token: String, +} + +pub struct Apis; + +#[OpenApi] +impl Apis { + /// login with user credentials + #[oai(path = "/user/login", method = "post")] + async fn user_login(&self, Data(ctx): Data<&ConsoleApisCtx>, body: Json) -> Json> { + if ctx.secure.validate_secert(&body.secret) { + Json(Response { + status: true, + error: None, + data: Some(UserLoginRes { token: ctx.secure.generate_token() }), + }) + } else { + Json(Response { + status: false, + error: Some("WRONG_SECRET".to_string()), + data: None, + }) + } + } +} diff --git a/bin/src/main.rs b/bin/src/main.rs index 623fae11..cf787e35 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -75,6 +75,8 @@ async fn main() { local .run_until(async move { match args.server { + #[cfg(feature = "console")] + server::ServerType::Console(args) => server::run_console_server(workers, http_port, node, args).await, #[cfg(feature = "gateway")] server::ServerType::Gateway(args) => server::run_media_gateway(workers, http_port, node, args).await, #[cfg(feature = "connector")] diff --git a/bin/src/server.rs b/bin/src/server.rs index 65586f60..f0224ff1 100644 --- a/bin/src/server.rs +++ b/bin/src/server.rs @@ -4,6 +4,8 @@ use clap::Subcommand; mod cert; #[cfg(feature = "connector")] mod connector; +#[cfg(feature = "console")] +mod console; #[cfg(feature = "gateway")] mod gateway; #[cfg(feature = "media")] @@ -13,6 +15,8 @@ mod media; pub use cert::run_cert_utils; #[cfg(feature = "connector")] pub use connector::run_media_connector; +#[cfg(feature = "console")] +pub use console::{run_console_server, storage as console_storage}; #[cfg(feature = "gateway")] pub use gateway::run_media_gateway; #[cfg(feature = "media")] @@ -20,6 +24,8 @@ pub use media::run_media_server; #[derive(Debug, Subcommand)] pub enum ServerType { + #[cfg(feature = "console")] + Console(console::Args), #[cfg(feature = "gateway")] Gateway(gateway::Args), #[cfg(feature = "connector")] diff --git a/bin/src/server/console.rs b/bin/src/server/console.rs new file mode 100644 index 00000000..e2056d11 --- /dev/null +++ b/bin/src/server/console.rs @@ -0,0 +1,100 @@ +use std::time::{Duration, Instant}; + +use atm0s_sdn::{secure::StaticKeyAuthorization, services::visualization, SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner}; +use clap::Parser; +use media_server_protocol::cluster::{ClusterNodeGenericInfo, ClusterNodeInfo}; +use media_server_secure::jwt::MediaConsoleSecureJwt; +use storage::StorageShared; + +use crate::{http::run_console_http_server, node_metrics::NodeMetricsCollector, NodeConfig}; +use sans_io_runtime::backend::PollingBackend; + +pub mod storage; + +#[derive(Clone, Debug, convert_enum::From, convert_enum::TryInto)] +enum SC { + Visual(visualization::Control), +} + +#[derive(Clone, Debug, convert_enum::From, convert_enum::TryInto)] +enum SE { + Visual(visualization::Event), +} +type TC = (); +type TW = (); + +#[derive(Debug, Parser)] +pub struct Args {} + +pub async fn run_console_server(workers: usize, http_port: Option, node: NodeConfig, _args: Args) { + let storage = StorageShared::default(); + if let Some(http_port) = http_port { + let secure = MediaConsoleSecureJwt::from(node.secret.as_bytes()); + let storage = storage.clone(); + tokio::spawn(async move { + if let Err(e) = run_console_http_server(http_port, secure, storage).await { + log::error!("HTTP Error: {}", e); + } + }); + } + + let node_id = node.node_id; + let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, node.udp_port, node.custom_addrs); + let node_addr = builder.node_addr(); + + builder.set_authorization(StaticKeyAuthorization::new(&node.secret)); + builder.set_visualization_collector(true); + + for seed in node.seeds { + builder.add_seed(seed); + } + + let node_info = ClusterNodeInfo::Console(ClusterNodeGenericInfo { + addr: builder.node_addr().to_string(), + cpu: 0, + memory: 0, + disk: 0, + }); + + let started_at = Instant::now(); + let mut controller = builder.build::>(workers, node_info); + controller.service_control(visualization::SERVICE_ID.into(), (), visualization::Control::Subscribe.into()); + + let mut node_metrics_collector = NodeMetricsCollector::default(); + + loop { + if controller.process().is_none() { + break; + } + + if let Some(metrics) = node_metrics_collector.pop_measure() { + let node_info = ClusterNodeInfo::Console(ClusterNodeGenericInfo { + addr: node_addr.to_string(), + cpu: metrics.cpu, + memory: metrics.memory, + disk: metrics.disk, + }); + controller.service_control(visualization::SERVICE_ID.into(), (), visualization::Control::UpdateInfo(node_info).into()); + storage.on_tick(started_at.elapsed().as_millis() as u64); + } + + while let Some(out) = controller.pop_event() { + match out { + SdnExtOut::ServicesEvent(_service, (), SE::Visual(event)) => match event { + visualization::Event::GotAll(all) => { + log::info!("Got all: {:?}", all); + } + visualization::Event::NodeChanged(node, info, conns) => { + log::info!("Node set: {:?} {:?} {:?}", node, info, conns); + storage.on_ping(started_at.elapsed().as_millis() as u64, node, info, conns); + } + visualization::Event::NodeRemoved(node) => { + log::info!("Node del: {:?}", node); + } + }, + SdnExtOut::FeaturesEvent(_, _) => {} + } + } + tokio::time::sleep(Duration::from_millis(10)).await; + } +} diff --git a/bin/src/server/console/storage.rs b/bin/src/server/console/storage.rs new file mode 100644 index 00000000..c6af5998 --- /dev/null +++ b/bin/src/server/console/storage.rs @@ -0,0 +1,561 @@ +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +use atm0s_sdn::{services::visualization::ConnectionInfo, NodeId}; +use media_server_protocol::cluster::{ClusterGatewayInfo, ClusterMediaInfo, ClusterNodeGenericInfo, ClusterNodeInfo}; + +const NODE_TIMEOUT: u64 = 30_000; + +#[derive(poem_openapi::Object, Clone, Debug, PartialEq, Eq)] +pub struct Connection { + pub node: NodeId, + pub addr: String, + pub rtt_ms: u32, +} + +impl From for Connection { + fn from(value: ConnectionInfo) -> Self { + Self { + node: value.dest, + addr: value.remote.to_string(), + rtt_ms: value.rtt_ms, + } + } +} + +#[derive(poem_openapi::Object, Debug, PartialEq, Eq)] +pub struct ConsoleNode { + pub addr: String, + pub node_id: NodeId, + pub cpu: u8, + pub memory: u8, + pub disk: u8, + pub conns: Vec, +} + +#[derive(poem_openapi::Object, Debug, PartialEq, Eq)] +pub struct GatewayNode { + pub addr: String, + pub node_id: NodeId, + pub cpu: u8, + pub memory: u8, + pub conns: Vec, + pub disk: u8, + pub live: u32, + pub max: u32, +} + +#[derive(poem_openapi::Object, Debug, PartialEq, Eq)] +pub struct MediaNode { + pub addr: String, + pub node_id: NodeId, + pub cpu: u8, + pub memory: u8, + pub disk: u8, + pub conns: Vec, + pub live: u32, + pub max: u32, +} + +#[derive(poem_openapi::Object, Debug, PartialEq, Eq)] +pub struct ConnectorNode { + pub addr: String, + pub node_id: NodeId, + pub cpu: u8, + pub memory: u8, + pub disk: u8, + pub conns: Vec, +} + +#[derive(poem_openapi::Object, Debug, PartialEq)] +pub struct Zone { + pub lat: f32, + pub lon: f32, + pub zone_id: u32, + pub consoles: usize, + pub gateways: usize, + pub medias: usize, + pub connectors: usize, +} + +#[derive(poem_openapi::Object, Debug, PartialEq)] +pub struct ZoneDetails { + pub lat: f32, + pub lon: f32, + pub consoles: Vec, + pub gateways: Vec, + pub medias: Vec, + pub connectors: Vec, +} + +#[derive(Debug)] +struct ConsoleContainer { + last_updated: u64, + generic: ClusterNodeGenericInfo, + conns: Vec, +} + +#[derive(Debug)] +struct GatewayContainer { + last_updated: u64, + generic: ClusterNodeGenericInfo, + info: ClusterGatewayInfo, + conns: Vec, +} + +#[derive(Debug)] +struct MediaContainer { + last_updated: u64, + generic: ClusterNodeGenericInfo, + info: ClusterMediaInfo, + conns: Vec, +} + +#[derive(Debug)] +struct ConnectorContainer { + last_updated: u64, + generic: ClusterNodeGenericInfo, + conns: Vec, +} + +#[derive(Debug, Default)] +struct ZoneContainer { + lat: f32, + lon: f32, + consoles: HashMap, + gateways: HashMap, + medias: HashMap, + connectors: HashMap, +} + +#[derive(Debug, Default)] +struct Storage { + zones: HashMap, +} + +impl Storage { + pub fn on_tick(&mut self, now: u64) { + for (_, zone) in self.zones.iter_mut() { + zone.consoles.retain(|_, g| g.last_updated + NODE_TIMEOUT > now); + zone.gateways.retain(|_, g| g.last_updated + NODE_TIMEOUT > now); + zone.medias.retain(|_, g| g.last_updated + NODE_TIMEOUT > now); + zone.connectors.retain(|_, g| g.last_updated + NODE_TIMEOUT > now); + } + self.zones.retain(|_, z| z.consoles.len() + z.gateways.len() + z.medias.len() + z.connectors.len() > 0); + } + + pub fn on_ping(&mut self, now: u64, node: NodeId, info: ClusterNodeInfo, conns: Vec) { + match info { + ClusterNodeInfo::Console(generic) => { + let zone_id = node & 0xFF_FF_FF_00; + log::info!("Zone {zone_id} on console ping, zones {}", self.zones.len()); + let zone = self.zones.entry(zone_id).or_insert_with(Default::default); + zone.consoles.insert( + node, + ConsoleContainer { + last_updated: now, + generic, + conns: conns.into_iter().map(|c| c.into()).collect::>(), + }, + ); + log::info!("Zone {zone_id} on console ping, after zones {}", self.zones.len()); + } + ClusterNodeInfo::Gateway(generic, info) => { + let zone_id = node & 0xFF_FF_FF_00; + log::info!("Zone {zone_id} on gateway ping"); + let zone = self.zones.entry(zone_id).or_insert_with(Default::default); + zone.lat = info.lat; + zone.lon = info.lon; + zone.gateways.insert( + node, + GatewayContainer { + last_updated: now, + generic, + info, + conns: conns.into_iter().map(|c| c.into()).collect::>(), + }, + ); + } + ClusterNodeInfo::Media(generic, info) => { + let zone_id = node & 0xFF_FF_FF_00; + log::info!("Zone {zone_id} on media ping"); + let zone = self.zones.entry(zone_id).or_insert_with(Default::default); + zone.medias.insert( + node, + MediaContainer { + last_updated: now, + generic, + info, + conns: conns.into_iter().map(|c| c.into()).collect::>(), + }, + ); + } + ClusterNodeInfo::Connector(generic) => { + let zone_id = node & 0xFF_FF_FF_00; + log::info!("Zone {zone_id} on connector ping, zones {}", self.zones.len()); + let zone = self.zones.entry(zone_id).or_insert_with(Default::default); + zone.connectors.insert( + node, + ConnectorContainer { + last_updated: now, + generic, + conns: conns.into_iter().map(|c| c.into()).collect::>(), + }, + ); + log::info!("Zone {zone_id} on console ping, after zones {}", self.zones.len()); + } + } + } + + pub fn zones(&self) -> Vec { + self.zones + .iter() + .map(|(id, z)| Zone { + lat: z.lat, + lon: z.lon, + zone_id: *id, + consoles: z.consoles.len(), + gateways: z.gateways.len(), + medias: z.medias.len(), + connectors: z.connectors.len(), + }) + .collect::>() + } + + pub fn zone(&self, zone_id: u32) -> Option { + let z = self.zones.get(&zone_id)?; + Some(ZoneDetails { + lat: z.lat, + lon: z.lon, + consoles: z + .consoles + .iter() + .map(|(id, g)| ConsoleNode { + addr: g.generic.addr.clone(), + node_id: *id, + cpu: g.generic.cpu, + memory: g.generic.memory, + disk: g.generic.disk, + conns: g.conns.clone(), + }) + .collect::>(), + gateways: z + .gateways + .iter() + .map(|(id, g)| GatewayNode { + addr: g.generic.addr.clone(), + node_id: *id, + cpu: g.generic.cpu, + memory: g.generic.memory, + disk: g.generic.disk, + live: g.info.live, + max: g.info.max, + conns: g.conns.clone(), + }) + .collect::>(), + medias: z + .medias + .iter() + .map(|(id, g)| MediaNode { + addr: g.generic.addr.clone(), + node_id: *id, + cpu: g.generic.cpu, + memory: g.generic.memory, + disk: g.generic.disk, + live: g.info.live, + max: g.info.max, + conns: g.conns.clone(), + }) + .collect::>(), + connectors: z + .connectors + .iter() + .map(|(id, g)| ConnectorNode { + addr: g.generic.addr.clone(), + node_id: *id, + cpu: g.generic.cpu, + memory: g.generic.memory, + disk: g.generic.disk, + conns: g.conns.clone(), + }) + .collect::>(), + }) + } +} + +#[derive(Default, Clone)] +pub struct StorageShared { + storage: Arc>, +} + +impl StorageShared { + pub fn on_tick(&self, now: u64) { + self.storage.write().expect("should lock storage").on_tick(now); + } + + pub fn on_ping(&self, now: u64, node: NodeId, info: ClusterNodeInfo, conns: Vec) { + self.storage.write().expect("should lock storage").on_ping(now, node, info, conns); + } + + pub fn zones(&self) -> Vec { + self.storage.read().expect("should lock storage").zones() + } + + pub fn zone(&self, zone_id: u32) -> Option { + self.storage.read().expect("should lock storage").zone(zone_id) + } +} + +#[cfg(test)] +mod tests { + use media_server_protocol::cluster::{ClusterGatewayInfo, ClusterMediaInfo, ClusterNodeGenericInfo, ClusterNodeInfo}; + + use crate::server::console_storage::{ConnectorNode, ConsoleNode, GatewayNode, MediaNode, Zone, ZoneDetails, NODE_TIMEOUT}; + + use super::Storage; + + #[test] + fn collect_console() { + let mut storage = Storage::default(); + + storage.on_ping( + 0, + 1, + ClusterNodeInfo::Console(ClusterNodeGenericInfo { + addr: "addr".to_string(), + cpu: 11, + memory: 22, + disk: 33, + }), + vec![], + ); + storage.on_tick(0); + + assert_eq!( + storage.zones(), + vec![Zone { + lat: 0.0, + lon: 0.0, + zone_id: 0, + consoles: 1, + gateways: 0, + medias: 0, + connectors: 0, + }] + ); + + assert_eq!( + storage.zone(0), + Some(ZoneDetails { + lat: 0.0, + lon: 0.0, + consoles: vec![ConsoleNode { + addr: "addr".to_string(), + node_id: 1, + cpu: 11, + memory: 22, + disk: 33, + conns: vec![], + }], + gateways: vec![], + medias: vec![], + connectors: vec![] + }) + ); + + assert_eq!(storage.zone(1), None); + + storage.on_tick(NODE_TIMEOUT); + //after timeout should clear + assert_eq!(storage.zones(), vec![]); + assert_eq!(storage.zone(0), None); + } + + #[test] + fn collect_gateway() { + let mut storage = Storage::default(); + + storage.on_ping( + 0, + 1, + ClusterNodeInfo::Gateway( + ClusterNodeGenericInfo { + addr: "addr".to_string(), + cpu: 11, + memory: 22, + disk: 33, + }, + ClusterGatewayInfo { + live: 0, + max: 100, + lat: 10.0, + lon: 11.0, + }, + ), + vec![], + ); + storage.on_tick(0); + + assert_eq!( + storage.zones(), + vec![Zone { + lat: 10.0, + lon: 11.0, + zone_id: 0, + consoles: 0, + gateways: 1, + medias: 0, + connectors: 0, + }] + ); + + assert_eq!( + storage.zone(0), + Some(ZoneDetails { + lat: 10.0, + lon: 11.0, + consoles: vec![], + gateways: vec![GatewayNode { + addr: "addr".to_string(), + node_id: 1, + cpu: 11, + memory: 22, + disk: 33, + conns: vec![], + live: 0, + max: 100, + }], + medias: vec![], + connectors: vec![] + }) + ); + + assert_eq!(storage.zone(1), None); + + storage.on_tick(NODE_TIMEOUT); + //after timeout should clear + assert_eq!(storage.zones(), vec![]); + assert_eq!(storage.zone(0), None); + } + + #[test] + fn collect_media() { + let mut storage = Storage::default(); + + storage.on_ping( + 0, + 1, + ClusterNodeInfo::Media( + ClusterNodeGenericInfo { + addr: "addr".to_string(), + cpu: 11, + memory: 22, + disk: 33, + }, + ClusterMediaInfo { live: 0, max: 100 }, + ), + vec![], + ); + storage.on_tick(0); + + assert_eq!( + storage.zones(), + vec![Zone { + lat: 0.0, + lon: 0.0, + zone_id: 0, + consoles: 0, + gateways: 0, + medias: 1, + connectors: 0, + }] + ); + + assert_eq!( + storage.zone(0), + Some(ZoneDetails { + lat: 0.0, + lon: 0.0, + consoles: vec![], + gateways: vec![], + medias: vec![MediaNode { + addr: "addr".to_string(), + node_id: 1, + cpu: 11, + memory: 22, + disk: 33, + conns: vec![], + live: 0, + max: 100, + }], + connectors: vec![] + }) + ); + + assert_eq!(storage.zone(1), None); + + storage.on_tick(NODE_TIMEOUT); + //after timeout should clear + assert_eq!(storage.zones(), vec![]); + assert_eq!(storage.zone(0), None); + } + + #[test] + fn collect_connector() { + let mut storage = Storage::default(); + + storage.on_ping( + 0, + 1, + ClusterNodeInfo::Connector(ClusterNodeGenericInfo { + addr: "addr".to_string(), + cpu: 11, + memory: 22, + disk: 33, + }), + vec![], + ); + storage.on_tick(0); + + assert_eq!( + storage.zones(), + vec![Zone { + lat: 0.0, + lon: 0.0, + zone_id: 0, + consoles: 0, + gateways: 0, + medias: 0, + connectors: 1, + }] + ); + + assert_eq!( + storage.zone(0), + Some(ZoneDetails { + lat: 0.0, + lon: 0.0, + consoles: vec![], + gateways: vec![], + medias: vec![], + connectors: vec![ConnectorNode { + addr: "addr".to_string(), + node_id: 1, + cpu: 11, + memory: 22, + disk: 33, + conns: vec![], + }] + }) + ); + + assert_eq!(storage.zone(1), None); + + storage.on_tick(NODE_TIMEOUT); + //after timeout should clear + assert_eq!(storage.zones(), vec![]); + assert_eq!(storage.zone(0), None); + } +} diff --git a/bin/src/server/gateway.rs b/bin/src/server/gateway.rs index a0b2b84c..8c81232f 100644 --- a/bin/src/server/gateway.rs +++ b/bin/src/server/gateway.rs @@ -4,6 +4,7 @@ use atm0s_sdn::{features::FeaturesEvent, secure::StaticKeyAuthorization, service use clap::Parser; use media_server_gateway::{store_service::GatewayStoreServiceBuilder, STORE_SERVICE_ID}; use media_server_protocol::{ + cluster::{ClusterGatewayInfo, ClusterNodeGenericInfo, ClusterNodeInfo}, gateway::{generate_gateway_zone_tag, GATEWAY_RPC_PORT}, protobuf::cluster_gateway::{MediaEdgeServiceClient, MediaEdgeServiceServer}, rpc::quinn::{QuinnClient, QuinnServer}, @@ -28,13 +29,13 @@ mod remote_rpc_handler; #[derive(Clone, Debug, convert_enum::From, convert_enum::TryInto)] enum SC { - Visual(visualization::Control), + Visual(visualization::Control), Gateway(media_server_gateway::store_service::Control), } #[derive(Clone, Debug, convert_enum::From, convert_enum::TryInto)] enum SE { - Visual(visualization::Event), + Visual(visualization::Event), Gateway(media_server_gateway::store_service::Event), } type TC = (); @@ -88,7 +89,22 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let node_id = node.node_id; - let mut builder = SdnBuilder::<(), SC, SE, TC, TW>::new(node_id, node.udp_port, node.custom_addrs); + let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, node.udp_port, node.custom_addrs); + let node_addr = builder.node_addr(); + let node_info = ClusterNodeInfo::Gateway( + ClusterNodeGenericInfo { + addr: node_addr.to_string(), + cpu: 0, + memory: 0, + disk: 0, + }, + ClusterGatewayInfo { + lat: args.lat, + lon: args.lon, + live: 0, + max: 0, + }, + ); builder.set_authorization(StaticKeyAuthorization::new(&node.secret)); builder.set_manual_discovery(vec!["gateway".to_string(), generate_gateway_zone_tag(node.zone)], vec!["gateway".to_string()]); @@ -98,7 +114,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod builder.add_seed(seed); } - let mut controller = builder.build::>(workers); + let mut controller = builder.build::>(workers, node_info); let (selector, mut requester) = build_dest_selector(); // Ip location for routing client to closest gateway @@ -136,6 +152,8 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod // Collect node metrics for update to gateway agent service, this information is used inside gateway // for forwarding from other gateway let mut node_metrics_collector = NodeMetricsCollector::default(); + let mut live_sessions = 0; + let mut max_sessions = 0; loop { if controller.process().is_none() { @@ -144,7 +162,22 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod // Pop from metric collector and pass to Gateway store service if let Some(metrics) = node_metrics_collector.pop_measure() { + let node_info = ClusterNodeInfo::Gateway( + ClusterNodeGenericInfo { + addr: node_addr.to_string(), + cpu: metrics.cpu, + memory: metrics.memory, + disk: metrics.disk, + }, + ClusterGatewayInfo { + lat: args.lat, + lon: args.lon, + live: live_sessions, + max: max_sessions, + }, + ); controller.service_control(STORE_SERVICE_ID.into(), (), media_server_gateway::store_service::Control::NodeStats(metrics).into()); + controller.service_control(visualization::SERVICE_ID.into(), (), visualization::Control::UpdateInfo(node_info).into()); } while let Ok(control) = vnet_rx.try_recv() { controller.feature_control((), control.into()); @@ -166,9 +199,13 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod while let Some(out) = controller.pop_event() { match out { - SdnExtOut::ServicesEvent(_, _, SE::Gateway(event)) => { - requester.on_event(event); - } + SdnExtOut::ServicesEvent(_, _, SE::Gateway(event)) => match event { + media_server_gateway::store_service::Event::MediaStats(live, max) => { + live_sessions = live; + max_sessions = max; + } + media_server_gateway::store_service::Event::FindNodeRes(req_id, res) => requester.on_find_node_res(req_id, res), + }, SdnExtOut::FeaturesEvent(_, FeaturesEvent::Socket(event)) => { if let Err(e) = vnet_tx.try_send(event) { log::error!("[MediaEdge] forward Sdn SocketEvent error {:?}", e); diff --git a/bin/src/server/gateway/dest_selector.rs b/bin/src/server/gateway/dest_selector.rs index 5bbd166d..c31208d6 100644 --- a/bin/src/server/gateway/dest_selector.rs +++ b/bin/src/server/gateway/dest_selector.rs @@ -29,14 +29,10 @@ pub struct GatewayDestRequester { } impl GatewayDestRequester { - pub fn on_event(&mut self, event: media_server_gateway::store_service::Event) { - match event { - media_server_gateway::store_service::Event::FindNodeRes(req_id, res) => { - if let Some(tx) = self.reqs.remove(&req_id) { - if tx.send(res).is_err() { - log::error!("[GatewayDestRequester] answer for req_id {req_id} error"); - } - } + pub fn on_find_node_res(&mut self, req_id: u64, res: Option) { + if let Some(tx) = self.reqs.remove(&req_id) { + if tx.send(res).is_err() { + log::error!("[GatewayDestRequester] answer for req_id {req_id} error"); } } } diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index bdc41dca..c682758e 100644 --- a/bin/src/server/media.rs +++ b/bin/src/server/media.rs @@ -7,7 +7,7 @@ use std::{ use atm0s_sdn::{features::FeaturesEvent, SdnExtIn, SdnExtOut}; use clap::Parser; -use media_server_gateway::{ServiceKind, AGENT_SERVICE_ID}; +use media_server_gateway::ServiceKind; use media_server_protocol::{gateway::GATEWAY_RPC_PORT, protobuf::cluster_gateway::MediaEdgeServiceServer, rpc::quinn::QuinnServer}; use media_server_runner::MediaConfig; use media_server_secure::jwt::{MediaEdgeSecureJwt, MediaGatewaySecureJwt}; @@ -145,11 +145,7 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node if let Some(metrics) = node_metrics_collector.pop_measure() { controller.send_to( 0, //because sdn controller allway is run inside worker 0 - ExtIn::Sdn(SdnExtIn::ServicesControl( - AGENT_SERVICE_ID.into(), - media_server_runner::UserData::Cluster, - media_server_gateway::agent_service::Control::NodeStats(metrics).into(), - )), + ExtIn::NodeStats(metrics).into(), ); } while let Ok(control) = vnet_rx.try_recv() { diff --git a/bin/src/server/media/runtime_worker.rs b/bin/src/server/media/runtime_worker.rs index aa668363..21797b84 100644 --- a/bin/src/server/media/runtime_worker.rs +++ b/bin/src/server/media/runtime_worker.rs @@ -2,6 +2,7 @@ use std::{collections::VecDeque, time::Instant}; use atm0s_sdn::{SdnExtIn, SdnExtOut, SdnWorkerBusEvent}; +use media_server_gateway::NodeMetrics; use media_server_protocol::transport::{RpcReq, RpcRes}; use media_server_runner::{Input as WorkerInput, MediaConfig, MediaServerWorker, Output as WorkerOutput, Owner, UserData, SC, SE, TC, TW}; use media_server_secure::MediaEdgeSecure; @@ -14,6 +15,7 @@ use crate::NodeConfig; pub enum ExtIn { Sdn(SdnExtIn), Rpc(u64, RpcReq), + NodeStats(NodeMetrics), } #[derive(Debug, Clone)] @@ -125,6 +127,7 @@ impl MediaRuntimeWorker { Input::Ext(ext) => match ext { ExtIn::Rpc(req_id, ext) => WorkerInput::ExtRpc(req_id, ext), ExtIn::Sdn(ext) => WorkerInput::ExtSdn(ext), + ExtIn::NodeStats(metrics) => WorkerInput::NodeStats(metrics), }, Input::Net(owner, event) => WorkerInput::Net(owner, event), } diff --git a/packages/media_gateway/src/store.rs b/packages/media_gateway/src/store.rs index a0da408e..e6b3031d 100644 --- a/packages/media_gateway/src/store.rs +++ b/packages/media_gateway/src/store.rs @@ -92,6 +92,10 @@ impl GatewayStore { node } + pub fn local_stats(&self) -> Option { + self.webrtc.local_stats() + } + pub fn pop_output(&mut self) -> Option { self.output.take() } diff --git a/packages/media_gateway/src/store_service.rs b/packages/media_gateway/src/store_service.rs index fbbf4aa5..854c736e 100644 --- a/packages/media_gateway/src/store_service.rs +++ b/packages/media_gateway/src/store_service.rs @@ -23,10 +23,12 @@ use crate::{ pub enum Control { NodeStats(NodeMetrics), FindNodeReq(u64, ServiceKind, Option), + GetMediaStats, } #[derive(Debug, Clone)] pub enum Event { + MediaStats(u32, u32), FindNodeRes(u64, Option), } @@ -132,6 +134,11 @@ where log::debug!("[GatewayStoreService] node metrics {:?}", metrics); self.store.on_node_metrics(now, metrics); } + Control::GetMediaStats => { + if let Some(stats) = self.store.local_stats() { + self.queue.push_back(ServiceOutput::Event(actor, Event::MediaStats(stats.live, stats.max).into())); + } + } } } } diff --git a/packages/media_runner/src/worker.rs b/packages/media_runner/src/worker.rs index 110e3994..2287dc3d 100644 --- a/packages/media_runner/src/worker.rs +++ b/packages/media_runner/src/worker.rs @@ -4,11 +4,12 @@ use atm0s_sdn::{ generate_node_addr, secure::{HandshakeBuilderXDA, StaticKeyAuthorization}, services::{manual_discovery, visualization}, - ControllerPlaneCfg, DataPlaneCfg, DataWorkerHistory, NetInput, NetOutput, SdnExtIn, SdnExtOut, SdnWorker, SdnWorkerBusEvent, SdnWorkerCfg, SdnWorkerInput, SdnWorkerOutput, TimePivot, + ControllerPlaneCfg, DataPlaneCfg, DataWorkerHistory, NetInput, NetOutput, NodeAddr, SdnExtIn, SdnExtOut, SdnWorker, SdnWorkerBusEvent, SdnWorkerCfg, SdnWorkerInput, SdnWorkerOutput, TimePivot, }; use media_server_core::cluster::{self, MediaCluster}; -use media_server_gateway::{agent_service::GatewayAgentServiceBuilder, ServiceKind, AGENT_SERVICE_ID}; +use media_server_gateway::{agent_service::GatewayAgentServiceBuilder, NodeMetrics, ServiceKind, AGENT_SERVICE_ID}; use media_server_protocol::{ + cluster::{ClusterMediaInfo, ClusterNodeGenericInfo, ClusterNodeInfo}, gateway::generate_gateway_zone_tag, protobuf::gateway::{ConnectResponse, RemoteIceResponse}, transport::{ @@ -52,19 +53,20 @@ pub enum UserData { } #[derive(Clone, Debug, convert_enum::From, convert_enum::TryInto)] pub enum SC { - Visual(visualization::Control), + Visual(visualization::Control), Gateway(media_server_gateway::agent_service::Control), } #[derive(Clone, Debug, convert_enum::From, convert_enum::TryInto)] pub enum SE { - Visual(visualization::Event), + Visual(visualization::Event), Gateway(media_server_gateway::agent_service::Event), } pub type TC = (); pub type TW = (); pub enum Input { + NodeStats(NodeMetrics), ExtRpc(u64, RpcReq), ExtSdn(SdnExtIn), Net(Owner, BackendIncoming), @@ -95,10 +97,12 @@ enum MediaClusterEndpoint { #[allow(clippy::type_complexity)] pub struct MediaServerWorker { worker: u16, + sdn_addr: NodeAddr, sdn_slot: usize, sdn_worker: TaskSwitcherBranch, SdnWorkerOutput>, media_cluster: TaskSwitcherBranch, cluster::Output>, media_webrtc: TaskSwitcherBranch, transport_webrtc::GroupOutput>, + media_max_live: u32, switcher: TaskSwitcher, queue: DynamicDeque, timer: TimePivot, @@ -112,10 +116,27 @@ impl MediaServerWorker { let secure = media.secure.clone(); //TODO why need this? let sdn_udp_addr = SocketAddr::from(([0, 0, 0, 0], sdn_udp)); + let mut media_max_live = 0; + for (_, max) in media.max_live.iter() { + media_max_live += *max; + } let node_addr = generate_node_addr(node_id, sdn_udp, sdn_custom_addrs); + let node_info = ClusterNodeInfo::Media( + ClusterNodeGenericInfo { + addr: node_addr.to_string(), + cpu: 0, + memory: 0, + disk: 0, + }, + ClusterMediaInfo { live: 0, max: media_max_live }, + ); - let visualization = Arc::new(visualization::VisualizationServiceBuilder::new(false)); - let discovery = Arc::new(manual_discovery::ManualDiscoveryServiceBuilder::new(node_addr, vec![], vec![generate_gateway_zone_tag(sdn_zone)])); + let visualization = Arc::new(visualization::VisualizationServiceBuilder::new(node_info, false)); + let discovery = Arc::new(manual_discovery::ManualDiscoveryServiceBuilder::new( + node_addr.clone(), + vec![], + vec![generate_gateway_zone_tag(sdn_zone)], + )); let gateway = Arc::new(GatewayAgentServiceBuilder::new(media.max_live)); let sdn_config = SdnConfig { @@ -141,10 +162,12 @@ impl MediaServerWorker { Self { worker, + sdn_addr: node_addr, sdn_slot: 1, //TODO dont use this hack, must to wait to bind success to network sdn_worker: TaskSwitcherBranch::new(SdnWorker::new(sdn_config), TaskType::Sdn), media_cluster: TaskSwitcherBranch::default(TaskType::MediaCluster), media_webrtc: TaskSwitcherBranch::new(MediaWorkerWebrtc::new(media.webrtc_addrs, media.ice_lite, media.secure), TaskType::MediaWebrtc), + media_max_live, switcher: TaskSwitcher::new(3), queue: DynamicDeque::from([Output::Net(Owner::Sdn, BackendOutgoing::UdpListen { addr: sdn_udp_addr, reuse: true })]), timer: TimePivot::build(), @@ -181,6 +204,38 @@ impl MediaServerWorker { pub fn on_event(&mut self, now: Instant, input: Input) { match input { + Input::NodeStats(metrics) => { + let now_ms = self.timer.timestamp_ms(now); + // we send info to visualization for console UI + self.sdn_worker.input(&mut self.switcher).on_event( + now_ms, + SdnWorkerInput::Ext(SdnExtIn::ServicesControl( + visualization::SERVICE_ID.into(), + UserData::Cluster, + visualization::Control::UpdateInfo(ClusterNodeInfo::Media( + ClusterNodeGenericInfo { + addr: self.sdn_addr.to_string(), + cpu: metrics.cpu, + memory: metrics.memory, + disk: metrics.disk, + }, + ClusterMediaInfo { + live: self.media_webrtc.tasks() as u32, + max: self.media_max_live, + }, + )) + .into(), + )), + ); + self.sdn_worker.input(&mut self.switcher).on_event( + now_ms, + SdnWorkerInput::Ext(SdnExtIn::ServicesControl( + AGENT_SERVICE_ID.into(), + UserData::Cluster, + media_server_gateway::agent_service::Control::NodeStats(metrics).into(), + )), + ); + } Input::ExtRpc(req_id, req) => self.process_rpc(now, req_id, req), Input::ExtSdn(ext) => { let now_ms = self.timer.timestamp_ms(now); diff --git a/packages/media_secure/src/jwt.rs b/packages/media_secure/src/jwt.rs index 163e0eb2..4f0f674a 100644 --- a/packages/media_secure/src/jwt.rs +++ b/packages/media_secure/src/jwt.rs @@ -1,8 +1,9 @@ -use crate::{MediaEdgeSecure, MediaGatewaySecure}; +use crate::{MediaConsoleSecure, MediaEdgeSecure, MediaGatewaySecure}; use jwt_simple::prelude::*; use serde::{de::DeserializeOwned, Serialize}; const CONN_ID_TYPE: &'static str = "conn"; +const CONSOLE_SESSION_TYPE: &'static str = "console_session"; pub struct MediaEdgeSecureJwt { key: HS256Key, @@ -91,6 +92,40 @@ impl MediaGatewaySecure for MediaGatewaySecureJwt { } } +#[derive(Clone)] +pub struct MediaConsoleSecureJwt { + key_str: String, + key: HS256Key, +} + +impl From<&[u8]> for MediaConsoleSecureJwt { + fn from(key: &[u8]) -> Self { + Self { + key: HS256Key::from_bytes(key), + key_str: String::from_utf8_lossy(key).to_string(), + } + } +} + +impl MediaConsoleSecure for MediaConsoleSecureJwt { + fn validate_secert(&self, secret: &str) -> bool { + self.key_str.eq(secret) + } + + fn validate_token(&self, token: &str) -> bool { + let options = VerificationOptions { + allowed_issuers: Some(HashSet::from_strings(&[CONSOLE_SESSION_TYPE])), + ..Default::default() + }; + self.key.verify_token::<()>(token, Some(options)).is_ok() + } + + fn generate_token(&self) -> String { + let claims = Claims::with_custom_claims((), Duration::from_secs(10000)).with_issuer(CONSOLE_SESSION_TYPE); + self.key.authenticate(claims).expect("Should create jwt") + } +} + #[cfg(test)] mod tests { use std::{thread::sleep, time::Duration}; diff --git a/packages/media_secure/src/lib.rs b/packages/media_secure/src/lib.rs index 112ef1e5..00e273f5 100644 --- a/packages/media_secure/src/lib.rs +++ b/packages/media_secure/src/lib.rs @@ -16,3 +16,10 @@ pub trait MediaGatewaySecure { fn encode_obj(&self, _type: &'static str, ob: O, ttl_seconds: u64) -> String; fn decode_conn_id(&self, data: &str) -> Option; } + +/// This interface for console validate session +pub trait MediaConsoleSecure { + fn validate_secert(&self, secret: &str) -> bool; + fn validate_token(&self, token: &str) -> bool; + fn generate_token(&self) -> String; +} diff --git a/packages/protocol/build.rs b/packages/protocol/build.rs index abc9d2eb..8c2e5e07 100644 --- a/packages/protocol/build.rs +++ b/packages/protocol/build.rs @@ -15,11 +15,11 @@ fn main() -> Result<()> { .compile_protos( &[ "./proto/shared.proto", - "./proto/session.proto", - "./proto/features.proto", - "./proto/features.mixer.proto", - "./proto/gateway.proto", - "./proto/cluster_gateway.proto", + "./proto/sdk/session.proto", + "./proto/sdk/features.proto", + "./proto/sdk/features.mixer.proto", + "./proto/sdk/gateway.proto", + "./proto/cluster/gateway.proto", ], &["./proto"], )?; diff --git a/packages/protocol/proto/cluster_gateway.proto b/packages/protocol/proto/cluster/gateway.proto similarity index 99% rename from packages/protocol/proto/cluster_gateway.proto rename to packages/protocol/proto/cluster/gateway.proto index 7fe0b422..dcd1ca70 100644 --- a/packages/protocol/proto/cluster_gateway.proto +++ b/packages/protocol/proto/cluster/gateway.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -import "gateway.proto"; +import "sdk/gateway.proto"; package cluster_gateway; diff --git a/packages/protocol/proto/features.mixer.proto b/packages/protocol/proto/sdk/features.mixer.proto similarity index 100% rename from packages/protocol/proto/features.mixer.proto rename to packages/protocol/proto/sdk/features.mixer.proto diff --git a/packages/protocol/proto/features.proto b/packages/protocol/proto/sdk/features.proto similarity index 91% rename from packages/protocol/proto/features.proto rename to packages/protocol/proto/sdk/features.proto index 73b03463..701fd30e 100644 --- a/packages/protocol/proto/features.proto +++ b/packages/protocol/proto/sdk/features.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -import "features.mixer.proto"; +import "sdk/features.mixer.proto"; package features; diff --git a/packages/protocol/proto/gateway.proto b/packages/protocol/proto/sdk/gateway.proto similarity index 93% rename from packages/protocol/proto/gateway.proto rename to packages/protocol/proto/sdk/gateway.proto index 1f01f109..4d21aa63 100644 --- a/packages/protocol/proto/gateway.proto +++ b/packages/protocol/proto/sdk/gateway.proto @@ -1,7 +1,7 @@ syntax = "proto3"; import "shared.proto"; -import "session.proto"; +import "sdk/session.proto"; package gateway; diff --git a/packages/protocol/proto/session.proto b/packages/protocol/proto/sdk/session.proto similarity index 99% rename from packages/protocol/proto/session.proto rename to packages/protocol/proto/sdk/session.proto index d99e57c3..79ef2942 100644 --- a/packages/protocol/proto/session.proto +++ b/packages/protocol/proto/sdk/session.proto @@ -1,7 +1,7 @@ syntax = "proto3"; import "shared.proto"; -import "features.proto"; +import "sdk/features.proto"; package session; diff --git a/packages/protocol/src/cluster.rs b/packages/protocol/src/cluster.rs new file mode 100644 index 00000000..48d762ec --- /dev/null +++ b/packages/protocol/src/cluster.rs @@ -0,0 +1,31 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClusterNodeGenericInfo { + pub addr: String, + pub cpu: u8, + pub memory: u8, + pub disk: u8, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClusterGatewayInfo { + pub live: u32, + pub max: u32, + pub lat: f32, + pub lon: f32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClusterMediaInfo { + pub live: u32, + pub max: u32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ClusterNodeInfo { + Console(ClusterNodeGenericInfo), + Gateway(ClusterNodeGenericInfo, ClusterGatewayInfo), + Media(ClusterNodeGenericInfo, ClusterMediaInfo), + Connector(ClusterNodeGenericInfo), +} diff --git a/packages/protocol/src/lib.rs b/packages/protocol/src/lib.rs index c8dde912..073367b5 100644 --- a/packages/protocol/src/lib.rs +++ b/packages/protocol/src/lib.rs @@ -1,3 +1,4 @@ +pub mod cluster; pub mod endpoint; pub mod gateway; pub mod media; diff --git a/packages/protocol/src/protobuf/mixer.rs b/packages/protocol/src/protobuf/mixer.rs deleted file mode 100644 index 6fe2ed72..00000000 --- a/packages/protocol/src/protobuf/mixer.rs +++ /dev/null @@ -1,136 +0,0 @@ -// This file is @generated by prost-build. -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Config { - #[prost(enumeration = "Mode", tag = "1")] - pub mode: i32, - #[prost(string, repeated, tag = "2")] - pub outputs: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(message, repeated, tag = "3")] - pub sources: ::prost::alloc::vec::Vec, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Request { - #[prost(oneof = "request::Request", tags = "1, 2")] - pub request: ::core::option::Option, -} -/// Nested message and enum types in `Request`. -pub mod request { - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct Attach { - #[prost(message, repeated, tag = "1")] - pub sources: ::prost::alloc::vec::Vec, - } - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct Detach { - #[prost(message, repeated, tag = "1")] - pub sources: ::prost::alloc::vec::Vec, - } - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum Request { - #[prost(message, tag = "1")] - Attach(Attach), - #[prost(message, tag = "2")] - Detach(Detach), - } -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Response { - #[prost(oneof = "response::Response", tags = "1, 2")] - pub response: ::core::option::Option, -} -/// Nested message and enum types in `Response`. -pub mod response { - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct Attach {} - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct Detach {} - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum Response { - #[prost(message, tag = "1")] - Attach(Attach), - #[prost(message, tag = "2")] - Detach(Detach), - } -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ServerEvent { - #[prost(oneof = "server_event::Event", tags = "1, 2, 3")] - pub event: ::core::option::Option, -} -/// Nested message and enum types in `ServerEvent`. -pub mod server_event { - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct MappingSlotSet { - #[prost(uint32, tag = "1")] - pub slot: u32, - #[prost(message, optional, tag = "2")] - pub source: ::core::option::Option, - } - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct MappingSlotDel { - #[prost(uint32, tag = "1")] - pub slot: u32, - } - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct SlotAudioLevel { - #[prost(uint32, tag = "1")] - pub slot: u32, - #[prost(int32, tag = "2")] - pub audio_level: i32, - } - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct MappingSlotsAudioLevel { - #[prost(message, repeated, tag = "1")] - pub slots: ::prost::alloc::vec::Vec, - } - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum Event { - #[prost(message, tag = "1")] - SlotSet(MappingSlotSet), - #[prost(message, tag = "2")] - SlotDel(MappingSlotDel), - #[prost(message, tag = "3")] - SlotsAudioLevel(MappingSlotsAudioLevel), - } -} -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] -#[repr(i32)] -pub enum Mode { - Auto = 0, - Manual = 1, -} -impl Mode { - /// String value of the enum field names used in the ProtoBuf definition. - /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. - pub fn as_str_name(&self) -> &'static str { - match self { - Mode::Auto => "AUTO", - Mode::Manual => "MANUAL", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. - pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "AUTO" => Some(Self::Auto), - "MANUAL" => Some(Self::Manual), - _ => None, - } - } -}