Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: automatic SDN config with node-api and local_ip #455

Merged
merged 16 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ members = [

[workspace.dependencies]
sans-io-runtime = { version = "0.3", default-features = false }
atm0s-sdn = { version = "0.2", default-features = false }
atm0s-sdn-network = { version = "0.6", default-features = false }
atm0s-sdn = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", branch = "feat-router-dump", default-features = false }
atm0s-sdn-network = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", branch = "feat-router-dump", default-features = false }
tokio = "1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] }
convert-enum = "0.1"
Expand Down
2 changes: 2 additions & 0 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ media-server-multi-tenancy = { path = "../packages/multi_tenancy", optional = tr
rtpengine-ngcontrol = { path = "../packages/rtpengine_ngcontrol", optional = true }
local-ip-address = "0.6"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
quinn = { version = "0.11", optional = true }
rustls = { version = "0.23", optional = true }
convert-enum = { workspace = true }
Expand All @@ -38,6 +39,7 @@ maxminddb = { version = "0.24", optional = true }
sysinfo = { version = "0.31", optional = true }
hex = { version = "0.4", optional = true }
mime_guess = { version = "2.0", optional = true }
reqwest = { version = "0.12", features = ["json"]}
sentry = "0.34"

[features]
Expand Down
78 changes: 70 additions & 8 deletions bin/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::SocketAddr;
use std::sync::Arc;

pub use api_node::NodeApiCtx;
use media_server_protocol::endpoint::ClusterConnId;
#[cfg(feature = "console")]
use media_server_protocol::protobuf::cluster_connector::MediaConnectorServiceClient;
Expand All @@ -14,13 +15,15 @@ use poem::{listener::TcpListener, middleware::Cors, EndpointExt, Route, Server};
use poem_openapi::types::{ToJSON, Type};
use poem_openapi::OpenApiService;
use poem_openapi::{types::ParseFromJSON, Object};
use serde::Deserialize;
use tokio::sync::mpsc::Sender;
#[cfg(feature = "embed_static")]
use utils::EmbeddedFilesEndpoint;

mod api_console;
mod api_media;
mod api_metrics;
mod api_node;
mod api_token;
mod utils;

Expand All @@ -34,13 +37,13 @@ pub struct PublicMediaFiles;
#[folder = "public/console"]
pub struct PublicConsoleFiles;

#[derive(Debug, Default, Object)]
#[derive(Debug, Default, Object, Deserialize)]
pub struct Pagination {
pub total: usize,
pub current: usize,
}

#[derive(Debug, Object)]
#[derive(Debug, Object, Deserialize)]
pub struct Response<T: ParseFromJSON + ToJSON + Type + Send + Sync> {
pub status: bool,
#[oai(skip_serializing_if = "Option::is_none")]
Expand All @@ -65,25 +68,31 @@ impl<T: ParseFromJSON + ToJSON + Type + Send + Sync> Default for Response<T> {
#[cfg(feature = "console")]
pub async fn run_console_http_server(
port: u16,
node: NodeApiCtx,
secure: media_server_secure::jwt::MediaConsoleSecureJwt,
storage: crate::server::console_storage::StorageShared,
connector: MediaConnectorServiceClient<SocketAddr, QuinnClient, QuinnStream>,
) -> Result<(), Box<dyn std::error::Error>> {
use poem::middleware::Tracing;

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let node_api = api_node::Apis::new(node);
let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/");
let node_ui = node_service.swagger_ui();
let node_spec = node_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "Console User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/");
let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/");
let user_ui = user_service.swagger_ui();
let user_spec = user_service.spec();

let cluster_service: OpenApiService<_, ()> = OpenApiService::new(api_console::cluster::Apis, "Console Cluster APIs", env!("CARGO_PKG_VERSION")).server("/api/cluster/");
let cluster_service: OpenApiService<_, ()> = OpenApiService::new(api_console::cluster::Apis, "Cluster APIs", env!("CARGO_PKG_VERSION")).server("/api/cluster/");
let cluster_ui = cluster_service.swagger_ui();
let cluster_spec = cluster_service.spec();

let connector_service: OpenApiService<_, ()> = OpenApiService::new(api_console::connector::Apis, "Console Connector APIs", env!("CARGO_PKG_VERSION")).server("/api/connector/");
let connector_service: OpenApiService<_, ()> = OpenApiService::new(api_console::connector::Apis, "Connector APIs", env!("CARGO_PKG_VERSION")).server("/api/connector/");
let connector_ui = connector_service.swagger_ui();
let connector_spec = connector_service.spec();

Expand All @@ -96,6 +105,10 @@ pub async fn run_console_http_server(

let route = Route::new()
.nest("/", console_panel)
//node
.nest("/api/node/", node_service)
.nest("/api/node/ui", node_ui)
.at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone()))
//metrics
.nest("/api/metrics/", metrics_service)
.nest("/api/metrics/ui", metrics_ui)
Expand All @@ -122,6 +135,7 @@ pub async fn run_console_http_server(
#[cfg(feature = "gateway")]
pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync, GS: 'static + MediaGatewaySecure + Send + Sync>(
port: u16,
node: NodeApiCtx,
sender: Sender<crate::rpc::Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>,
edge_secure: Arc<ES>,
gateway_secure: Arc<GS>,
Expand All @@ -130,7 +144,12 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync
let token_ui = token_service.swagger_ui();
let token_spec = token_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let node_api = api_node::Apis::new(node);
let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/");
let node_ui = node_service.swagger_ui();
let node_spec = node_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

Expand Down Expand Up @@ -177,6 +196,10 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync

let route = Route::new()
.nest("/samples", samples)
//node
.nest("/api/node/", node_service)
.nest("/api/node/ui", node_ui)
.at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone()))
//token
.nest("/token/", token_service.data(api_token::TokenServerCtx { secure: gateway_secure }))
.nest("/token/ui", token_ui)
Expand Down Expand Up @@ -207,16 +230,51 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync
Ok(())
}

#[cfg(feature = "connector")]
pub async fn run_connector_http_server(port: u16, node: NodeApiCtx) -> Result<(), Box<dyn std::error::Error>> {
use poem::middleware::Tracing;

let node_api = api_node::Apis::new(node);
let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/");
let node_ui = node_service.swagger_ui();
let node_spec = node_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

let route = Route::new()
//node
.nest("/api/node/", node_service)
.nest("/api/node/ui", node_ui)
.at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone()))
//metrics
.nest("/api/metrics/", metrics_service)
.nest("/api/metrics/ui", metrics_ui)
.at("/api/metrics/spec", poem::endpoint::make_sync(move |_| metrics_spec.clone()))
.with(Cors::new())
.with(Tracing);

