Skip to content

Commit

Permalink
feat: event hook (#420)
Browse files Browse the repository at this point in the history
* add simple webhook

* add hook event for track

* add docs for third party hook

* fix lint and typos

* add uuid for each hook event

* add worker and storage for hook

* update doc

* fix clippy and typos error

* refactor worker and added room events

* feat: room last_peer_leaved and destroy event

* fix missing connector storage tick

* feat: record event

* fix clippy

* fix record upload failed

---------

Co-authored-by: Giang Minh <[email protected]>
  • Loading branch information
marverlous811 and giangndm authored Aug 27, 2024
1 parent 220067c commit 1843b2a
Show file tree
Hide file tree
Showing 30 changed files with 1,356 additions and 286 deletions.
24 changes: 10 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ members = [
[workspace.dependencies]
sans-io-runtime = { version = "0.2", default-features = false }
atm0s-sdn = { version = "0.2.2", default-features = false }
tokio = "1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] }
convert-enum = "0.1"
clap = "4.5"
num_enum = "0.7"
log = "0.4"
smallmap = "1.4"
serde = "1.0"
derivative = "2.2"
derive_more = "1.0"
rand = "0.8"
Expand Down
4 changes: 2 additions & 2 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ edition = "2021"

[dependencies]
tracing-subscriber = { workspace = true }
clap = { version = "4.5", features = ["env", "derive"] }
clap = { workspace = true, features = ["env", "derive"] }
log = { workspace = true }
rand = { workspace = true }
prost = { workspace = true }
poem = { version = "3.0", features = ["static-files"] }
poem-openapi = { version = "5.0", features = ["swagger-ui"] }
rust-embed = { version = "8.0", features = ["compression"], optional = true }
tokio = { version = "1.37", features = ["full"] }
tokio = { workspace = true, features = ["full"] }
sans-io-runtime = { workspace = true }
atm0s-sdn = { workspace = true }
media-server-protocol = { path = "../packages/protocol", features = ["quinn-rpc"] }
Expand Down
91 changes: 68 additions & 23 deletions bin/src/server/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use atm0s_sdn::{features::FeaturesEvent, secure::StaticKeyAuthorization, service
use clap::Parser;
use media_server_connector::{
handler_service::{self, ConnectorHandlerServiceBuilder},
sql_storage::ConnectorStorage,
Storage, HANDLER_SERVICE_ID,
ConnectorCfg, ConnectorStorage, HookBodyType, HANDLER_SERVICE_ID,
};
use media_server_protocol::{
cluster::{ClusterNodeGenericInfo, ClusterNodeInfo},
connector::CONNECTOR_RPC_PORT,
protobuf::cluster_connector::{connector_response, MediaConnectorServiceServer},
rpc::quinn::QuinnServer,
};
use media_server_utils::select2;
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use tokio::sync::mpsc::channel;

Expand Down Expand Up @@ -48,12 +48,46 @@ pub struct Args {
/// S3 Uri
#[arg(env, long, default_value = "http://user:pass@localhost:9000/bucket/path/?path_style=true")]
s3_uri: String,

/// Hook Uri.
/// If set, will send hook event to this uri. example: http://localhost:8080/hook
#[arg(env, long)]
hook_uri: Option<String>,

/// Hook workers
#[arg(env, long, default_value_t = 8)]
hook_workers: usize,

/// Hook body type
#[arg(env, long, default_value = "protobuf-json")]
hook_body_type: HookBodyType,

/// Destroy room after no-one online, default is 2 minutes
#[arg(env, long, default_value_t = 120_000)]
destroy_room_after_ms: u64,

/// Storage tick interval, default is 1 minute
/// This is used for clearing ended room
#[arg(env, long, default_value_t = 60_000)]
storage_tick_interval_ms: u64,
}

pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");

let connector_storage = Arc::new(ConnectorStorage::new(&args.db_uri, &args.s3_uri).await);
let mut connector_storage = ConnectorStorage::new(
node.node_id,
ConnectorCfg {
sql_uri: args.db_uri,
s3_uri: args.s3_uri,
hook_url: args.hook_uri,
hook_workers: args.hook_workers,
hook_body_type: args.hook_body_type,
room_destroy_after_ms: args.destroy_room_after_ms,
},
)
.await;
let connector_querier = connector_storage.querier();

let default_cluster_cert_buf = include_bytes!("../../certs/cluster.cert");
let default_cluster_key_buf = include_bytes!("../../certs/cluster.key");
Expand Down Expand Up @@ -91,7 +125,7 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {
let media_rpc_socket = vnet.udp_socket(CONNECTOR_RPC_PORT).await.expect("Should open virtual port for gateway rpc");
let mut media_rpc_server = MediaConnectorServiceServer::new(
QuinnServer::new(make_quinn_server(media_rpc_socket, default_cluster_key, default_cluster_cert.clone()).expect("Should create endpoint for media rpc server")),
remote_rpc_handler::Ctx { storage: connector_storage.clone() },
remote_rpc_handler::Ctx { storage: Arc::new(connector_querier) },
remote_rpc_handler::ConnectorRemoteRpcHandlerImpl::default(),
);

Expand All @@ -110,29 +144,40 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {

let (connector_storage_tx, mut connector_storage_rx) = channel(1024);
let (connector_handler_control_tx, mut connector_handler_control_rx) = channel(1024);
let mut connector_storage_interval = tokio::time::interval(Duration::from_millis(args.storage_tick_interval_ms));
tokio::task::spawn_local(async move {
while let Some((from, ts, req_id, event)) = connector_storage_rx.recv().await {
match connector_storage.on_event(from, ts, event).await {
Some(res) => {
if let Err(e) = connector_handler_control_tx.send(handler_service::Control::Res(from, req_id, res)).await {
log::error!("[Connector] send control to service error {:?}", e);
}
loop {
match select2::or(connector_storage_interval.tick(), connector_storage_rx.recv()).await {
select2::OrOutput::Left(_) => {
connector_storage.on_tick().await;
}
None => {
if let Err(e) = connector_handler_control_tx
.send(handler_service::Control::Res(
from,
req_id,
connector_response::Response::Error(connector_response::Error {
code: 0, //TODO return error from storage
message: "STORAGE_ERROR".to_string(),
}),
))
.await
{
log::error!("[Connector] send control to service error {:?}", e);
select2::OrOutput::Right(Some((from, ts, req_id, event))) => {
match connector_storage.on_event(from, ts, event).await {
Some(res) => {
if let Err(e) = connector_handler_control_tx.send(handler_service::Control::Res(from, req_id, res)).await {
log::error!("[Connector] send control to service error {:?}", e);
}
}
None => {
if let Err(e) = connector_handler_control_tx
.send(handler_service::Control::Res(
from,
req_id,
connector_response::Response::Error(connector_response::Error {
code: 0, //TODO return error from storage
message: "STORAGE_ERROR".to_string(),
}),
))
.await
{
log::error!("[Connector] send control to service error {:?}", e);
}
}
}
}
select2::OrOutput::Right(None) => {
break;
}
}
}
});
Expand Down
4 changes: 2 additions & 2 deletions bin/src/server/connector/remote_rpc_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use media_server_connector::{sql_storage, Querier};
use media_server_connector::Querier;
use media_server_protocol::protobuf::cluster_connector::{
get_events::EventInfo, get_peers::PeerInfo, get_rooms::RoomInfo, get_sessions::SessionInfo, GetEventParams, GetEvents, GetParams, GetPeerParams, GetPeers, GetRooms, GetSessions,
MediaConnectorServiceHandler, PeerSession,
Expand All @@ -9,7 +9,7 @@ use media_server_protocol::protobuf::shared::Pagination;

#[derive(Clone)]
pub struct Ctx {
pub storage: Arc<sql_storage::ConnectorStorage>, //TODO make it generic
pub storage: Arc<dyn Querier>, //TODO make it generic
}

#[derive(Default)]
Expand Down
3 changes: 1 addition & 2 deletions bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,7 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
}

while let Ok(control) = vnet_rx.try_recv() {
// TODO: fix bug with send_to_best cause cannot connect, avoid send to worker 0
controller.send_to(0, ExtIn::Sdn(SdnExtIn::FeaturesControl(media_server_runner::UserData::Cluster, control.into()), true));
controller.send_to_best(ExtIn::Sdn(SdnExtIn::FeaturesControl(media_server_runner::UserData::Cluster, control.into()), false));
}
while let Ok(req) = req_rx.try_recv() {
let req_id = req_id_seed;
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/media/runtime_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl<ES: MediaEdgeSecure> MediaRuntimeWorker<ES> {
fn process_out(&mut self, out: WorkerOutput) -> Output {
match out {
WorkerOutput::ExtRpc(req_id, res) => Output::Ext(true, ExtOut::Rpc(req_id, self.index, res)),
WorkerOutput::ExtSdn(out) => Output::Ext(false, ExtOut::Sdn(out)),
WorkerOutput::ExtSdn(out) => Output::Ext(true, ExtOut::Sdn(out)),
WorkerOutput::Bus(event) => match &event {
SdnWorkerBusEvent::Control(_) => Output::Bus(BusControl::Channel(Owner::Sdn, BusChannelControl::Publish(Channel::Controller, true, event))),
SdnWorkerBusEvent::Workers(_) => Output::Bus(BusControl::Broadcast(true, event)),
Expand Down
1 change: 1 addition & 0 deletions docs/user-guide/features/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ In this document, we will explore the key features of atm0s-media-server. Curren
| [Recording](./recording.md) | TODO |
| [Cluster](./cluster.md) | Alpha |
| [extra_data-metadata](./extra_data-metadata.md) | Alpha |
| [Third party event hook](./third-party-system-hook.md) | Alpha |
58 changes: 58 additions & 0 deletions docs/user-guide/features/third-party-system-hook.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Third party system hook

A third-party system hook is a provider that sends internal events from the media server to other systems. The events sent through the hook contain session, peer, and track.

## Usage

The `connector` node sends the hook. So, to enable the hook to provide, you need to use `--hook-uri` to pass the provider's URI when starting the node.

```bash
RUST_LOG=info \
RUST_BACKTRACE=1 \
cargo run -- \
--sdn-zone-id 0 \
--sdn-zone-node-id 4 \
--seeds 1@/ip4/127.0.0.1/udp/10001 \
connector \
--hook-uri "http://localhost:30798/webhook"
```

## Message format

Message will sent to another system by using JSON (serde and serde_json) or Binary format which is generated from Protobuf, defined by HookEvent message:

```protobuf
message HookEvent {
uint32 node = 1;
uint64 ts = 2;
oneof event {
RoomEvent room = 3;
PeerEvent peer = 4;
RecordEvent record = 5;
}
}
```

Example with Json:

```json
{
"node":1,
"ts":1724605969302,
"event":{
"Peer":{
"session_id":3005239549225289700,
"event":{
"RouteBegin":{
"remote_ip":"127.0.0.1"
}
}
}
}
```

## Supported Provider

| provider | status | description |
| -------- | -------------------- | ------------------------------------------------------- |
| webhook | :white_check_mark: | Will send each event using Restful API with POST method |
9 changes: 7 additions & 2 deletions packages/media_connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ edition = "2021"

[dependencies]
log = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
clap = { workspace = true }
serde = { workspace = true, features = ["derive"] }
media-server-protocol = { path = "../protocol" }
media-server-utils = { path = "../media_utils" }
atm0s-sdn = { workspace = true }
prost = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
lru = "0.12"
async-trait = "0.1"
sea-orm-migration = "1.1.0-rc.1"
Expand All @@ -19,9 +21,12 @@ sea-orm = { version = "1.1.0-rc.1", features = [
"sqlx-mysql",
"runtime-tokio-rustls",
] }
sea-query = "0.31"
sea-query = "0.32.0-rc.1"
serde_json = "1.0"
s3-presign = "0.0.2"
uuid = {version = "1.10", features = ["fast-rng", "v7"]}
reqwest = { version = "0.12", features = ["json"]}

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
tracing-subscriber = { workspace = true }
6 changes: 4 additions & 2 deletions packages/media_connector/src/agent_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl<UserData, SC, SE, TC, TW> ConnectorAgentService<UserData, SC, SE, TC, TW> {
}
}

impl<UserData: Copy + Eq, SC, SE, TC, TW> Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for ConnectorAgentService<UserData, SC, SE, TC, TW>
impl<UserData: Copy + Debug + Eq, SC, SE, TC, TW> Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for ConnectorAgentService<UserData, SC, SE, TC, TW>
where
SC: From<Control> + TryInto<Control> + Debug,
SE: From<Event> + TryInto<Event>,
Expand Down Expand Up @@ -114,10 +114,12 @@ where
data::Event::Recv(_port, _meta, buf) => match ConnectorResponse::decode(buf.as_slice()) {
Ok(msg) => {
if let Some(actor) = self.req_data.remove(&msg.req_id) {
log::info!("[ConnectorAgent] on msg response {:?}", msg);
self.msg_queue.on_ack(msg.req_id);
if let Some(res) = msg.response {
log::info!("[ConnectorAgent] on msg {} response {res:?}, feedback to {actor:?}", msg.req_id);
self.queue.push_back(ServiceOutput::Event(actor, Event::Response(res).into()));
} else {
log::warn!("[ConnectorAgent] on msg {} response None", msg.req_id);
}
} else {
log::warn!("[ConnectorAgent] missing info for msg response {:?}", msg);
Expand Down
Loading

0 comments on commit 1843b2a

Please sign in to comment.