Skip to content

Commit

Permalink
feat: automatic SDN config with node-api and local_ip (#455)
Browse files Browse the repository at this point in the history
* feat: SDN discovery over node http-api

* added docs

* make it simpler

* feat: generate node-id from local_io

* feat: nat-traversal by detect public_ip from cloud metadata

* fix aws public-ip

* fix ip parse

* setting rtpengine public-ip

* accept both single addr or array addrs with seeds-from-url

* fix: connector only need to connect to same-zone gateway

* fix record with aws-s3

* fix record compose

* fix warns

* added node router_dump
  • Loading branch information
giangndm authored Nov 26, 2024
1 parent a782dfb commit 2f3381d
Show file tree
Hide file tree
Showing 34 changed files with 625 additions and 110 deletions.
28 changes: 16 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ members = [

[workspace.dependencies]
sans-io-runtime = { version = "0.3", default-features = false }
atm0s-sdn = { version = "0.2", default-features = false }
atm0s-sdn-network = { version = "0.6", default-features = false }
atm0s-sdn = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", branch = "feat-router-dump", default-features = false }
atm0s-sdn-network = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", branch = "feat-router-dump", default-features = false }
tokio = "1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] }
convert-enum = "0.1"
Expand Down
2 changes: 2 additions & 0 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ media-server-multi-tenancy = { path = "../packages/multi_tenancy", optional = tr
rtpengine-ngcontrol = { path = "../packages/rtpengine_ngcontrol", optional = true }
local-ip-address = "0.6"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
quinn = { version = "0.11", optional = true }
rustls = { version = "0.23", optional = true }
convert-enum = { workspace = true }
Expand All @@ -38,6 +39,7 @@ maxminddb = { version = "0.24", optional = true }
sysinfo = { version = "0.31", optional = true }
hex = { version = "0.4", optional = true }
mime_guess = { version = "2.0", optional = true }
reqwest = { version = "0.12", features = ["json"]}
sentry = "0.34"

[features]
Expand Down
78 changes: 70 additions & 8 deletions bin/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::SocketAddr;
use std::sync::Arc;

pub use api_node::NodeApiCtx;
use media_server_protocol::endpoint::ClusterConnId;
#[cfg(feature = "console")]
use media_server_protocol::protobuf::cluster_connector::MediaConnectorServiceClient;
Expand All @@ -14,13 +15,15 @@ use poem::{listener::TcpListener, middleware::Cors, EndpointExt, Route, Server};
use poem_openapi::types::{ToJSON, Type};
use poem_openapi::OpenApiService;
use poem_openapi::{types::ParseFromJSON, Object};
use serde::Deserialize;
use tokio::sync::mpsc::Sender;
#[cfg(feature = "embed_static")]
use utils::EmbeddedFilesEndpoint;

mod api_console;
mod api_media;
mod api_metrics;
mod api_node;
mod api_token;
mod utils;

Expand All @@ -34,13 +37,13 @@ pub struct PublicMediaFiles;
#[folder = "public/console"]
pub struct PublicConsoleFiles;

#[derive(Debug, Default, Object)]
#[derive(Debug, Default, Object, Deserialize)]
pub struct Pagination {
pub total: usize,
pub current: usize,
}

#[derive(Debug, Object)]
#[derive(Debug, Object, Deserialize)]
pub struct Response<T: ParseFromJSON + ToJSON + Type + Send + Sync> {
pub status: bool,
#[oai(skip_serializing_if = "Option::is_none")]
Expand All @@ -65,25 +68,31 @@ impl<T: ParseFromJSON + ToJSON + Type + Send + Sync> Default for Response<T> {
#[cfg(feature = "console")]
pub async fn run_console_http_server(
port: u16,
node: NodeApiCtx,
secure: media_server_secure::jwt::MediaConsoleSecureJwt,
storage: crate::server::console_storage::StorageShared,
connector: MediaConnectorServiceClient<SocketAddr, QuinnClient, QuinnStream>,
) -> Result<(), Box<dyn std::error::Error>> {
use poem::middleware::Tracing;

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let node_api = api_node::Apis::new(node);
let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/");
let node_ui = node_service.swagger_ui();
let node_spec = node_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "Console User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/");
let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/");
let user_ui = user_service.swagger_ui();
let user_spec = user_service.spec();

let cluster_service: OpenApiService<_, ()> = OpenApiService::new(api_console::cluster::Apis, "Console Cluster APIs", env!("CARGO_PKG_VERSION")).server("/api/cluster/");
let cluster_service: OpenApiService<_, ()> = OpenApiService::new(api_console::cluster::Apis, "Cluster APIs", env!("CARGO_PKG_VERSION")).server("/api/cluster/");
let cluster_ui = cluster_service.swagger_ui();
let cluster_spec = cluster_service.spec();

let connector_service: OpenApiService<_, ()> = OpenApiService::new(api_console::connector::Apis, "Console Connector APIs", env!("CARGO_PKG_VERSION")).server("/api/connector/");
let connector_service: OpenApiService<_, ()> = OpenApiService::new(api_console::connector::Apis, "Connector APIs", env!("CARGO_PKG_VERSION")).server("/api/connector/");
let connector_ui = connector_service.swagger_ui();
let connector_spec = connector_service.spec();

Expand All @@ -96,6 +105,10 @@ pub async fn run_console_http_server(

let route = Route::new()
.nest("/", console_panel)
//node
.nest("/api/node/", node_service)
.nest("/api/node/ui", node_ui)
.at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone()))
//metrics
.nest("/api/metrics/", metrics_service)
.nest("/api/metrics/ui", metrics_ui)
Expand All @@ -122,6 +135,7 @@ pub async fn run_console_http_server(
#[cfg(feature = "gateway")]
pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync, GS: 'static + MediaGatewaySecure + Send + Sync>(
port: u16,
node: NodeApiCtx,
sender: Sender<crate::rpc::Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>,
edge_secure: Arc<ES>,
gateway_secure: Arc<GS>,
Expand All @@ -130,7 +144,12 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync
let token_ui = token_service.swagger_ui();
let token_spec = token_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let node_api = api_node::Apis::new(node);
let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/");
let node_ui = node_service.swagger_ui();
let node_spec = node_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

Expand Down Expand Up @@ -177,6 +196,10 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync

let route = Route::new()
.nest("/samples", samples)
//node
.nest("/api/node/", node_service)
.nest("/api/node/ui", node_ui)
.at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone()))
//token
.nest("/token/", token_service.data(api_token::TokenServerCtx { secure: gateway_secure }))
.nest("/token/ui", token_ui)
Expand Down Expand Up @@ -207,16 +230,51 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync
Ok(())
}

#[cfg(feature = "connector")]
pub async fn run_connector_http_server(port: u16, node: NodeApiCtx) -> Result<(), Box<dyn std::error::Error>> {
use poem::middleware::Tracing;

let node_api = api_node::Apis::new(node);
let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/");
let node_ui = node_service.swagger_ui();
let node_spec = node_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

let route = Route::new()
//node
.nest("/api/node/", node_service)
.nest("/api/node/ui", node_ui)
.at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone()))
//metrics
.nest("/api/metrics/", metrics_service)
.nest("/api/metrics/ui", metrics_ui)
.at("/api/metrics/spec", poem::endpoint::make_sync(move |_| metrics_spec.clone()))
.with(Cors::new())
.with(Tracing);

Server::new(TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), port))).run(route).await?;
Ok(())
}

