Skip to content

Commit

Permalink
feat: gateway global
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Jan 23, 2024
1 parent 7cf84e4 commit 0c150aa
Show file tree
Hide file tree
Showing 12 changed files with 574 additions and 143 deletions.
3 changes: 3 additions & 0 deletions packages/cluster/src/define/rpc/gateway.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::net::SocketAddr;

use media_utils::F32;
use proc_macro::{IntoVecU8, TryFromSliceU8};
use serde::{Deserialize, Serialize};

Expand All @@ -15,6 +16,8 @@ pub struct ServiceInfo {
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, IntoVecU8, TryFromSliceU8)]
pub struct NodePing {
pub node_id: u32,
pub group: String,
pub location: Option<(F32<2>, F32<2>)>,
pub webrtc: Option<ServiceInfo>,
pub rtmp: Option<ServiceInfo>,
pub sip: Option<ServiceInfo>,
Expand Down
2 changes: 1 addition & 1 deletion packages/media-utils/src/float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};

/// This store float f32 as u32, and can be used as key in HashMap
/// ACCURACY is the number of digits after the decimal point
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct F32<const ACCURACY: usize>(u32);

impl<const ACCURACY: usize> F32<ACCURACY> {
Expand Down
10 changes: 8 additions & 2 deletions servers/media-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod server;
use cluster::{
atm0s_sdn::SystemTimer,
implement::{NodeAddr, NodeId, ServerSdn, ServerSdnConfig},
CONNECTOR_SERVICE, INNER_GATEWAY_SERVICE, MEDIA_SERVER_SERVICE,
CONNECTOR_SERVICE, GLOBAL_GATEWAY_SERVICE, INNER_GATEWAY_SERVICE, MEDIA_SERVER_SERVICE,
};

#[cfg(feature = "connector")]
Expand All @@ -26,6 +26,8 @@ use server::webrtc::run_webrtc_server;

use tracing_subscriber::{fmt, prelude::*, EnvFilter};

use crate::server::gateway::GatewayMode;

/// Media Server
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -99,7 +101,11 @@ async fn main() {
use server::MediaServerContext;
let token = Arc::new(cluster::implement::jwt_static::JwtStaticToken::new(&args.secret));
let ctx = MediaServerContext::<()>::new(args.node_id, 0, Arc::new(SystemTimer()), token.clone(), token);
let (cluster, rpc_endpoint) = ServerSdn::new(args.node_id, args.sdn_port, INNER_GATEWAY_SERVICE, config).await;
let rpc_service_id = match opts.mode {
GatewayMode::Inner => INNER_GATEWAY_SERVICE,
GatewayMode::Global => GLOBAL_GATEWAY_SERVICE,
};
let (cluster, rpc_endpoint) = ServerSdn::new(args.node_id, args.sdn_port, rpc_service_id, config).await;
if let Err(e) = run_gateway_server(args.http_port, args.http_tls, opts, ctx, cluster, rpc_endpoint).await {
log::error!("[GatewayServer] error {}", e);
}
Expand Down
58 changes: 52 additions & 6 deletions servers/media-server/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ use std::{sync::Arc, time::Duration};
use async_std::stream::StreamExt;
use clap::Parser;
use cluster::{
implement::NodeId,
rpc::{
gateway::{NodePing, NodePong},
general::{MediaEndpointCloseRequest, MediaEndpointCloseResponse, NodeInfo, ServerType},
webrtc::{WebrtcPatchRequest, WebrtcPatchResponse, WebrtcRemoteIceRequest, WebrtcRemoteIceResponse},
RpcEmitter, RpcEndpoint, RpcRequest, RPC_MEDIA_ENDPOINT_CLOSE, RPC_WEBRTC_CONNECT, RPC_WEBRTC_ICE, RPC_WEBRTC_PATCH, RPC_WHEP_CONNECT, RPC_WHIP_CONNECT,
RpcEmitter, RpcEndpoint, RpcRequest, RPC_MEDIA_ENDPOINT_CLOSE, RPC_NODE_PING, RPC_WEBRTC_CONNECT, RPC_WEBRTC_ICE, RPC_WEBRTC_PATCH, RPC_WHEP_CONNECT, RPC_WHIP_CONNECT,
},
Cluster, ClusterEndpoint, MEDIA_SERVER_SERVICE,
Cluster, ClusterEndpoint, GLOBAL_GATEWAY_SERVICE, MEDIA_SERVER_SERVICE,
};
use futures::{select, FutureExt};
use media_utils::{SystemTimer, Timer};
use media_utils::{SystemTimer, Timer, F32};
use metrics::describe_counter;
use metrics_dashboard::build_dashboard_route;
use poem::{web::Json, Route};
Expand All @@ -37,6 +39,7 @@ use self::{
rpc::{cluster::GatewayClusterRpc, http::GatewayHttpApis, RpcEvent},
};

pub use self::logic::GatewayMode;
use super::MediaServerContext;

mod logic;
Expand All @@ -49,9 +52,25 @@ const GATEWAY_SESSIONS_CONNECT_ERROR: &str = "gateway.sessions.connect.error";
/// Media Server Webrtc
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct GatewayArgs {}
pub struct GatewayArgs {
/// Gateway mode
#[arg(value_enum, env, long, default_value_t = GatewayMode::Inner)]
pub mode: GatewayMode,

/// Gateway group, only set if mode is Inner
#[arg(env, long, default_value = "")]
pub group: String,

/// lat location
#[arg(env, long, default_value_t = 0.0)]
pub lat: f32,

/// lng location
#[arg(env, long, default_value_t = 0.0)]
pub lng: f32,
}

pub async fn run_gateway_server<C, CR, RPC, REQ, EMITTER>(http_port: u16, http_tls: bool, _opts: GatewayArgs, ctx: MediaServerContext<()>, cluster: C, rpc_endpoint: RPC) -> Result<(), &'static str>
pub async fn run_gateway_server<C, CR, RPC, REQ, EMITTER>(http_port: u16, http_tls: bool, opts: GatewayArgs, ctx: MediaServerContext<()>, cluster: C, rpc_endpoint: RPC) -> Result<(), &'static str>
where
C: Cluster<CR> + Send + 'static,
CR: ClusterEndpoint + Send + 'static,
Expand Down Expand Up @@ -89,15 +108,23 @@ where

http_server.start(route, ctx.clone()).await;
let mut tick = async_std::stream::interval(Duration::from_millis(100));
let mut gateway_logic = GatewayLogic::new();
let mut gateway_logic = GatewayLogic::new(opts.mode);
let rpc_emitter = rpc_endpoint.emitter();
let mut gateway_feedback_tick = async_std::stream::interval(Duration::from_millis(2000));

loop {
let rpc = select! {
_ = tick.next().fuse() => {
gateway_logic.on_tick(timer.now_ms());
continue;
}
_ = gateway_feedback_tick.next().fuse() => {
if matches!(opts.mode, GatewayMode::Inner) {
ping_global_gateway(&gateway_logic, &opts.group, (F32::<2>::new(opts.lat), F32::<2>::new(opts.lng)), node_id, &rpc_emitter);
}

continue;
},
rpc = http_server.recv().fuse() => {
rpc.ok_or("HTTP_SERVER_ERROR")?
},
Expand Down Expand Up @@ -201,3 +228,22 @@ where
}
}
}

fn ping_global_gateway<EMITTER: RpcEmitter + Send + 'static>(logic: &GatewayLogic, group: &str, location: (F32<2>, F32<2>), node_id: NodeId, rpc_emitter: &EMITTER) {
let stats = logic.stats();
let req = NodePing {
node_id,
group: group.to_string(),
location: Some(location),
rtmp: stats.rtmp,
sip: stats.sip,
webrtc: stats.webrtc,
};

let rpc_emitter = rpc_emitter.clone();
async_std::task::spawn(async move {
if let Err(e) = rpc_emitter.request::<_, NodePong>(GLOBAL_GATEWAY_SERVICE, None, RPC_NODE_PING, req, 1000).await {
log::error!("[Gateway] ping global gateway error {:?}", e);
}
});
}
91 changes: 72 additions & 19 deletions servers/media-server/src/server/gateway/logic.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
use std::collections::HashMap;

