Skip to content

Commit

Permalink
Add worker pool and pending-job storage for hook event delivery
Browse files Browse the repository at this point in the history
  • Loading branch information
marverlous811 committed Aug 19, 2024
1 parent 5578c93 commit 421e9aa
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 25 deletions.
15 changes: 9 additions & 6 deletions bin/src/server/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use atm0s_sdn::{features::FeaturesEvent, secure::StaticKeyAuthorization, service
use clap::Parser;
use media_server_connector::{
handler_service::{self, ConnectorHandlerServiceBuilder},
hook_producer::{ConnectorHookProducer, HookPublisher},
hooks::{ConnectorHookController, HookControllerCfg, HookPublisher},
sql_storage::ConnectorStorage,
Storage, HANDLER_SERVICE_ID,
};
Expand Down Expand Up @@ -61,12 +61,13 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");

let connector_storage = Arc::new(ConnectorStorage::new(&args.db_uri, &args.s3_uri).await);
let hook_publisher: Option<Box<dyn HookPublisher>> = if let Some(hook_uri) = args.hook_uri {
Some(Box::new(http_hook_publisher::HttpHookPublisher::new(hook_uri)))
let hook_publisher: Option<Arc<dyn HookPublisher>> = if let Some(hook_uri) = args.hook_uri {
log::info!("[Connector] Hook publisher enabled with uri {}", hook_uri);
Some(Arc::new(http_hook_publisher::HttpHookPublisher::new(hook_uri)))
} else {
None
};
let mut connector_hook_producer = ConnectorHookProducer::new(hook_publisher);
let mut hook_controller = ConnectorHookController::new(hook_publisher, HookControllerCfg { worker_num: 5, job_num: 10 });

let default_cluster_cert_buf = include_bytes!("../../certs/cluster.cert");
let default_cluster_key_buf = include_bytes!("../../certs/cluster.key");
Expand Down Expand Up @@ -156,10 +157,11 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {
loop {
select! {
Some((from, ts, _req_id, req)) = connector_hook_rx.recv() => {
connector_hook_producer.on_event(from, ts, req);
log::error!("[MediaConnector] hook event {:?}", req);
hook_controller.on_event(from, ts, req);
}
_ = interval.tick() => {
connector_hook_producer.on_tick().await;
hook_controller.on_tick().await;
}
else => {
break;
Expand Down Expand Up @@ -196,6 +198,7 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {
SdnExtOut::ServicesEvent(_, _, SE::Connector(event)) => match event {
media_server_connector::handler_service::Event::Req(from, ts, req_id, event) => {
let ev = event.clone();
log::error!("[MediaConnector] hook event {:?}", ev);
if let Err(e) = connector_storage_tx.send((from, ts, req_id, event)).await {
log::error!("[MediaConnector] send event to storage error {:?}", e);
}
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/connector/http_hook_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::io::Error;

use media_server_connector::{hook_producer::HookPublisher, hooks::events::HookEvent};
use media_server_connector::{hooks::events::HookEvent, hooks::HookPublisher};

pub struct HttpHookPublisher {
uri: String,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use std::io::Error;
pub mod events;
pub mod storage;
pub mod worker;

use std::{io::Error, sync::Arc};

use async_trait::async_trait;
use atm0s_sdn::{sans_io_runtime::collections::DynamicDeque, NodeId};
use atm0s_sdn::NodeId;
use media_server_protocol::protobuf::cluster_connector::{connector_request, peer_event};
use media_server_utils::now_ms;
use storage::HookStorage;
use uuid::{NoContext, Timestamp};
use worker::HookWorker;

use crate::hooks::events::HookEvent;

Expand All @@ -12,16 +19,27 @@ pub trait HookPublisher {
async fn publish(&self, event: HookEvent) -> Option<Error>;
}

pub struct ConnectorHookProducer {
queue: DynamicDeque<HookEvent, 1024>,
publisher: Option<Box<dyn HookPublisher>>,
/// Tuning knobs for the hook controller.
#[derive(Debug, Clone)]
pub struct HookControllerCfg {
    /// Number of hook workers to create; jobs are sharded across them by session id.
    pub worker_num: u16,
    /// Maximum number of jobs pulled from storage on each tick.
    pub job_num: u16,
}

/// Routes hook events into storage and fans pending jobs out to a pool of
/// workers, sharded by session id (see `on_tick`).
pub struct ConnectorHookController {
    // Worker/job limits used when dispatching each tick.
    cfg: HookControllerCfg,
    // One delivery queue per shard; sized from cfg.worker_num.
    workers: Vec<HookWorker>,
    // Pending-job store; jobs stay here until a worker acks delivery.
    storage: Arc<dyn HookStorage>,
}

impl ConnectorHookProducer {
pub fn new(publisher: Option<Box<dyn HookPublisher>>) -> Self {
impl ConnectorHookController {
/// Build a controller with `cfg.worker_num` workers, each holding a handle to
/// the (optional) publisher, backed by an in-memory job store.
pub fn new(publisher: Option<Arc<dyn HookPublisher>>, cfg: HookControllerCfg) -> Self {
    let workers = (0..cfg.worker_num)
        .map(|_| HookWorker::new(publisher.clone()))
        .collect();
    Self {
        cfg,
        workers,
        storage: Arc::new(storage::InMemoryHookStorage::default()),
    }
}

Expand Down Expand Up @@ -187,17 +205,25 @@ impl ConnectorHookProducer {
peer_event::Event::Stats(_params) => None,
};
if let Some(hook_data) = hook_data {
self.queue.push_back(hook_data);
log::error!("hook_data: {:?}", hook_data);
self.storage.push_back(hook_data);
}
Some(())
}

pub async fn on_tick(&mut self) -> Option<()> {
if let Some(hook_data) = self.queue.pop_front() {
if let Some(publisher) = self.publisher.as_ref() {
let _ = publisher.publish(hook_data).await;
}
/// One scheduling cycle: flush every worker's queue, then fetch a batch of
/// pending jobs from storage, fan them out to workers keyed by session id,
/// and finally drop events that timed out in storage.
pub async fn on_tick(&mut self) {
    // Let each worker publish (and ack) whatever it already has queued.
    for worker in self.workers.iter_mut() {
        worker.on_tick().await;
    }

    // With no workers there is nowhere to dispatch; skipping also avoids a
    // division by zero in the session -> worker mapping below.
    if !self.workers.is_empty() {
        // Jobs remain in storage until acked, so consuming the returned Vec
        // (instead of iterating by reference and cloning) loses nothing.
        for job in self.storage.jobs(self.cfg.job_num as i16) {
            // A given session always maps to the same worker, preserving
            // per-session event ordering.
            let idx = (job.payload.session() % (self.workers.len() as u64)) as usize;
            self.workers[idx].push(job);
        }
    }

    self.storage.clean_timeout_event(now_ms());
}
}
18 changes: 18 additions & 0 deletions packages/media_connector/src/hooks/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,24 @@ impl HookEvent {
HookEvent::LocalTrack { uuid, .. } => uuid,
}
}

/// Session id carried by every event variant; used to shard events so each
/// session's events are handled by one worker.
pub fn session(&self) -> u64 {
    match self {
        HookEvent::Session { session, .. }
        | HookEvent::Peer { session, .. }
        | HookEvent::RemoteTrack { session, .. }
        | HookEvent::LocalTrack { session, .. } => *session,
    }
}

/// Timestamp (ms) carried by every event variant.
pub fn ts(&self) -> u64 {
    match self {
        HookEvent::Session { ts, .. }
        | HookEvent::Peer { ts, .. }
        | HookEvent::RemoteTrack { ts, .. }
        | HookEvent::LocalTrack { ts, .. } => *ts,
    }
}
}

impl<'de> Deserialize<'de> for HookEvent {
Expand Down
1 change: 0 additions & 1 deletion packages/media_connector/src/hooks/mod.rs

This file was deleted.

114 changes: 114 additions & 0 deletions packages/media_connector/src/hooks/storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::sync::{Arc, RwLock};

use media_server_utils::now_ms;

use super::events::HookEvent;

/// A hook event queued for delivery, bundled with the callback that removes
/// it from its owning storage once delivery is confirmed.
#[derive(Clone)]
pub struct HookJobData {
    pub payload: HookEvent,
    // Enqueue time in ms; used by storage timeout cleanup.
    pub ts: u64,
    // Invoked with the event id to acknowledge (delete) the job.
    // NOTE(review): `dyn Fn(String)` is not Send/Sync — confirm jobs never
    // cross threads before sharing them.
    on_done: Arc<dyn Fn(String)>,
}

impl HookJobData {
    /// Acknowledge successful delivery: invokes the stored callback with this
    /// job's event id so the owning storage can drop it.
    pub fn ack(&self) {
        let event_id = self.payload.id().to_string();
        (self.on_done)(event_id);
    }
}

/// Persistence abstraction for pending hook events.
pub trait HookStorage {
    /// Enqueue a new event, stamping it with the current time.
    fn push_back(&self, data: HookEvent);
    /// Return up to `limit` pending jobs without removing them; a job leaves
    /// storage only via `HookJobData::ack` or timeout cleanup.
    fn jobs(&self, limit: i16) -> Vec<HookJobData>;
    /// Drop events that have been pending too long, judged against `now` (ms).
    fn clean_timeout_event(&self, now: u64);
}

/// In-memory `HookStorage`: a `Vec` of jobs behind an `RwLock`, shared via
/// `Arc` so ack callbacks can remove their own entry.
pub struct InMemoryHookStorage {
    queue: Arc<RwLock<Vec<HookJobData>>>,
}

impl InMemoryHookStorage {
pub fn default() -> Self {
Self { queue: Default::default() }
}

pub fn len(&self) -> usize {
self.queue.read().unwrap().len()
}
}

impl HookStorage for InMemoryHookStorage {
    fn push_back(&self, data: HookEvent) {
        // The ack closure deletes the job by event id, so a job survives in
        // the queue until a worker confirms delivery.
        let cloned_queue = self.queue.clone();
        let ack = move |uuid: String| {
            let mut queue = cloned_queue.write().unwrap();
            queue.retain(|job| job.payload.id() != uuid.as_str());
        };
        let mut queue = self.queue.write().unwrap();
        queue.push(HookJobData {
            payload: data,
            ts: now_ms(),
            on_done: Arc::new(ack),
        });
    }

    fn jobs(&self, limit: i16) -> Vec<HookJobData> {
        // `take` fixes the old push-then-check loop, which returned one job
        // even when `limit` was zero or negative.
        let take = limit.max(0) as usize;
        self.queue.read().unwrap().iter().take(take).cloned().collect()
    }

    fn clean_timeout_event(&self, now: u64) {
        // saturating_sub: a job stamped after `now` counts as age 0 instead of
        // panicking (debug) or wrapping (release) on u64 underflow.
        let mut queue = self.queue.write().unwrap();
        queue.retain(|job| now.saturating_sub(job.ts) < 5000);
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_in_memory_hook_storage() {
        let storage = InMemoryHookStorage::default();

        // Seed ten peer events with ids "0".."9".
        for i in 0..10u64 {
            storage.push_back(HookEvent::Peer {
                uuid: i.to_string(),
                node: 1,
                ts: i,
                session: 1,
                room: "a".to_string(),
                peer: "a".to_string(),
                event: crate::hooks::events::PeerEvent::Joined,
            });
        }

        // The two oldest jobs come back first, and remain stored.
        let batch = storage.jobs(2);
        let ids = batch.iter().map(|job| job.payload.id()).collect::<Vec<&str>>();
        assert_eq!(ids, vec!["0", "1"]);

        // Acking removes exactly that job from the store.
        batch.first().unwrap().ack();
        assert_eq!(storage.len(), 9);

        let batch = storage.jobs(2);
        let ids = batch.iter().map(|job| job.payload.id()).collect::<Vec<&str>>();
        assert_eq!(ids, vec!["1", "2"]);

        // Fresh jobs survive a cleanup at "now"...
        storage.clean_timeout_event(now_ms());
        assert_eq!(storage.len(), 9);

        // ...but are all dropped once they age past the 5s timeout.
        storage.clean_timeout_event(now_ms() + 5000);
        assert_eq!(storage.len(), 0);
    }
}
31 changes: 31 additions & 0 deletions packages/media_connector/src/hooks/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::{collections::VecDeque, sync::Arc};

use super::{storage::HookJobData, HookPublisher};

/// Per-shard delivery queue: jobs pushed here are published (and acked) when
/// the worker is ticked.
pub struct HookWorker {
    // FIFO of jobs awaiting publication.
    queues: VecDeque<HookJobData>,
    // None disables publishing; jobs are then acked without delivery.
    publisher: Option<Arc<dyn HookPublisher>>,
}

impl HookWorker {
pub fn new(publisher: Option<Arc<dyn HookPublisher>>) -> Self {
Self { queues: VecDeque::new(), publisher }
}

pub fn push(&mut self, data: HookJobData) {
self.queues.push_back(data);
}

pub async fn on_tick(&mut self) {
while let Some(job) = self.queues.pop_front() {
if let Some(publisher) = &self.publisher {
let err = publisher.publish(job.payload.clone()).await;
if err.is_some() {
log::error!("[HookWorker] Failed to publish hook event: {:?}", err);
continue;
}
}
job.ack();
}
}
}
1 change: 0 additions & 1 deletion packages/media_connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use serde_json::Value;

pub mod agent_service;
pub mod handler_service;
pub mod hook_producer;
pub mod hooks;
mod msg_queue;
pub mod sql_storage;
Expand Down

0 comments on commit 421e9aa

Please sign in to comment.