#[cfg(feature = "media")]
pub async fn run_media_http_server<ES: 'static + MediaEdgeSecure + Send + Sync, GS: 'static + MediaGatewaySecure + Send + Sync>(
port: u16,
node: NodeApiCtx,
sender: Sender<crate::rpc::Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>,
edge_secure: Arc<ES>,
gateway_secure: Option<Arc<GS>>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut route = Route::new();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let node_api = api_node::Apis::new(node);
let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/");
let node_ui = node_service.swagger_ui();
let node_spec = node_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

Expand Down Expand Up @@ -273,6 +331,10 @@ pub async fn run_media_http_server<ES: 'static + MediaEdgeSecure + Send + Sync,

let route = route
.nest("/samples", samples)
//node
.nest("/api/node/", node_service)
.nest("/api/node/ui", node_ui)
.at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone()))
//metrics
.nest("/api/metrics/", metrics_service)
.nest("/api/metrics/ui", metrics_ui)
Expand Down
35 changes: 34 additions & 1 deletion bin/src/http/api_console/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,45 @@ use crate::server::console_storage::{ConsoleNode, Zone, ZoneDetails};
use super::{super::Response, ConsoleApisCtx, ConsoleAuthorization};
use media_server_protocol::cluster::ZoneId;
use poem::web::Data;
use poem_openapi::{param::Path, payload::Json, OpenApi};
use poem_openapi::{
param::{Path, Query},
payload::Json,
Enum, OpenApi,
};

pub struct Apis;

#[derive(Debug, Clone, Enum)]
enum NodeType {
Console,
Gateway,
Connector,
Media,
}

#[OpenApi]
impl Apis {
/// Get seed nodes for a zone and node type
/// With console node type, it will return all consoles.
/// With gateway node type, it will return all consoles and gateways in the same zone.
/// With connector node type, it will return all gateways in the zone.
/// With media node type, it will return all gateways in the zone.
#[oai(path = "/seeds", method = "get")]
async fn seeds_for(&self, zone_id: Query<u32>, node_type: Query<NodeType>, Data(ctx): Data<&ConsoleApisCtx>) -> Json<Vec<String>> {
log::info!("seeds_for zone_id: {}, node_type: {:?}", zone_id.0, node_type.0);
match node_type.0 {
NodeType::Console => Json(ctx.storage.consoles().iter().map(|node| node.addr.clone()).collect()),
NodeType::Gateway => {
let consoles = ctx.storage.consoles().into_iter().map(|node| node.addr.clone());
let zone = ctx.storage.zone(ZoneId(zone_id.0));
let same_zone_gateways = zone.iter().flat_map(|n| n.gateways.iter()).map(|n| n.addr.clone());
Json(consoles.chain(same_zone_gateways).collect())
}
NodeType::Connector => Json(ctx.storage.zone(ZoneId(zone_id.0)).unwrap().gateways.iter().map(|node| node.addr.to_string()).collect()),
NodeType::Media => Json(ctx.storage.zone(ZoneId(zone_id.0)).unwrap().gateways.iter().map(|node| node.addr.to_string()).collect()),
}
}

/// get consoles from all zones
#[oai(path = "/consoles", method = "get")]
async fn consoles(&self, _auth: ConsoleAuthorization, Data(ctx): Data<&ConsoleApisCtx>) -> Json<Response<Vec<ConsoleNode>>> {
Expand Down
Loading

0 comments on commit 2f3381d

Please sign in to comment.