diff --git a/bin/src/server/connector.rs b/bin/src/server/connector.rs index a8b9da1d..b289386c 100644 --- a/bin/src/server/connector.rs +++ b/bin/src/server/connector.rs @@ -4,7 +4,7 @@ use atm0s_sdn::{features::FeaturesEvent, secure::StaticKeyAuthorization, service use clap::Parser; use media_server_connector::{ handler_service::{self, ConnectorHandlerServiceBuilder}, - hook_producer::{ConnectorHookProducer, HookPublisher}, + hooks::{ConnectorHookController, HookControllerCfg, HookPublisher}, sql_storage::ConnectorStorage, Storage, HANDLER_SERVICE_ID, }; @@ -61,12 +61,13 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { rustls::crypto::ring::default_provider().install_default().expect("should install ring as default"); let connector_storage = Arc::new(ConnectorStorage::new(&args.db_uri, &args.s3_uri).await); - let hook_publisher: Option> = if let Some(hook_uri) = args.hook_uri { - Some(Box::new(http_hook_publisher::HttpHookPublisher::new(hook_uri))) + let hook_publisher: Option> = if let Some(hook_uri) = args.hook_uri { + log::info!("[Connector] Hook publisher enabled with uri {}", hook_uri); + Some(Arc::new(http_hook_publisher::HttpHookPublisher::new(hook_uri))) } else { None }; - let mut connector_hook_producer = ConnectorHookProducer::new(hook_publisher); + let mut hook_controller = ConnectorHookController::new(hook_publisher, HookControllerCfg { worker_num: 5, job_num: 10 }); let default_cluster_cert_buf = include_bytes!("../../certs/cluster.cert"); let default_cluster_key_buf = include_bytes!("../../certs/cluster.key"); @@ -156,10 +157,11 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { loop { select! { Some((from, ts, _req_id, req)) = connector_hook_rx.recv() => { - connector_hook_producer.on_event(from, ts, req); + log::error!("[MediaConnector] hook event {:?}", req); + hook_controller.on_event(from, ts, req); } _ = interval.tick() => { - connector_hook_producer.on_tick().await; + hook_controller.on_tick().await; } else => { break; @@ -196,6 +198,7 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { SdnExtOut::ServicesEvent(_, _, SE::Connector(event)) => match event { media_server_connector::handler_service::Event::Req(from, ts, req_id, event) => { let ev = event.clone(); + log::error!("[MediaConnector] hook event {:?}", ev); if let Err(e) = connector_storage_tx.send((from, ts, req_id, event)).await { log::error!("[MediaConnector] send event to storage error {:?}", e); } diff --git a/bin/src/server/connector/http_hook_publisher.rs b/bin/src/server/connector/http_hook_publisher.rs index 0f1b68d1..3ab1a448 100644 --- a/bin/src/server/connector/http_hook_publisher.rs +++ b/bin/src/server/connector/http_hook_publisher.rs @@ -1,6 +1,6 @@ use std::io::Error; -use media_server_connector::{hook_producer::HookPublisher, hooks::events::HookEvent}; +use media_server_connector::{hooks::events::HookEvent, hooks::HookPublisher}; pub struct HttpHookPublisher { uri: String, diff --git a/packages/media_connector/src/hook_producer.rs b/packages/media_connector/src/hooks.rs similarity index 82% rename from packages/media_connector/src/hook_producer.rs rename to packages/media_connector/src/hooks.rs index 4a270877..1cbbf6b3 100644 --- a/packages/media_connector/src/hook_producer.rs +++ b/packages/media_connector/src/hooks.rs @@ -1,9 +1,16 @@ -use std::io::Error; +pub mod events; +pub mod storage; +pub mod worker; + +use std::{io::Error, sync::Arc}; use async_trait::async_trait; -use atm0s_sdn::{sans_io_runtime::collections::DynamicDeque, NodeId}; +use atm0s_sdn::NodeId; use media_server_protocol::protobuf::cluster_connector::{connector_request, peer_event}; +use media_server_utils::now_ms; +use storage::HookStorage; use uuid::{NoContext, Timestamp}; +use worker::HookWorker; use crate::hooks::events::HookEvent; @@ -12,16 +19,27 @@ pub trait HookPublisher { async fn publish(&self, event: HookEvent) -> Option; } -pub struct ConnectorHookProducer { - queue: DynamicDeque, - publisher: Option>, +pub struct HookControllerCfg { + pub worker_num: u16, + pub job_num: u16, +} + +pub struct ConnectorHookController { + cfg: HookControllerCfg, + workers: Vec, + storage: Arc, } -impl ConnectorHookProducer { - pub fn new(publisher: Option>) -> Self { +impl ConnectorHookController { + pub fn new(publisher: Option>, cfg: HookControllerCfg) -> Self { + let mut workers = Vec::new(); + for _ in 0..cfg.worker_num { + workers.push(HookWorker::new(publisher.clone())); + } Self { - queue: DynamicDeque::default(), - publisher, + cfg, + workers, + storage: Arc::new(storage::InMemoryHookStorage::default()), } } @@ -187,17 +205,25 @@ impl ConnectorHookProducer { peer_event::Event::Stats(_params) => None, }; if let Some(hook_data) = hook_data { - self.queue.push_back(hook_data); + log::error!("hook_data: {:?}", hook_data); + self.storage.push_back(hook_data); } Some(()) } - pub async fn on_tick(&mut self) -> Option<()> { - if let Some(hook_data) = self.queue.pop_front() { - if let Some(publisher) = self.publisher.as_ref() { - let _ = publisher.publish(hook_data).await; - } + pub async fn on_tick(&mut self) { + // log::error!("on_tick: on worker ticks"); + for worker in self.workers.iter_mut() { + worker.on_tick().await; } - Some(()) + + let jobs = self.storage.jobs(self.cfg.job_num as i16); + for job in jobs.iter() { + // log::error!("job: {:?}", job.payload); + let path = job.payload.session() % (self.cfg.worker_num as u64); + self.workers[path as usize].push(job.clone()); + } + + self.storage.clean_timeout_event(now_ms()); } } diff --git a/packages/media_connector/src/hooks/events.rs b/packages/media_connector/src/hooks/events.rs index 28a03a57..f03376fe 100644 --- a/packages/media_connector/src/hooks/events.rs +++ b/packages/media_connector/src/hooks/events.rs @@ -134,6 +134,24 @@ impl HookEvent { HookEvent::LocalTrack { uuid, .. } => uuid, } } + + pub fn session(&self) -> u64 { + match self { + HookEvent::Session { session, .. } => *session, + HookEvent::Peer { session, .. } => *session, + HookEvent::RemoteTrack { session, .. } => *session, + HookEvent::LocalTrack { session, .. } => *session, + } + } + + pub fn ts(&self) -> u64 { + match self { + HookEvent::Session { ts, .. } => *ts, + HookEvent::Peer { ts, .. } => *ts, + HookEvent::RemoteTrack { ts, .. } => *ts, + HookEvent::LocalTrack { ts, .. } => *ts, + } + } } impl<'de> Deserialize<'de> for HookEvent { diff --git a/packages/media_connector/src/hooks/mod.rs b/packages/media_connector/src/hooks/mod.rs deleted file mode 100644 index a9970c28..00000000 --- a/packages/media_connector/src/hooks/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod events; diff --git a/packages/media_connector/src/hooks/storage.rs b/packages/media_connector/src/hooks/storage.rs new file mode 100644 index 00000000..441f7868 --- /dev/null +++ b/packages/media_connector/src/hooks/storage.rs @@ -0,0 +1,114 @@ +use std::sync::{Arc, RwLock}; + +use media_server_utils::now_ms; + +use super::events::HookEvent; + +#[derive(Clone)] +pub struct HookJobData { + pub payload: HookEvent, + pub ts: u64, + on_done: Arc, +} + +impl HookJobData { + pub fn ack(&self) { + (self.on_done)(self.payload.id().to_string()); + } +} + +pub trait HookStorage { + fn push_back(&self, data: HookEvent); + fn jobs(&self, limit: i16) -> Vec; + fn clean_timeout_event(&self, now: u64); +} + +pub struct InMemoryHookStorage { + queue: Arc>>, +} + +impl InMemoryHookStorage { + pub fn default() -> Self { + Self { queue: Default::default() } + } + + pub fn len(&self) -> usize { + self.queue.read().unwrap().len() + } +} + +impl HookStorage for InMemoryHookStorage { + fn push_back(&self, data: HookEvent) { + let cloned_queue = self.queue.clone(); + let ack = move |uuid: String| { + let mut queue = cloned_queue.write().unwrap(); + queue.retain(|job| job.payload.id() != uuid.as_str()); + }; + let ack = Arc::new(ack); + let mut queue = self.queue.write().unwrap(); + queue.push(HookJobData { + payload: data, + ts: now_ms(), + on_done: ack, + }); + } + + fn jobs(&self, limit: i16) -> Vec { + let queue = self.queue.read().unwrap(); + let mut jobs = Vec::new(); + for job in queue.iter() { + jobs.push(job.clone()); + if jobs.len() as i16 >= limit { + break; + } + } + jobs + } + + fn clean_timeout_event(&self, now: u64) { + let mut queue = self.queue.write().unwrap(); + queue.retain(|job| now - job.ts < 5000); + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_in_memory_hook_storage() { + let storage = InMemoryHookStorage::default(); + // let cloned = storage.clone(); + + for i in 0..10 { + let event = HookEvent::Peer { + uuid: i.to_string(), + node: 1, + ts: i, + session: 1, + room: "a".to_string(), + peer: "a".to_string(), + event: crate::hooks::events::PeerEvent::Joined, + }; + storage.push_back(event); + } + + let jobs = storage.jobs(2); + let job_ids = jobs.iter().map(|job| job.payload.id()).collect::>(); + assert_eq!(job_ids, vec!["0", "1"]); + + let first_job = jobs.first().unwrap(); + first_job.ack(); + assert_eq!(storage.len(), 9); + + let jobs = storage.jobs(2); + let job_ids = jobs.iter().map(|job| job.payload.id()).collect::>(); + assert_eq!(job_ids, vec!["1", "2"]); + + storage.clean_timeout_event(now_ms()); + assert_eq!(storage.len(), 9); + + storage.clean_timeout_event(now_ms() + 5000); + assert_eq!(storage.len(), 0); + } +} diff --git a/packages/media_connector/src/hooks/worker.rs b/packages/media_connector/src/hooks/worker.rs new file mode 100644 index 00000000..642c0d8f --- /dev/null +++ b/packages/media_connector/src/hooks/worker.rs @@ -0,0 +1,31 @@ +use std::{collections::VecDeque, sync::Arc}; + +use super::{storage::HookJobData, HookPublisher}; + +pub struct HookWorker { + queues: VecDeque, + publisher: Option>, +} + +impl HookWorker { + pub fn new(publisher: Option>) -> Self { + Self { queues: VecDeque::new(), publisher } + } + + pub fn push(&mut self, data: HookJobData) { + self.queues.push_back(data); + } + + pub async fn on_tick(&mut self) { + while let Some(job) = self.queues.pop_front() { + if let Some(publisher) = &self.publisher { + let err = publisher.publish(job.payload.clone()).await; + if err.is_some() { + log::error!("[HookWorker] Failed to publish hook event: {:?}", err); + continue; + } + } + job.ack(); + } + } +} diff --git a/packages/media_connector/src/lib.rs b/packages/media_connector/src/lib.rs index a49fe361..6faa3f46 100644 --- a/packages/media_connector/src/lib.rs +++ b/packages/media_connector/src/lib.rs @@ -4,7 +4,6 @@ use serde_json::Value; pub mod agent_service; pub mod handler_service; -pub mod hook_producer; pub mod hooks; mod msg_queue; pub mod sql_storage;