use clap::ValueEnum;
use cluster::{
implement::NodeId,
rpc::gateway::{NodePing, NodePong},
rpc::gateway::{NodePing, NodePong, ServiceInfo},
};
use media_utils::F32;

use self::service::ServiceRegistry;
mod global_registry;
mod inner_registry;

mod service;
trait ServiceRegistry {
fn on_tick(&mut self, now_ms: u64);
fn on_ping(&mut self, now_ms: u64, group: &str, location: Option<(F32<2>, F32<2>)>, node_id: NodeId, usage: u8, live: u32, max: u32);
fn best_nodes(&mut self, location: Option<(F32<2>, F32<2>)>, max_usage: u8, max_usage_fallback: u8, size: usize) -> Vec<NodeId>;
fn stats(&self) -> ServiceInfo;
}

#[derive(ValueEnum, Clone, Copy, Debug)]
pub enum GatewayMode {
Global,
Inner,
}

/// Represents the type of service.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
Expand All @@ -17,15 +31,22 @@ pub enum ServiceType {
Sip,
}

pub struct GatewayStats {
pub rtmp: Option<ServiceInfo>,
pub sip: Option<ServiceInfo>,
pub webrtc: Option<ServiceInfo>,
}

/// Represents the gateway logic for handling node pings and managing services.
pub struct GatewayLogic {
services: HashMap<ServiceType, ServiceRegistry>,
mode: GatewayMode,
services: HashMap<ServiceType, Box<dyn ServiceRegistry>>,
}

