diff --git a/Cargo.lock b/Cargo.lock index 2e36f249..67e2d371 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -431,6 +431,7 @@ dependencies = [ "atm0s-media-server-utils", "bincode", "clap", + "derive_more", "futures", "log", "maxminddb", @@ -447,6 +448,7 @@ dependencies = [ "rsip", "rust-embed", "serde", + "serde_json", "tracing-subscriber", "yaque", ] @@ -510,6 +512,7 @@ version = "0.1.1" dependencies = [ "prost", "prost-build", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 99082cb7..a5ed64f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,3 +24,4 @@ serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } prost = "0.12" poem-openapi = { version = "4.0" } +derive_more = { version = "0.99" } diff --git a/docs/contributor-guide/servers/connector.md b/docs/contributor-guide/servers/connector.md index 47a76ea6..7954f3ca 100644 --- a/docs/contributor-guide/servers/connector.md +++ b/docs/contributor-guide/servers/connector.md @@ -6,4 +6,4 @@ If we have multiple connector servers, the routing algorithm will send each even We can have multiple connector servers in each zone, and then the routing algorithm will send the event log to the connector server in the same zone. -Each connector server is connected to a message queue, and external services can get event logs from the message queue. Currently, we only support NATS message queue. +Each connector server is connected to a message queue, and external services can get event logs from the message queue. Currently, we only support NATS message queue and HTTP POST API endpoints. diff --git a/docs/user-guide/configuration.md b/docs/user-guide/configuration.md index 94d12d0a..42443741 100644 --- a/docs/user-guide/configuration.md +++ b/docs/user-guide/configuration.md @@ -112,10 +112,12 @@ Options: --mq-uri Message Queue URI in the form of `amqp://user:pass@host:port/vhost` [env: MQ_URI=] [default: nats://localhost:4222] --mq-channel MQ Channel [env: MQ_CHANNEL=] [default: atm0s/event_log] --backup-path Filebase backup path for logs [env: BACKUP_PATH=] [default: .atm0s/data/connector-queue] + --format The output format of the message, for now it can either be `protobuf` or `json` [env: FORMAT=] [default: protobuf] -h, --help Print help -V, --version Print version ``` Currently, we only support NATS as the message queue. However, we have designed the system to easily support other message queues such as RabbitMQ or Kafka by implementing the necessary interfaces. +You can also use an HTTP API endpoint to receive the cluster events, simply by configuring the MQ URI to be that API Endpoints: `http(s)://localhost:4000/events`. The events will be delivered through a POST request in the specified format. If the format is `protobuf`, the request header will include the content type of `application/octet-stream`. For persistent data storage, we use local files. You can configure the backup path for storing the data by setting the `backup-path` option. diff --git a/packages/protocol/Cargo.toml b/packages/protocol/Cargo.toml index 6fdbaa8d..46fe57f8 100644 --- a/packages/protocol/Cargo.toml +++ b/packages/protocol/Cargo.toml @@ -10,6 +10,7 @@ build = "build.rs" [dependencies] prost = { workspace = true } +serde = { workspace = true } [build-dependencies] prost-build = "0.12" diff --git a/packages/protocol/build.rs b/packages/protocol/build.rs index 8ece364a..3118015e 100644 --- a/packages/protocol/build.rs +++ b/packages/protocol/build.rs @@ -2,5 +2,6 @@ extern crate prost_build; fn main() { let mut config = prost_build::Config::new(); + config.type_attribute(".", "#[derive(serde::Serialize)]"); config.compile_protos(&["src/atm0s.proto"], &["src/"]).unwrap(); } diff --git a/servers/media-server/Cargo.toml b/servers/media-server/Cargo.toml index f6f5376c..d128622b 100644 --- a/servers/media-server/Cargo.toml +++ b/servers/media-server/Cargo.toml @@ -25,6 +25,7 @@ log = { workspace = true } poem = { version = "2.0", features = ["embed", "rustls"] } poem-openapi = { workspace = true, features = ["swagger-ui", "static-files"] } serde = { workspace = true } +serde_json = { workspace = true } tracing-subscriber = { version = "0.3.18", features = ["env-filter", "std"] } rust-embed = { version = "8.2", optional = true } rsip = { version = "0.4.0", optional = true } @@ -38,6 +39,7 @@ rand = "0.8.5" yaque = { version = "0.6.6", optional = true } maxminddb = { version = "0.24.0", optional = true } bincode = { version = "1" } +derive_more = { workspace = true } [dev-dependencies] md5 = "0.7.0" diff --git a/servers/media-server/src/server/connector.rs b/servers/media-server/src/server/connector.rs index 351922d4..6bd8fd23 100644 --- a/servers/media-server/src/server/connector.rs +++ b/servers/media-server/src/server/connector.rs @@ -23,8 +23,7 @@ mod transports; use self::{ queue::TransporterQueue, rpc::{cluster::ConnectorClusterRpc, http::ConnectorHttpApis, InternalControl, RpcEvent}, - transports::nats::NatsTransporter, - transports::{parse_uri, ConnectorTransporter}, + transports::{nats::NatsTransporter, parse_uri, ConnectorTransporter, Format}, }; use super::MediaServerContext; @@ -33,7 +32,7 @@ use super::MediaServerContext; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] pub struct ConnectorArgs { - /// Message Queue URI in the form of `amqp://user:pass@host:port/vhost` + /// Message Queue URI in the form of `http://host:port` or `nats://host:port` #[arg(env, long, default_value = "nats://localhost:4222")] mq_uri: String, @@ -48,6 +47,10 @@ pub struct ConnectorArgs { /// Max conn #[arg(env, long, default_value_t = 100)] pub max_conn: u64, + + /// Output format + #[arg(env, long, default_value_t = Format::Protobuf)] + format: Format, } pub async fn run_connector_server( @@ -74,11 +77,15 @@ where let transporter: Box> = match protocol.as_str() { "nats" => { - let nats = NatsTransporter::::new(opts.mq_uri.clone(), opts.mq_channel.clone()) + let nats = NatsTransporter::::new(&opts.mq_uri, &opts.mq_channel, &opts.format) .await .expect("Nats should be connected"); Box::new(nats) } + "http" | "https" => { + let http = transports::http::HttpTransporter::::new(&opts.mq_uri, &opts.format); + Box::new(http) + } _ => { log::error!("Unsupported transporter"); return Err("Unsupported transporter"); diff --git a/servers/media-server/src/server/connector/transports/http.rs b/servers/media-server/src/server/connector/transports/http.rs new file mode 100644 index 00000000..0cc9b08d --- /dev/null +++ b/servers/media-server/src/server/connector/transports/http.rs @@ -0,0 +1,54 @@ +use std::{io, marker::PhantomData}; + +use async_trait::async_trait; +use prost::Message; +use serde::Serialize; + +use super::{ConnectorTransporter, Format}; + +pub struct HttpTransporter { + client: reqwest::Client, + url: String, + format: Format, + _tmp: PhantomData, +} + +impl HttpTransporter { + pub fn new(url: &str, format: &Format) -> Self { + Self { + client: reqwest::Client::new(), + url: url.to_string(), + format: format.clone(), + _tmp: Default::default(), + } + } +} + +#[async_trait] +impl ConnectorTransporter for HttpTransporter { + async fn send(&mut self, data: M) -> Result<(), io::Error> { + let res = match self.format { + Format::Json => self.client.post(&self.url).json(&data).send().await, + Format::Protobuf => self.client.post(&self.url).body(data.encode_to_vec()).header("Content-Type", "application/octet-stream").send().await, + }; + + match res { + Ok(res) => { + if res.status().is_success() { + log::debug!("Data sent to {}", self.url); + return Ok(()); + } + log::error!("Failed to send data to {}: {:?}", self.url, res); + return Err(io::Error::new(io::ErrorKind::Other, "Failed to send data")); + } + Err(e) => { + log::error!("Failed to send data to {}: {:?}", self.url, e); + return Err(io::Error::new(io::ErrorKind::Other, "Failed to send data")); + } + }; + } + + async fn close(&mut self) -> Result<(), io::Error> { + Ok(()) + } +} diff --git a/servers/media-server/src/server/connector/transports/mod.rs b/servers/media-server/src/server/connector/transports/mod.rs index 32fd78ad..a9811fac 100644 --- a/servers/media-server/src/server/connector/transports/mod.rs +++ b/servers/media-server/src/server/connector/transports/mod.rs @@ -1,8 +1,11 @@ +use derive_more::Display; use std::io; use async_trait::async_trait; +use clap::ValueEnum; use prost::Message; +pub mod http; pub mod nats; #[derive(Debug, PartialEq, Eq)] @@ -10,8 +13,16 @@ pub enum ParseURIError { InvalidURI, } +#[derive(Debug, ValueEnum, Clone, Display)] +pub enum Format { + #[display(fmt = "json")] + Json, + #[display(fmt = "protobuf")] + Protobuf, +} + #[async_trait] -pub trait ConnectorTransporter>>: Send + Sync { +pub trait ConnectorTransporter: Send + Sync { async fn close(&mut self) -> Result<(), io::Error>; async fn send(&mut self, data: M) -> Result<(), io::Error>; } diff --git a/servers/media-server/src/server/connector/transports/nats.rs b/servers/media-server/src/server/connector/transports/nats.rs index 27794309..52aef43a 100644 --- a/servers/media-server/src/server/connector/transports/nats.rs +++ b/servers/media-server/src/server/connector/transports/nats.rs @@ -2,17 +2,19 @@ use std::{io, marker::PhantomData, time::Duration}; use async_trait::async_trait; use prost::Message; +use serde::Serialize; -use super::ConnectorTransporter; +use super::{ConnectorTransporter, Format}; -pub struct NatsTransporter>> { +pub struct NatsTransporter> + Serialize> { conn: nats::asynk::Connection, subject: String, + format: Format, _tmp: PhantomData, } -impl>> NatsTransporter { - pub async fn new(uri: String, subject: String) -> Result { +impl> + Serialize> NatsTransporter { + pub async fn new(uri: &str, subject: &str, format: &Format) -> Result { log::info!("Connecting to NATS server: {}", uri); Ok(Self { conn: nats::asynk::Options::new() @@ -24,16 +26,23 @@ impl>> NatsTransporter { .close_callback(|| panic!("connection has been closed")) //this should not happen .connect(uri) .await?, - subject, + subject: subject.to_string(), + format: format.clone(), _tmp: Default::default(), }) } } #[async_trait] -impl>> ConnectorTransporter for NatsTransporter { +impl> + Serialize> ConnectorTransporter for NatsTransporter { async fn send(&mut self, data: M) -> Result<(), io::Error> { - let data = data.encode_to_vec(); + let data: Vec = match self.format { + Format::Json => match serde_json::to_string(&data) { + Ok(data) => data.as_bytes().to_vec(), + Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e)), + }, + Format::Protobuf => data.encode_to_vec(), + }; self.conn.publish(&self.subject, data).await?; Ok(()) }