Skip to content

Commit

Permalink
feat: SDN discovery over node http-api
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Nov 18, 2024
1 parent 887dcde commit 61ef5db
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 37 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ maxminddb = { version = "0.24", optional = true }
sysinfo = { version = "0.31", optional = true }
hex = { version = "0.4", optional = true }
mime_guess = { version = "2.0", optional = true }
reqwest = { version = "0.12", features = ["json"]}
sentry = "0.34"

[features]
Expand Down
49 changes: 41 additions & 8 deletions bin/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::SocketAddr;
use std::sync::Arc;

pub use api_node::NodeApiCtx;
use media_server_protocol::endpoint::ClusterConnId;
#[cfg(feature = "console")]
use media_server_protocol::protobuf::cluster_connector::MediaConnectorServiceClient;
Expand All @@ -14,13 +15,15 @@ use poem::{listener::TcpListener, middleware::Cors, EndpointExt, Route, Server};
use poem_openapi::types::{ToJSON, Type};
use poem_openapi::OpenApiService;
use poem_openapi::{types::ParseFromJSON, Object};
use serde::Deserialize;
use tokio::sync::mpsc::Sender;
#[cfg(feature = "embed_static")]
use utils::EmbeddedFilesEndpoint;

mod api_console;
mod api_media;
mod api_metrics;
mod api_node;
mod api_token;
mod utils;

Expand All @@ -34,13 +37,13 @@ pub struct PublicMediaFiles;
#[folder = "public/console"]
pub struct PublicConsoleFiles;

#[derive(Debug, Default, Object)]
#[derive(Debug, Default, Object, Deserialize)]
pub struct Pagination {
pub total: usize,
pub current: usize,
}