impl GatewayLogic {
/// Creates a new instance of `GatewayLogic`.
pub fn new() -> Self {
Self { services: Default::default() }
pub fn new(mode: GatewayMode) -> Self {
Self { mode, services: Default::default() }
}

/// Handles the tick event.
Expand All @@ -47,13 +68,13 @@ impl GatewayLogic {
/// A `NodePong` struct with a success flag indicating the success of the ping operation.
pub fn on_ping(&mut self, now_ms: u64, ping: &NodePing) -> NodePong {
if let Some(meta) = &ping.webrtc {
self.on_node_ping_service(now_ms, ping.node_id, ServiceType::Webrtc, meta.usage, meta.live, meta.max);
self.on_node_ping_service(now_ms, ping.node_id, ServiceType::Webrtc, &ping.group, ping.location, meta.usage, meta.live, meta.max);
}
if let Some(meta) = &ping.rtmp {
self.on_node_ping_service(now_ms, ping.node_id, ServiceType::Rtmp, meta.usage, meta.live, meta.max);
self.on_node_ping_service(now_ms, ping.node_id, ServiceType::Rtmp, &ping.group, ping.location, meta.usage, meta.live, meta.max);
}
if let Some(meta) = &ping.sip {
self.on_node_ping_service(now_ms, ping.node_id, ServiceType::Sip, meta.usage, meta.live, meta.max);
self.on_node_ping_service(now_ms, ping.node_id, ServiceType::Sip, &ping.group, ping.location, meta.usage, meta.live, meta.max);
}
NodePong { success: true }
}
Expand All @@ -70,8 +91,11 @@ impl GatewayLogic {
/// # Returns
///
/// A vector of `NodeId` representing the best nodes for the service.
pub fn best_nodes(&mut self, service: ServiceType, max_usage: u8, max_usage_fallback: u8, size: usize) -> Vec<NodeId> {
self.services.get_mut(&service).map(|s| s.best_nodes(max_usage, max_usage_fallback, size)).unwrap_or_else(|| vec![])
pub fn best_nodes(&mut self, location: Option<(F32<2>, F32<2>)>, service: ServiceType, max_usage: u8, max_usage_fallback: u8, size: usize) -> Vec<NodeId> {
self.services
.get_mut(&service)
.map(|s| s.best_nodes(location, max_usage, max_usage_fallback, size))
.unwrap_or_else(|| vec![])
}

/// Handles the ping event for a specific service of a node.
Expand All @@ -83,35 +107,62 @@ impl GatewayLogic {
/// * `service` - The type of service.
/// * `usage` - The usage value.
/// * `max` - The maximum value.
fn on_node_ping_service(&mut self, now_ms: u64, node_id: NodeId, service: ServiceType, usage: u8, live: u32, max: u32) {
let service = self.services.entry(service).or_insert_with(move || ServiceRegistry::new(service));
service.on_ping(now_ms, node_id, usage, live, max);
fn on_node_ping_service(&mut self, now_ms: u64, node_id: NodeId, service: ServiceType, group: &str, location: Option<(F32<2>, F32<2>)>, usage: u8, live: u32, max: u32) {
let service = self.services.entry(service).or_insert_with(|| match self.mode {
GatewayMode::Global => Box::new(global_registry::ServiceGlobalRegistry::new(service)),
GatewayMode::Inner => Box::new(inner_registry::ServiceInnerRegistry::new(service)),
});
service.on_ping(now_ms, group, location, node_id, usage, live, max);
}

/// Returns the statistics for the gateway server.
///
/// # Returns
///
/// A `GatewayStats` struct containing the statistics for each service.
pub fn stats(&self) -> GatewayStats {
let rtmp = None;
let sip = None;
let mut webrtc = None;

for (service, registry) in &self.services {
match service {
ServiceType::Webrtc => webrtc = Some(registry.stats()),
// ServiceType::Rtmp => rtmp = Some(registry.stats()), //TODO support rtmp
// ServiceType::Sip => sip = Some(registry.stats()), //TODO support sip
_ => {}
}
}

GatewayStats { rtmp, sip, webrtc }
}
}

#[cfg(test)]
mod tests {
use cluster::rpc::gateway::{NodePing, ServiceInfo};

use crate::server::gateway::logic::GatewayLogic;
use crate::server::gateway::logic::{GatewayLogic, GatewayMode};

#[test]
fn test_gateway_logic_creation() {
let gateway_logic = GatewayLogic::new();
let gateway_logic = GatewayLogic::new(GatewayMode::Inner);
assert_eq!(gateway_logic.services.len(), 0);
}

#[test]
fn test_on_tick_without_services() {
let mut gateway_logic = GatewayLogic::new();
let mut gateway_logic = GatewayLogic::new(GatewayMode::Inner);
gateway_logic.on_tick(0);
}

#[test]
fn test_on_ping_with_valid_node_ping() {
let mut gateway_logic = GatewayLogic::new();
let mut gateway_logic = GatewayLogic::new(GatewayMode::Inner);
let node_ping = NodePing {
node_id: 1,
group: "".to_string(),
location: None,
webrtc: Some(ServiceInfo {
usage: 50,
live: 50,
Expand All @@ -136,9 +187,11 @@ mod tests {

#[test]
fn test_on_ping_with_no_services() {
let mut gateway_logic = GatewayLogic::new();
let mut gateway_logic = GatewayLogic::new(GatewayMode::Inner);
let node_ping = NodePing {
node_id: 1,
group: "".to_string(),
location: None,
webrtc: None,
rtmp: None,
sip: None,
Expand Down
Loading

0 comments on commit 0c150aa

Please sign in to comment.