Skip to content

Commit

Permalink
feat: connector support http export transport (#233)
Browse files Browse the repository at this point in the history
* feat: connector support http export transport

* fix: update header for connector http transport

* chore: resolve comment about derive_more and http handling

* chore: update the documents relating to connector server
  • Loading branch information
luongngocminh authored Mar 4, 2024
1 parent 3276dc7 commit a1453a0
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 13 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
prost = "0.12"
poem-openapi = { version = "4.0" }
derive_more = { version = "0.99" }
2 changes: 1 addition & 1 deletion docs/contributor-guide/servers/connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ If we have multiple connector servers, the routing algorithm will send each even

We can have multiple connector servers in each zone, and then the routing algorithm will send the event log to the connector server in the same zone.

Each connector server is connected to a message queue, and external services can get event logs from the message queue. Currently, we only support NATS message queue.
Each connector server is connected to a message queue, and external services can get event logs from the message queue. Currently, we only support NATS message queue and HTTP POST API endpoints.
2 changes: 2 additions & 0 deletions docs/user-guide/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,12 @@ Options:
--mq-uri <MQ_URI> Message Queue URI in the form of `amqp://user:pass@host:port/vhost` [env: MQ_URI=] [default: nats://localhost:4222]
--mq-channel <MQ_CHANNEL> MQ Channel [env: MQ_CHANNEL=] [default: atm0s/event_log]
--backup-path <BACKUP_PATH> Filebase backup path for logs [env: BACKUP_PATH=] [default: .atm0s/data/connector-queue]
--format <FORMAT> The output format of the message, for now it can either be `protobuf` or `json` [env: FORMAT=] [default: protobuf]
-h, --help Print help
-V, --version Print version
```

Currently, we only support NATS as the message queue. However, we have designed the system to easily support other message queues such as RabbitMQ or Kafka by implementing the necessary interfaces.
You can also use an HTTP API endpoint to receive the cluster events, simply by configuring the MQ URI to be that API Endpoints: `http(s)://localhost:4000/events`. The events will be delivered through a POST request in the specified format. If the format is `protobuf`, the request header will include the content type of `application/octet-stream`.

For persistent data storage, we use local files. You can configure the backup path for storing the data by setting the `backup-path` option.
1 change: 1 addition & 0 deletions packages/protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ build = "build.rs"

[dependencies]
prost = { workspace = true }
serde = { workspace = true }

[build-dependencies]
prost-build = "0.12"
1 change: 1 addition & 0 deletions packages/protocol/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ extern crate prost_build;

fn main() {
let mut config = prost_build::Config::new();
config.type_attribute(".", "#[derive(serde::Serialize)]");
config.compile_protos(&["src/atm0s.proto"], &["src/"]).unwrap();
}
2 changes: 2 additions & 0 deletions servers/media-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ log = { workspace = true }
poem = { version = "2.0", features = ["embed", "rustls"] }
poem-openapi = { workspace = true, features = ["swagger-ui", "static-files"] }
serde = { workspace = true }
serde_json = { workspace = true }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "std"] }
rust-embed = { version = "8.2", optional = true }
rsip = { version = "0.4.0", optional = true }
Expand All @@ -38,6 +39,7 @@ rand = "0.8.5"
yaque = { version = "0.6.6", optional = true }
maxminddb = { version = "0.24.0", optional = true }
bincode = { version = "1" }
derive_more = { workspace = true }

[dev-dependencies]
md5 = "0.7.0"
Expand Down
15 changes: 11 additions & 4 deletions servers/media-server/src/server/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ mod transports;
use self::{
queue::TransporterQueue,
rpc::{cluster::ConnectorClusterRpc, http::ConnectorHttpApis, InternalControl, RpcEvent},
transports::nats::NatsTransporter,
transports::{parse_uri, ConnectorTransporter},
transports::{nats::NatsTransporter, parse_uri, ConnectorTransporter, Format},
};

