diff --git a/README.md b/README.md index f3d13b0b..8db4e8ef 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ We are actively refactoring entire media server and network stack with [sans-io- | MoQ | Media-over-Quic | ❌ | | Monitoring | Dashboard for monitoring | ❌ | | Recording | Record stream | ❌ | -| Gateway | External gateway [RFC-0003](https://github.com/8xFF/rfcs/pull/3) | ❌ | +| Gateway | External gateway [RFC-0003](https://github.com/8xFF/rfcs/pull/3) | 🚧 | | Connector | External event handling | ❌ | Status: diff --git a/packages/media_gateway/src/store.rs b/packages/media_gateway/src/store.rs index 210726f9..032fed86 100644 --- a/packages/media_gateway/src/store.rs +++ b/packages/media_gateway/src/store.rs @@ -6,7 +6,10 @@ use self::service::ServiceStore; mod service; -#[derive(Debug)] +const MAX_MEMORY_USAGE: u8 = 80; +const MAX_DISK_USAGE: u8 = 90; + +#[derive(Debug, PartialEq)] pub struct PingEvent { pub cpu: u8, pub memory: u8, @@ -52,8 +55,8 @@ impl GatewayStore { pub fn on_ping(&mut self, now: u64, from: u32, ping: PingEvent) { log::debug!("[GatewayStore] on ping from {from} data {:?}", ping); - let node_usage = node_usage(&ping, 80, 90); - let webrtc_usage = webrtc_usage(&ping, 80, 90); + let node_usage = node_usage(&ping); + let webrtc_usage = webrtc_usage(&ping); match ping.origin { Origin::Media(_) => match (node_usage, webrtc_usage, ping.webrtc) { (Some(_node), Some(webrtc), Some(stats)) => self.webrtc.on_node_ping(now, from, webrtc, stats), @@ -85,27 +88,137 @@ impl GatewayStore { } } -fn node_usage(ping: &PingEvent, max_memory: u8, max_disk: u8) -> Option { - if ping.memory >= max_memory { +fn node_usage(ping: &PingEvent) -> Option { + if ping.memory >= MAX_MEMORY_USAGE { return None; } - if ping.disk >= max_disk { + if ping.disk >= MAX_DISK_USAGE { return None; } Some(ping.cpu) } -fn webrtc_usage(ping: &PingEvent, max_memory: u8, max_disk: u8) -> Option { - if ping.memory >= max_memory { +fn webrtc_usage(ping: &PingEvent) -> Option { + if ping.memory >= MAX_MEMORY_USAGE { return None; } - if ping.disk >= max_disk { + if ping.disk >= MAX_DISK_USAGE { return None; } let webrtc = ping.webrtc.as_ref()?; webrtc.active.then(|| ping.cpu.max(((webrtc.live * 100) / webrtc.max) as u8)) } + +#[cfg(test)] +mod tests { + use media_server_protocol::protobuf::cluster_gateway::ping_event::{gateway_origin::Location, GatewayOrigin, MediaOrigin, Origin, ServiceStats}; + + use crate::ServiceKind; + + use super::{GatewayStore, PingEvent}; + + #[test] + fn local_ping() { + let mut store = GatewayStore::new(0, Location { lat: 1.0, lon: 1.0 }); + store.on_ping( + 0, + 1, + PingEvent { + cpu: 0, + memory: 0, + disk: 0, + origin: Origin::Media(MediaOrigin {}), + webrtc: Some(ServiceStats { live: 100, max: 1000, active: true }), + }, + ); + + assert_eq!(store.best_for(ServiceKind::Webrtc, None), Some(1)); + + assert_eq!(store.pop_output(), None); + store.on_tick(100); + assert_eq!( + store.pop_output(), + Some(PingEvent { + cpu: 0, + memory: 0, + disk: 0, + origin: Origin::Gateway(GatewayOrigin { + location: Some(Location { lat: 1.0, lon: 1.0 }), + zone: 0, + }), + webrtc: Some(ServiceStats { live: 100, max: 1000, active: true }), + }) + ); + } + + #[test] + fn local_reject_max_usage() { + let mut store = GatewayStore::new(0, Location { lat: 1.0, lon: 1.0 }); + store.on_ping( + 0, + 1, + PingEvent { + cpu: 10, + memory: 80, + disk: 20, + origin: Origin::Media(MediaOrigin {}), + webrtc: Some(ServiceStats { live: 100, max: 1000, active: true }), + }, + ); + + store.on_ping( + 0, + 2, + PingEvent { + cpu: 10, + memory: 20, + disk: 90, + origin: Origin::Media(MediaOrigin {}), + webrtc: Some(ServiceStats { live: 100, max: 1000, active: true }), + }, + ); + + assert_eq!(store.best_for(ServiceKind::Webrtc, None), None); + } + + #[test] + fn remote_ping() { + let mut store = GatewayStore::new(0, Location { lat: 1.0, lon: 1.0 }); + store.on_ping( + 0, + 257, + PingEvent { + cpu: 0, + memory: 0, + disk: 0, + origin: Origin::Gateway(GatewayOrigin { + location: Some(Location { lat: 2.0, lon: 2.0 }), + zone: 256, + }), + webrtc: Some(ServiceStats { live: 100, max: 1000, active: true }), + }, + ); + + assert_eq!(store.best_for(ServiceKind::Webrtc, None), Some(257)); + + assert_eq!(store.pop_output(), None); + store.on_tick(100); + assert_eq!( + store.pop_output(), + Some(PingEvent { + cpu: 0, + memory: 0, + disk: 0, + origin: Origin::Gateway(GatewayOrigin { + location: Some(Location { lat: 1.0, lon: 1.0 }), + zone: 0, + }), + webrtc: None, + }) + ); + } +} diff --git a/packages/media_gateway/src/store/service.rs b/packages/media_gateway/src/store/service.rs index b6c6c47b..015bd07f 100644 --- a/packages/media_gateway/src/store/service.rs +++ b/packages/media_gateway/src/store/service.rs @@ -163,8 +163,8 @@ impl ServiceStore { if n.stats.active { stats.active = true; } - stats.live = n.stats.live; - stats.max = n.stats.max; + stats.live += n.stats.live; + stats.max += n.stats.max; } Some(stats) @@ -214,3 +214,112 @@ fn distance(node1: &Location, node2: &Location) -> f32 { //TODO make it more accuracy ((node1.lat - node2.lat).powi(2) + (node1.lon - node2.lon).powi(2)).sqrt() } + +#[cfg(test)] +mod tests { + use media_server_protocol::protobuf::cluster_gateway::ping_event::{gateway_origin::Location, ServiceStats}; + + use crate::{store::service::PING_TIMEOUT, ServiceKind}; + + use super::ServiceStore; + + #[test] + fn empty_store() { + let store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); + assert_eq!(store.best_for(None), None); + assert_eq!(store.best_for(Some(Location { lat: 1.0, lon: 1.0 })), None); + + assert_eq!(store.local_stats(), None); + } + + #[test] + fn local_store() { + let mut store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); + + store.on_node_ping(0, 1, 60, ServiceStats { live: 100, max: 1000, active: true }); + store.on_node_ping(0, 2, 50, ServiceStats { live: 60, max: 1000, active: true }); + + //should got lowest usage + assert_eq!(store.best_for(None), Some(2)); + assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(2)); + assert_eq!(store.local_stats(), Some(ServiceStats { live: 160, max: 2000, active: true })); + + //after node2 increase usage should fallback to node1 + store.on_node_ping(0, 2, 61, ServiceStats { live: 120, max: 1000, active: true }); + + assert_eq!(store.best_for(None), Some(1)); + assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(1)); + + //after remove should fallback to remain + store.remove_node(1); + + assert_eq!(store.best_for(None), Some(2)); + assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(2)); + } + + #[test] + fn remote_zones_store() { + let mut store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); + + store.on_gateway_ping(0, 256, 256, 60, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true }); + store.on_gateway_ping(0, 256, 257, 50, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true }); + + //should got lowest usage gateway node + assert_eq!(store.best_for(None), Some(257)); + assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(257)); + + //after gateway 257 increase usage should switch to 256 + store.on_gateway_ping(0, 256, 257, 65, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true }); + + assert_eq!(store.best_for(None), Some(256)); + assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(256)); + + //should fallback to remain gateway + store.remove_gateway(256, 256); + + assert_eq!(store.best_for(None), Some(257)); + assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(257)); + } + + #[test] + fn local_and_remote_zones() { + let mut store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); + + store.on_node_ping(0, 1, 60, ServiceStats { live: 100, max: 1000, active: true }); + store.on_gateway_ping(0, 256, 257, 60, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true }); + + //should got local zone if don't provide location + assert_eq!(store.best_for(None), Some(1)); + + //should got closest zone gaetway + assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(257)); + + //after remove local should fallback to other zone + store.remove_node(1); + + assert_eq!(store.best_for(None), Some(257)); + assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(257)); + + //after remove other zone should return None + store.remove_gateway(256, 257); + + assert_eq!(store.best_for(None), None); + assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), None); + } + + #[test] + fn clear_timeout() { + let mut store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); + + store.on_node_ping(0, 1, 60, ServiceStats { live: 100, max: 1000, active: true }); + store.on_gateway_ping(0, 256, 257, 60, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true }); + + assert_eq!(store.local_sources.len(), 1); + assert_eq!(store.zone_sources.len(), 1); + + store.on_tick(PING_TIMEOUT); + + assert_eq!(store.local_sources.len(), 0); + assert_eq!(store.zone_sources.len(), 0); + } +} diff --git a/packages/media_secure/src/jwt.rs b/packages/media_secure/src/jwt.rs index e44d7e59..163e0eb2 100644 --- a/packages/media_secure/src/jwt.rs +++ b/packages/media_secure/src/jwt.rs @@ -2,6 +2,8 @@ use crate::{MediaEdgeSecure, MediaGatewaySecure}; use jwt_simple::prelude::*; use serde::{de::DeserializeOwned, Serialize}; +const CONN_ID_TYPE: &'static str = "conn"; + pub struct MediaEdgeSecureJwt { key: HS256Key, } @@ -19,20 +21,32 @@ impl MediaEdgeSecure for MediaEdgeSecureJwt { ..Default::default() }; let claims = self.key.verify_token::(token, Some(options)).ok()?; + if let Some(expires_at) = claims.expires_at { + let now = Clock::now_since_epoch(); + if now >= expires_at { + return None; + } + } Some(claims.custom) } fn encode_conn_id(&self, conn: C, ttl_seconds: u64) -> String { - let claims = Claims::with_custom_claims(conn, Duration::from_secs(ttl_seconds)).with_issuer("conn"); + let claims = Claims::with_custom_claims(conn, Duration::from_secs(ttl_seconds)).with_issuer(CONN_ID_TYPE); self.key.authenticate(claims).expect("Should create jwt") } fn decode_conn_id(&self, token: &str) -> Option { let options = VerificationOptions { - allowed_issuers: Some(HashSet::from_strings(&["conn"])), + allowed_issuers: Some(HashSet::from_strings(&[CONN_ID_TYPE])), ..Default::default() }; let claims = self.key.verify_token::(token, Some(options)).ok()?; + if let Some(expires_at) = claims.expires_at { + let now = Clock::now_since_epoch(); + if now >= expires_at { + return None; + } + } Some(claims.custom) } } @@ -60,4 +74,79 @@ impl MediaGatewaySecure for MediaGatewaySecureJwt { let claims = Claims::with_custom_claims(ob, Duration::from_secs(ttl_seconds)).with_issuer(_type); self.key.authenticate(claims).expect("Should create jwt") } + + fn decode_conn_id(&self, token: &str) -> Option { + let options = VerificationOptions { + allowed_issuers: Some(HashSet::from_strings(&[CONN_ID_TYPE])), + ..Default::default() + }; + let claims = self.key.verify_token::(token, Some(options)).ok()?; + if let Some(expires_at) = claims.expires_at { + let now = Clock::now_since_epoch(); + if now >= expires_at { + return None; + } + } + Some(claims.custom) + } +} + +#[cfg(test)] +mod tests { + use std::{thread::sleep, time::Duration}; + + use serde::{Deserialize, Serialize}; + + use crate::{ + jwt::{MediaEdgeSecureJwt, MediaGatewaySecureJwt}, + MediaEdgeSecure, MediaGatewaySecure, + }; + + #[test] + fn object_test() { + #[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)] + struct Test { + value: u8, + } + + let secure_key = b"12345678"; + + let gateway_jwt = MediaGatewaySecureJwt::from(secure_key.as_slice()); + let edge_jwt = MediaEdgeSecureJwt::from(secure_key.as_slice()); + + let ob = Test { value: 1 }; + let token = gateway_jwt.encode_obj("test_type", ob.clone(), 1); + + //if wrong _type should error + assert_eq!(edge_jwt.decode_obj::("wrong_type", &token), None, "Should error if wrong type"); + assert_eq!(edge_jwt.decode_obj::("test_type", &token), Some(ob), "Should decode ok"); + + // it should error after timeout 1s + sleep(Duration::from_millis(1300)); + assert_eq!(edge_jwt.decode_obj::("test_type", &token), None, "Should error after timeout"); + } + + #[test] + fn conn_test() { + #[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)] + struct Test { + value: u8, + } + + let secure_key = b"12345678"; + + let gateway_jwt = MediaGatewaySecureJwt::from(secure_key.as_slice()); + let edge_jwt = MediaEdgeSecureJwt::from(secure_key.as_slice()); + + let ob = Test { value: 1 }; + let token = edge_jwt.encode_conn_id(ob.clone(), 1); + + assert_eq!(edge_jwt.decode_conn_id::(&token), Some(ob.clone()), "Should decode ok"); + assert_eq!(gateway_jwt.decode_conn_id::(&token), Some(ob), "Should decode ok"); + + // it should error after timeout 1s + sleep(Duration::from_millis(1300)); + assert_eq!(edge_jwt.decode_conn_id::(&token), None, "Should error after timeout"); + assert_eq!(gateway_jwt.decode_conn_id::(&token), None, "Should error after timeout"); + } } diff --git a/packages/media_secure/src/lib.rs b/packages/media_secure/src/lib.rs index bb778ebf..112ef1e5 100644 --- a/packages/media_secure/src/lib.rs +++ b/packages/media_secure/src/lib.rs @@ -6,12 +6,13 @@ pub mod jwt; /// This interface is for validating and generating data in each edge node like media-node pub trait MediaEdgeSecure { fn decode_obj(&self, _type: &'static str, data: &str) -> Option; - fn encode_conn_id(&self, conn: C, ttl_ms: u64) -> String; + fn encode_conn_id(&self, conn: C, ttl_seconds: u64) -> String; fn decode_conn_id(&self, data: &str) -> Option; } -/// This interface for generate signed data for gateway, like connect token +/// This interface for generating signed data for gateway, like connect token pub trait MediaGatewaySecure { fn validate_app(&self, token: &str) -> bool; - fn encode_obj(&self, _type: &'static str, ob: O, ttl_ms: u64) -> String; + fn encode_obj(&self, _type: &'static str, ob: O, ttl_seconds: u64) -> String; + fn decode_conn_id(&self, data: &str) -> Option; }