#[derive(Debug, Object)]
#[derive(Debug, Object, Deserialize)]
pub struct Response<T: ParseFromJSON + ToJSON + Type + Send + Sync> {
pub status: bool,
#[oai(skip_serializing_if = "Option::is_none")]
Expand All @@ -65,25 +68,31 @@ impl<T: ParseFromJSON + ToJSON + Type + Send + Sync> Default for Response<T> {
#[cfg(feature = "console")]
pub async fn run_console_http_server(
port: u16,
node: NodeApiCtx,
secure: media_server_secure::jwt::MediaConsoleSecureJwt,
storage: crate::server::console_storage::StorageShared,
connector: MediaConnectorServiceClient<SocketAddr, QuinnClient, QuinnStream>,
) -> Result<(), Box<dyn std::error::Error>> {
use poem::middleware::Tracing;

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let node_api = api_node::Apis::new(node);
let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/");
let node_ui = node_service.swagger_ui();
let node_spec = node_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "Console User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/");
let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/");
let user_ui = user_service.swagger_ui();
let user_spec = user_service.spec();

let cluster_service: OpenApiService<_, ()> = OpenApiService::new(api_console::cluster::Apis, "Console Cluster APIs", env!("CARGO_PKG_VERSION")).server("/api/cluster/");
let cluster_service: OpenApiService<_, ()> = OpenApiService::new(api_console::cluster::Apis, "Cluster APIs", env!("CARGO_PKG_VERSION")).server("/api/cluster/");
let cluster_ui = cluster_service.swagger_ui();
let cluster_spec = cluster_service.spec();

let connector_service: OpenApiService<_, ()> = OpenApiService::new(api_console::connector::Apis, "Console Connector APIs", env!("CARGO_PKG_VERSION")).server("/api/connector/");
let connector_service: OpenApiService<_, ()> = OpenApiService::new(api_console::connector::Apis, "Connector APIs", env!("CARGO_PKG_VERSION")).server("/api/connector/");
let connector_ui = connector_service.swagger_ui();
let connector_spec = connector_service.spec();

Expand All @@ -96,6 +105,10 @@ pub async fn run_console_http_server(

let route = Route::new()
.nest("/", console_panel)
//node
.nest("/api/node/", node_service)
.nest("/api/node/ui", node_ui)
.at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone()))
//metrics
.nest("/api/metrics/", metrics_service)
.nest("/api/metrics/ui", metrics_ui)
Expand All @@ -122,6 +135,7 @@ pub async fn run_console_http_server(
#[cfg(feature = "gateway")]
pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync, GS: 'static + MediaGatewaySecure + Send + Sync>(
port: u16,
node: NodeApiCtx,
sender: Sender<crate::rpc::Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>,
edge_secure: Arc<ES>,
gateway_secure: Arc<GS>,
Expand All @@ -130,7 +144,12 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync
let token_ui = token_service.swagger_ui();
let token_spec = token_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let node_api = api_node::Apis::new(node);
let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/");
let node_ui = node_service.swagger_ui();
let node_spec = node_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

Expand Down Expand Up @@ -177,6 +196,10 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync

let route = Route::new()
.nest("/samples", samples)
//node
.nest("/api/node/", node_service)
.nest("/api/node/ui", node_ui)
.at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone()))
//token
.nest("/token/", token_service.data(api_token::TokenServerCtx { secure: gateway_secure }))
.nest("/token/ui", token_ui)
Expand Down Expand Up @@ -210,13 +233,19 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync
#[cfg(feature = "media")]
pub async fn run_media_http_server<ES: 'static + MediaEdgeSecure + Send + Sync, GS: 'static + MediaGatewaySecure + Send + Sync>(
port: u16,
node: NodeApiCtx,
sender: Sender<crate::rpc::Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>,
edge_secure: Arc<ES>,
gateway_secure: Option<Arc<GS>>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut route = Route::new();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let node_api = api_node::Apis::new(node);
let node_service = OpenApiService::new(node_api, "Node APIs", env!("CARGO_PKG_VERSION")).server("/api/node/");
let node_ui = node_service.swagger_ui();
let node_spec = node_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

Expand Down Expand Up @@ -273,6 +302,10 @@ pub async fn run_media_http_server<ES: 'static + MediaEdgeSecure + Send + Sync,

let route = route
.nest("/samples", samples)
//node
.nest("/api/node/", node_service)
.nest("/api/node/ui", node_ui)
.at("/api/node/spec", poem::endpoint::make_sync(move |_| node_spec.clone()))
//metrics
.nest("/api/metrics/", metrics_service)
.nest("/api/metrics/ui", metrics_ui)
Expand Down
29 changes: 29 additions & 0 deletions bin/src/http/api_node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use poem_openapi::{payload::Json, OpenApi};

use super::Response;

pub struct NodeApiCtx {
pub address: String,
}

pub struct Apis {
ctx: NodeApiCtx,
}

impl Apis {
pub fn new(ctx: NodeApiCtx) -> Self {
Self { ctx }
}
}

#[OpenApi]
impl Apis {
#[oai(path = "/address", method = "get")]
async fn get_address(&self) -> Json<Response<String>> {
Json(Response {
status: true,
data: Some(self.ctx.address.clone()),
..Default::default()
})
}
}
13 changes: 12 additions & 1 deletion bin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::SocketAddr;
use std::{net::SocketAddr, str::FromStr};

use atm0s_sdn::{NodeAddr, NodeId};
use media_server_protocol::cluster::ZoneId;
Expand All @@ -22,3 +22,14 @@ pub struct NodeConfig {
pub zone: ZoneId,
pub bind_addrs_alt: Vec<SocketAddr>,
}

pub async fn fetch_node_addr_from_api(url: &str) -> Result<NodeAddr, String> {
let resp = reqwest::get(url).await.map_err(|e| e.to_string())?;
let node_addr = resp
.json::<http::Response<String>>()
.await
.map_err(|e| e.to_string())?
.data
.ok_or(format!("No data in response from {}", url))?;
NodeAddr::from_str(&node_addr).map_err(|e| e.to_string())
}
17 changes: 15 additions & 2 deletions bin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::net::{IpAddr, SocketAddr};

use atm0s_media_server::{server, NodeConfig};
use atm0s_media_server::{fetch_node_addr_from_api, server, NodeConfig};
use atm0s_sdn::NodeAddr;
use clap::Parser;
use media_server_protocol::cluster::ZoneId;
Expand Down Expand Up @@ -56,6 +56,12 @@ struct Args {
#[arg(env, long)]
seeds: Vec<NodeAddr>,

/// Seeds from API, this is used for auto-discovery of seeds.
/// It is very useful for cloud deployment.
/// Currently all of nodes expose /api/node/address endpoint, so we can get seeds from there.
#[arg(env, long)]
seeds_from_node_api: Option<String>,

/// Number of worker threads to spawn.
#[arg(env, long, default_value_t = 1)]
workers: usize,
Expand Down Expand Up @@ -118,7 +124,7 @@ async fn main() {
.map(|(_name, ip)| SocketAddr::new(ip, sdn_port))
.collect::<Vec<_>>()
};
let node = NodeConfig {
let mut node = NodeConfig {
node_id: ZoneId(args.sdn_zone_id).to_node_id(args.sdn_zone_node_id),
secret: args.secret,
seeds: args.seeds,
Expand All @@ -129,6 +135,13 @@ async fn main() {

log::info!("Bind addrs {:?}, bind addrs alt {:?}", node.bind_addrs, node.bind_addrs_alt);

if let Some(seeds_from_node_api) = args.seeds_from_node_api {
log::info!("Generate seeds from node_api {}", seeds_from_node_api);
let addr = fetch_node_addr_from_api(&seeds_from_node_api).await.expect("should get seed");
log::info!("Generated seed {:?}", addr);
node.seeds = vec![addr];
}

let local = tokio::task::LocalSet::new();
local
.run_until(async move {
Expand Down
5 changes: 3 additions & 2 deletions bin/src/server/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use media_server_secure::jwt::MediaConsoleSecureJwt;
use storage::StorageShared;

use crate::{
http::run_console_http_server,
http::{run_console_http_server, NodeApiCtx},
node_metrics::NodeMetricsCollector,
quinn::{make_quinn_client, VirtualNetwork},
NodeConfig,
Expand Down Expand Up @@ -72,8 +72,9 @@ pub async fn run_console_server(workers: usize, http_port: Option<u16>, node: No
if let Some(http_port) = http_port {
let secure = MediaConsoleSecureJwt::from(node.secret.as_bytes());
let storage = storage.clone();
let node_ctx = NodeApiCtx { address: node_addr.to_string() };
tokio::spawn(async move {
if let Err(e) = run_console_http_server(http_port, secure, storage, connector_rpc_client).await {
if let Err(e) = run_console_http_server(http_port, node_ctx, secure, storage, connector_rpc_client).await {
log::error!("HTTP Error: {}", e);
}
});
Expand Down
28 changes: 15 additions & 13 deletions bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use std::net::SocketAddr;

use crate::{
http::run_gateway_http_server,
http::{run_gateway_http_server, NodeApiCtx},
ng_controller::NgControllerServer,
node_metrics::NodeMetricsCollector,
quinn::{make_quinn_client, make_quinn_server, VirtualNetwork},
Expand Down Expand Up @@ -114,17 +114,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
let gateway_secure = Arc::new(gateway_secure);

let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024);
if let Some(http_port) = http_port {
let req_tx = req_tx.clone();
let secure2 = edge_secure.clone();
tokio::spawn(async move {
if let Err(e) = run_gateway_http_server(http_port, req_tx, secure2, gateway_secure).await {
log::error!("HTTP Error: {}", e);
}
});
}

//Running ng controller for Voip
// Running ng controller for Voip
if let Some(ngproto_addr) = args.rtpengine_cmd_addr {
let req_tx = req_tx.clone();
let rtpengine_udp = NgUdpTransport::new(ngproto_addr).await;
Expand All @@ -137,8 +127,8 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
});
}

// Setup Sdn
let node_id = node.node_id;

let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.bind_addrs_alt);
let node_addr = builder.node_addr();
let node_info = ClusterNodeInfo::Gateway(
Expand Down Expand Up @@ -168,6 +158,18 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
let mut controller = builder.build::<PollingBackend<SdnOwner, 128, 128>>(workers, node_info);
let (selector, mut requester) = build_dest_selector();

// Setup HTTP server
if let Some(http_port) = http_port {
let req_tx = req_tx.clone();
let secure2 = edge_secure.clone();
let node_ctx = NodeApiCtx { address: node_addr.to_string() };
tokio::spawn(async move {
if let Err(e) = run_gateway_http_server(http_port, node_ctx, req_tx, secure2, gateway_secure).await {
log::error!("HTTP Error: {}", e);
}
});
}

// Ip location for routing client to closest gateway
let ip2location = Arc::new(Ip2Location::new(&args.geo_db));

Expand Down
8 changes: 5 additions & 3 deletions bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
time::{Duration, Instant},
};

use atm0s_sdn::{features::FeaturesEvent, SdnExtIn, SdnExtOut, TimePivot, TimeTicker};
use atm0s_sdn::{features::FeaturesEvent, generate_node_addr, SdnExtIn, SdnExtOut, TimePivot, TimeTicker};
use clap::Parser;
use media_server_gateway::ServiceKind;
use media_server_multi_tenancy::MultiTenancyStorage;
Expand All @@ -27,7 +27,7 @@ use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use sans_io_runtime::{backend::PollingBackend, Controller};

use crate::{
http::run_media_http_server,
http::{run_media_http_server, NodeApiCtx},
ng_controller::NgControllerServer,
node_metrics::NodeMetricsCollector,
quinn::{make_quinn_server, VirtualNetwork},
Expand Down Expand Up @@ -92,15 +92,17 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node

let secure = Arc::new(MediaEdgeSecureJwt::from(node.secret.as_bytes()));
let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024);
let node_addr = generate_node_addr(node.node_id, &node.bind_addrs, node.bind_addrs_alt.clone());
if let Some(http_port) = http_port {
let secure_gateway = args.enable_token_api.then(|| {
let app_storage = Arc::new(MultiTenancyStorage::new_with_single(&node.secret, None));
Arc::new(MediaGatewaySecureJwt::new(node.secret.as_bytes(), app_storage))
});
let req_tx = req_tx.clone();
let secure_edge = secure.clone();
let node_ctx = NodeApiCtx { address: node_addr.to_string() };
tokio::spawn(async move {
if let Err(e) = run_media_http_server(http_port, req_tx, secure_edge, secure_gateway).await {
if let Err(e) = run_media_http_server(http_port, node_ctx, req_tx, secure_edge, secure_gateway).await {
log::error!("HTTP Error: {}", e);
}
});
Expand Down
2 changes: 1 addition & 1 deletion bin/z0_connector_n4.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ RUST_BACKTRACE=1 \
cargo run -- \
--sdn-zone-id 0 \
--sdn-zone-node-id 4 \
--seeds 1@/ip4/127.0.0.1/udp/10001 \
--seeds-from-node-api "http://localhost:3000/api/node/address" \
connector \
--s3-uri "http://minioadmin:[email protected]:9000/record"
2 changes: 1 addition & 1 deletion bin/z0_gate_n1.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ cargo run -- \
--sdn-port 10001 \
--sdn-zone-id 0 \
--sdn-zone-node-id 1 \
--seeds 0@/ip4/127.0.0.1/udp/10000 \
--seeds-from-node-api "http://localhost:8080/api/node/address" \
--workers 2 \
gateway \
--lat 10 \
Expand Down
Loading

0 comments on commit 61ef5db

Please sign in to comment.