Skip to content

Commit

Permalink
refactor: config zone id node id media port, get console lists (#417)
Browse files Browse the repository at this point in the history
* refactor node-id, zone-id and sdn-port media-port for better configuring

* refactor with zone-id, added consoles list api

* update docs for new sdn-zone-id, sdn-zone-idx config

* fix clippy

* change term: sdn-zone-idx to sdn-zone-node-id
  • Loading branch information
giangndm authored Aug 9, 2024
1 parent 418e0e5 commit 597ab58
Show file tree
Hide file tree
Showing 29 changed files with 256 additions and 193 deletions.
9 changes: 0 additions & 9 deletions bin/connector_z0_n1.sh

This file was deleted.

9 changes: 0 additions & 9 deletions bin/connector_z256_n1.sh

This file was deleted.

15 changes: 13 additions & 2 deletions bin/src/http/api_console/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
use crate::server::console_storage::{Zone, ZoneDetails};
use crate::server::console_storage::{ConsoleNode, Zone, ZoneDetails};

use super::{super::Response, ConsoleApisCtx, ConsoleAuthorization};
use media_server_protocol::cluster::ZoneId;
use poem::web::Data;
use poem_openapi::{param::Path, payload::Json, OpenApi};

pub struct Apis;

#[OpenApi]
impl Apis {
/// get consoles from all zones
#[oai(path = "/consoles", method = "get")]
async fn consoles(&self, _auth: ConsoleAuthorization, Data(ctx): Data<&ConsoleApisCtx>) -> Json<Response<Vec<ConsoleNode>>> {
Json(Response {
status: true,
data: Some(ctx.storage.consoles()),
..Default::default()
})
}

/// get zones
#[oai(path = "/zones", method = "get")]
async fn zones(&self, _auth: ConsoleAuthorization, Data(ctx): Data<&ConsoleApisCtx>) -> Json<Response<Vec<Zone>>> {
Expand All @@ -21,7 +32,7 @@ impl Apis {
/// get zone
#[oai(path = "/zones/:zone_id", method = "get")]
async fn zone(&self, _auth: ConsoleAuthorization, zone_id: Path<u32>, Data(ctx): Data<&ConsoleApisCtx>) -> Json<Response<ZoneDetails>> {
if let Some(zone) = ctx.storage.zone(zone_id.0) {
if let Some(zone) = ctx.storage.zone(ZoneId(zone_id.0)) {
Json(Response {
status: true,
data: Some(zone),
Expand Down
3 changes: 2 additions & 1 deletion bin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::SocketAddr;

use atm0s_sdn::{NodeAddr, NodeId};
use media_server_protocol::cluster::ZoneId;

mod errors;
mod http;
Expand All @@ -18,6 +19,6 @@ pub struct NodeConfig {
pub secret: String,
pub seeds: Vec<NodeAddr>,
pub bind_addrs: Vec<SocketAddr>,
pub zone: u32,
pub zone: ZoneId,
pub bind_addrs_alt: Vec<SocketAddr>,
}
62 changes: 38 additions & 24 deletions bin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,67 +1,70 @@
use std::net::{IpAddr, SocketAddr};

use atm0s_media_server::{server, NodeConfig};
use atm0s_sdn::{NodeAddr, NodeId};
use atm0s_sdn::NodeAddr;
use clap::Parser;
use media_server_protocol::cluster::ZoneId;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

const MAX_ZONE_ID: u32 = 1u32 << 24;

/// Scalable Media Server solution for WebRTC, RTMP, and SIP.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// Http port
/// HTTP port for incoming requests.
#[arg(env, long)]
http_port: Option<u16>,

/// Run http with tls or not
/// Enable TLS for HTTP connections (set to the port number for HTTPS).
#[arg(env, long)]
http_tls: Option<u16>,

/// Sdn port
/// SDN (Software-Defined Networking) port.
#[arg(env, long, default_value_t = 0)]
sdn_port: u16,

/// Sdn Zone, which is 32bit number with last 8bit is 0
/// SDN zone identifier, a 24-bit number representing the zone ID.
#[arg(env, long, default_value_t = 0)]
sdn_zone: u32,
sdn_zone_id: u32,

/// Current Node ID
#[arg(env, long, default_value_t = 1)]
node_id: NodeId,
/// The 8-bit index of the current node within the SDN zone.
#[arg(env, long, default_value_t = 0)]
sdn_zone_node_id: u8,

/// Setting the single node IP option will disable the autodetect IP addresses logic
/// Manually specify the IP address of the node. This disables IP autodetection.
#[arg(env, long)]
node_ip: Option<IpAddr>,

/// Some alternative node IPs, which are useful with some cloud providers behind NAT, like AWS or GCP ...
/// Alternative IP addresses for the node, useful for environments like AWS or GCP that are behind NAT.
#[arg(env, long)]
node_ip_alt: Vec<IpAddr>,

/// Enable private ip
/// Enable private IP addresses for the node.
#[arg(env, long)]
enable_private_ip: bool,

/// Enable ipv6
/// Enable IPv6 support.
#[arg(env, long)]
enable_ipv6: bool,

/// Cluster Secret Key
/// Cluster secret key used for secure communication between nodes.
#[arg(env, long, default_value = "insecure")]
secret: String,

/// Neighbors
/// Addresses of neighboring nodes for cluster communication.
#[arg(env, long)]
seeds: Vec<NodeAddr>,

/// Workers
/// Number of worker threads to spawn.
#[arg(env, long, default_value_t = 1)]
workers: usize,

/// Enable sentry report
/// Disable Sentry error reporting.
#[arg(env, long)]
sentry_disable: bool,

/// Sentry report endpoint
/// Sentry error reporting endpoint.
#[arg(env, long, default_value = "https://46f5e9a11d430eb479b516fc12033e78@o4507218956386304.ingest.us.sentry.io/4507739106836480")]
sentry_endpoint: String,

Expand All @@ -80,6 +83,8 @@ async fn main() {
let args: Args = Args::parse();
tracing_subscriber::registry().with(fmt::layer()).with(EnvFilter::from_default_env()).init();

assert!(args.sdn_zone_id < MAX_ZONE_ID, "sdn_zone_id must < {MAX_ZONE_ID}");

if !args.sentry_disable {
let _guard = sentry::init((
args.sentry_endpoint.as_str(),
Expand All @@ -91,9 +96,18 @@ async fn main() {
}

let http_port = args.http_port;
let sdn_port = if args.sdn_port > 0 {
args.sdn_port
} else {
// We get a free port
let udp_socket = std::net::UdpSocket::bind("0.0.0.0:0").expect("Should get free port");
udp_socket.local_addr().expect("Should get free port").port()
};

let workers = args.workers;

let bind_addrs = if let Some(ip) = args.node_ip {
vec![SocketAddr::new(ip, args.sdn_port)]
vec![SocketAddr::new(ip, sdn_port)]
} else {
local_ip_address::list_afinet_netifas()
.expect("Should have list interfaces")
Expand All @@ -103,18 +117,18 @@ async fn main() {
IpAddr::V4(ipv4) => !ipv4.is_private() || args.enable_private_ip,
IpAddr::V6(ipv6) => !ipv6.is_unspecified() && !ipv6.is_multicast() && (!ipv6.is_loopback() || args.enable_private_ip) && args.enable_ipv6,
};
allow && std::net::UdpSocket::bind(SocketAddr::new(*ip, 0)).is_ok()
allow && std::net::UdpSocket::bind(SocketAddr::new(*ip, sdn_port)).is_ok()
})
.map(|(_name, ip)| SocketAddr::new(ip, args.sdn_port))
.map(|(_name, ip)| SocketAddr::new(ip, sdn_port))
.collect::<Vec<_>>()
};
let node = NodeConfig {
node_id: args.node_id,
node_id: ZoneId(args.sdn_zone_id).to_node_id(args.sdn_zone_node_id),
secret: args.secret,
seeds: args.seeds,
bind_addrs,
zone: args.sdn_zone,
bind_addrs_alt: args.node_ip_alt.into_iter().map(|ip| SocketAddr::new(ip, args.sdn_port)).collect::<Vec<_>>(),
zone: ZoneId(args.sdn_zone_id),
bind_addrs_alt: args.node_ip_alt.into_iter().map(|ip| SocketAddr::new(ip, sdn_port)).collect::<Vec<_>>(),
};

log::info!("Bind addrs {:?}, bind addrs alt {:?}", node.bind_addrs, node.bind_addrs_alt);
Expand Down
Loading

0 comments on commit 597ab58

Please sign in to comment.