use super::MediaServerContext;
Expand All @@ -33,7 +32,7 @@ use super::MediaServerContext;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct ConnectorArgs {
/// Message Queue URI in the form of `amqp://user:pass@host:port/vhost`
/// Message Queue URI in the form of `http://host:port` or `nats://host:port`
#[arg(env, long, default_value = "nats://localhost:4222")]
mq_uri: String,

Expand All @@ -48,6 +47,10 @@ pub struct ConnectorArgs {
/// Max conn
#[arg(env, long, default_value_t = 100)]
pub max_conn: u64,

/// Output format
#[arg(env, long, default_value_t = Format::Protobuf)]
format: Format,
}

pub async fn run_connector_server<C, CR, RPC, REQ, EMITTER>(
Expand All @@ -74,11 +77,15 @@ where

let transporter: Box<dyn ConnectorTransporter<MediaEndpointLogRequest>> = match protocol.as_str() {
"nats" => {
let nats = NatsTransporter::<MediaEndpointLogRequest>::new(opts.mq_uri.clone(), opts.mq_channel.clone())
let nats = NatsTransporter::<MediaEndpointLogRequest>::new(&opts.mq_uri, &opts.mq_channel, &opts.format)
.await
.expect("Nats should be connected");
Box::new(nats)
}
"http" | "https" => {
let http = transports::http::HttpTransporter::<MediaEndpointLogRequest>::new(&opts.mq_uri, &opts.format);
Box::new(http)
}
_ => {
log::error!("Unsupported transporter");
return Err("Unsupported transporter");
Expand Down
54 changes: 54 additions & 0 deletions servers/media-server/src/server/connector/transports/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::{io, marker::PhantomData};

use async_trait::async_trait;
use prost::Message;
use serde::Serialize;

use super::{ConnectorTransporter, Format};

pub struct HttpTransporter<M: Message + Serialize> {
client: reqwest::Client,
url: String,
format: Format,
_tmp: PhantomData<M>,
}

impl<M: Message + Serialize> HttpTransporter<M> {
pub fn new(url: &str, format: &Format) -> Self {
Self {
client: reqwest::Client::new(),
url: url.to_string(),
format: format.clone(),
_tmp: Default::default(),
}
}
}

#[async_trait]
impl<M: Message + Serialize> ConnectorTransporter<M> for HttpTransporter<M> {
async fn send(&mut self, data: M) -> Result<(), io::Error> {
let res = match self.format {
Format::Json => self.client.post(&self.url).json(&data).send().await,
Format::Protobuf => self.client.post(&self.url).body(data.encode_to_vec()).header("Content-Type", "application/octet-stream").send().await,
};

match res {
Ok(res) => {
if res.status().is_success() {
log::debug!("Data sent to {}", self.url);
return Ok(());
}
log::error!("Failed to send data to {}: {:?}", self.url, res);
return Err(io::Error::new(io::ErrorKind::Other, "Failed to send data"));
}
Err(e) => {
log::error!("Failed to send data to {}: {:?}", self.url, e);
return Err(io::Error::new(io::ErrorKind::Other, "Failed to send data"));
}
};
}

async fn close(&mut self) -> Result<(), io::Error> {
Ok(())
}
}
13 changes: 12 additions & 1 deletion servers/media-server/src/server/connector/transports/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
use derive_more::Display;
use std::io;

use async_trait::async_trait;
use clap::ValueEnum;
use prost::Message;

pub mod http;
pub mod nats;

#[derive(Debug, PartialEq, Eq)]
pub enum ParseURIError {
InvalidURI,
}

#[derive(Debug, ValueEnum, Clone, Display)]
pub enum Format {
#[display(fmt = "json")]
Json,
#[display(fmt = "protobuf")]
Protobuf,
}

#[async_trait]
pub trait ConnectorTransporter<M: Message + TryFrom<Vec<u8>>>: Send + Sync {
pub trait ConnectorTransporter<M: Message>: Send + Sync {
async fn close(&mut self) -> Result<(), io::Error>;
async fn send(&mut self, data: M) -> Result<(), io::Error>;
}
Expand Down
23 changes: 16 additions & 7 deletions servers/media-server/src/server/connector/transports/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ use std::{io, marker::PhantomData, time::Duration};

use async_trait::async_trait;
use prost::Message;
use serde::Serialize;

use super::ConnectorTransporter;
use super::{ConnectorTransporter, Format};

pub struct NatsTransporter<M: Message + Clone + TryFrom<Vec<u8>>> {
pub struct NatsTransporter<M: Message + Clone + TryFrom<Vec<u8>> + Serialize> {
conn: nats::asynk::Connection,
subject: String,
format: Format,
_tmp: PhantomData<M>,
}

impl<M: Message + Clone + TryFrom<Vec<u8>>> NatsTransporter<M> {
pub async fn new(uri: String, subject: String) -> Result<Self, io::Error> {
impl<M: Message + Clone + TryFrom<Vec<u8>> + Serialize> NatsTransporter<M> {
pub async fn new(uri: &str, subject: &str, format: &Format) -> Result<Self, io::Error> {
log::info!("Connecting to NATS server: {}", uri);
Ok(Self {
conn: nats::asynk::Options::new()
Expand All @@ -24,16 +26,23 @@ impl<M: Message + Clone + TryFrom<Vec<u8>>> NatsTransporter<M> {
.close_callback(|| panic!("connection has been closed")) //this should not happen
.connect(uri)
.await?,
subject,
subject: subject.to_string(),
format: format.clone(),
_tmp: Default::default(),
})
}
}

#[async_trait]
impl<M: Message + Clone + TryFrom<Vec<u8>>> ConnectorTransporter<M> for NatsTransporter<M> {
impl<M: Message + Clone + TryFrom<Vec<u8>> + Serialize> ConnectorTransporter<M> for NatsTransporter<M> {
async fn send(&mut self, data: M) -> Result<(), io::Error> {
let data = data.encode_to_vec();
let data: Vec<u8> = match self.format {
Format::Json => match serde_json::to_string(&data) {
Ok(data) => data.as_bytes().to_vec(),
Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e)),
},
Format::Protobuf => data.encode_to_vec(),
};
self.conn.publish(&self.subject, data).await?;
Ok(())
}
Expand Down

0 comments on commit a1453a0

Please sign in to comment.