Server::new(TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), port))).run(route).await?;
Ok(())
}

#[cfg(feature = "media")]
pub async fn run_media_http_server<ES: 'static + MediaEdgeSecure + Send + Sync, GS: 'static + MediaGatewaySecure + Send + Sync>(
port: u16,
node: NodeApiCtx,
sender: Sender<crate::rpc::Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>,
edge_secure: Arc<ES>,
gateway_secure: Option<Arc<GS>>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut route = Route::new();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let node_api = api_node::Apis::new(node);
let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/");
let node_ui = node_service.swagger_ui();
let node_spec = node_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

Expand Down Expand Up @@ -273,6 +331,10 @@ pub async fn run_media_http_server<ES: 'static + MediaEdgeSecure + Send + Sync,

let route = route
.nest("/samples", samples)
//node
.nest("/api/node/", node_service)
.nest("/api/node/ui", node_ui)
.at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone()))
//metrics
.nest("/api/metrics/", metrics_service)
.nest("/api/metrics/ui", metrics_ui)
Expand Down
35 changes: 34 additions & 1 deletion bin/src/http/api_console/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,45 @@ use crate::server::console_storage::{ConsoleNode, Zone, ZoneDetails};
use super::{super::Response, ConsoleApisCtx, ConsoleAuthorization};
use media_server_protocol::cluster::ZoneId;
use poem::web::Data;
use poem_openapi::{param::Path, payload::Json, OpenApi};
use poem_openapi::{
param::{Path, Query},
payload::Json,
Enum, OpenApi,
};

pub struct Apis;

#[derive(Debug, Clone, Enum)]
enum NodeType {
Console,
Gateway,
Connector,
Media,
}

#[OpenApi]
impl Apis {
/// Get seed nodes for a zone and node type
/// With console node type, it will return all consoles.
/// With gateway node type, it will return all consoles and gateways in the same zone.
/// With connector node type, it will return all gateways in the zone.
/// With media node type, it will return all gateways in the zone.
#[oai(path = "/seeds", method = "get")]
async fn seeds_for(&self, zone_id: Query<u32>, node_type: Query<NodeType>, Data(ctx): Data<&ConsoleApisCtx>) -> Json<Vec<String>> {
log::info!("seeds_for zone_id: {}, node_type: {:?}", zone_id.0, node_type.0);
match node_type.0 {
NodeType::Console => Json(ctx.storage.consoles().iter().map(|node| node.addr.clone()).collect()),
NodeType::Gateway => {
let consoles = ctx.storage.consoles().into_iter().map(|node| node.addr.clone());
let zone = ctx.storage.zone(ZoneId(zone_id.0));
let same_zone_gateways = zone.iter().flat_map(|n| n.gateways.iter()).map(|n| n.addr.clone());
Json(consoles.chain(same_zone_gateways).collect())
}
NodeType::Connector => Json(ctx.storage.zone(ZoneId(zone_id.0)).unwrap().gateways.iter().map(|node| node.addr.to_string()).collect()),
NodeType::Media => Json(ctx.storage.zone(ZoneId(zone_id.0)).unwrap().gateways.iter().map(|node| node.addr.to_string()).collect()),
}
}

/// get consoles from all zones
#[oai(path = "/consoles", method = "get")]
async fn consoles(&self, _auth: ConsoleAuthorization, Data(ctx): Data<&ConsoleApisCtx>) -> Json<Response<Vec<ConsoleNode>>> {
Expand Down
Loading
Loading