Skip to content

Commit

Permalink
reduce agent size
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Jan 2, 2025
1 parent 5c5c65e commit e6047bd
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 59 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ protocol-ed25519 = { path = "crates/protocol_ed25519", package = "atm0s-reverse-
log = "0.4"
tokio-yamux = "0.3"
clap = "4.4"
argh = "=0.1.13" # small cli
async-trait = "0.1"
tokio = "1"
httparse = "1.8"
tls-parser = "0.12"
rtsp-types = "0.1"
tracing-subscriber = "0.3"
picolog = "1.0"
atm0s-sdn = "0.2"
serde = "1.0"
bincode = "1.3"
Expand Down
8 changes: 4 additions & 4 deletions bin/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ anyhow = { workspace = true }
thiserror = { workspace = true, optional = true }

# for binary build
tracing-subscriber = { workspace = true, features = ["env-filter", "std"], optional = true }
clap = { workspace = true, features = ["derive", "env"], optional = true }
picolog = { workspace = true, optional = true }
argh = { workspace = true, optional = true }

# for tcp protocol
tokio-yamux = { workspace = true, optional = true }
Expand All @@ -33,7 +33,7 @@ rustls = { workspace = true, features = ["ring", "std"], optional = true }
base64 = { workspace = true, optional = true }

[features]
default = ["binary", "tcp", "quic"]
binary = ["protocol-ed25519", "tracing-subscriber", "clap"]
default = ["binary", "tcp"]
binary = ["protocol-ed25519", "argh", "picolog"]
tcp = ["tokio-yamux"]
quic = ["quinn", "rustls", "base64", "thiserror"]
53 changes: 27 additions & 26 deletions bin/agent/examples/benchmark_clients.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,79 @@
use std::str::FromStr;
use std::{
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};

use argh::FromArgs;
#[cfg(feature = "quic")]
use atm0s_reverse_proxy_agent::QuicConnection;
#[cfg(feature = "tcp")]
use atm0s_reverse_proxy_agent::TcpConnection;
use atm0s_reverse_proxy_agent::{run_tunnel_connection, Connection, Protocol, ServiceRegistry, SimpleServiceRegistry, SubConnection};
#[cfg(feature = "quic")]
use base64::{engine::general_purpose::URL_SAFE, Engine as _};
use clap::Parser;
use log::LevelFilter;
use picolog::PicoLogger;
#[cfg(feature = "quic")]
use protocol::DEFAULT_TUNNEL_CERT;
use protocol_ed25519::AgentLocalKey;
#[cfg(feature = "quic")]
use rustls::pki_types::CertificateDer;
use tokio::time::sleep;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use url::Url;

/// A benchmark util for simulating multiple clients connect to relay server
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
#[derive(FromArgs, Debug, Clone)]
struct Args {
/// Address of relay server
#[arg(env, long)]
/// address of relay server
#[argh(option)]
connector_addr: Url,

/// Protocol of relay server
#[arg(env, long)]
/// protocol of relay server
#[argh(option)]
connector_protocol: Protocol,

/// Http proxy dest
#[arg(env, long, default_value = "127.0.0.1:8080")]
/// http proxy dest
#[argh(option, default = "SocketAddr::from_str(\"127.0.0.1:8080\").unwrap()")]
http_dest: SocketAddr,

/// Sni-https proxy dest
#[arg(env, long, default_value = "127.0.0.1:8443")]
/// sni-https proxy dest
#[argh(option, default = "SocketAddr::from_str(\"127.0.0.1:8443\").unwrap()")]
https_dest: SocketAddr,

/// Custom quic server cert in base64
#[arg(env, long)]
#[cfg(feature = "quic")]
/// custom quic server cert in base64
#[argh(option)]
custom_quic_cert_base64: Option<String>,

/// Allow connect in insecure mode
#[arg(env, long)]
#[cfg(feature = "quic")]
/// allow connect in insecure mode
#[argh(option)]
allow_quic_insecure: bool,

/// clients
#[arg(env, long)]
#[argh(option)]
clients: usize,

/// wait time between connect action
#[arg(env, long, default_value_t = 1000)]
#[argh(option, default = "1000")]
connect_wait_ms: u64,
}

#[tokio::main]
async fn main() {
let args = Args::parse();
let args: Args = argh::from_env();

#[cfg(feature = "quic")]
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");

//if RUST_LOG env is not set, set it to info
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "warn");
}
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(tracing_subscriber::EnvFilter::from_default_env())
.init();
let level = match std::env::var("RUST_LOG") {
Ok(v) => LevelFilter::from_str(&v).unwrap_or(LevelFilter::Info),
_ => LevelFilter::Info,
};
PicoLogger::new(level).init();

let registry = SimpleServiceRegistry::new(args.http_dest, args.https_dest);
let registry = Arc::new(registry);
Expand Down
16 changes: 14 additions & 2 deletions bin/agent/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Tunnel is a trait that defines the interface for a tunnel which connect to connector port of relayer.
use std::fmt::Debug;
use std::{fmt::Debug, str::FromStr};

use protocol::stream::TunnelStream;
use tokio::io::{AsyncRead, AsyncWrite};
Expand All @@ -12,14 +12,26 @@ pub mod quic;
pub mod tcp;

#[derive(Debug, Clone)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
pub enum Protocol {
#[cfg(feature = "tcp")]
Tcp,
#[cfg(feature = "quic")]
Quic,
}

impl FromStr for Protocol {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
#[cfg(feature = "tcp")]
"tcp" | "TCP" => Ok(Protocol::Tcp),
#[cfg(feature = "quic")]
"quic" | "QUIC" => Ok(Protocol::Quic),
_ => Err("invalid protocol"),
}
}
}

pub trait SubConnection: AsyncRead + AsyncWrite + Unpin + Send + Sync {}

impl<R: AsyncRead + Unpin + Send + Sync, W: AsyncWrite + Unpin + Send + Sync> SubConnection for TunnelStream<R, W> {}
Expand Down
60 changes: 33 additions & 27 deletions bin/agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::str::FromStr;
use std::{alloc::System, net::SocketAddr, sync::Arc};

use log::LevelFilter;
use picolog::PicoLogger;

#[cfg(feature = "quic")]
use atm0s_reverse_proxy_agent::QuicConnection;
#[cfg(feature = "tcp")]
Expand All @@ -8,69 +12,71 @@ use atm0s_reverse_proxy_agent::{run_tunnel_connection, Connection, Protocol, Ser
#[cfg(feature = "quic")]
use base64::{engine::general_purpose::URL_SAFE, Engine as _};

use clap::Parser;
use argh::FromArgs;
use protocol::services::SERVICE_RTSP;
#[cfg(feature = "quic")]
use protocol::DEFAULT_TUNNEL_CERT;
use protocol_ed25519::AgentLocalKey;
#[cfg(feature = "quic")]
use rustls::pki_types::CertificateDer;
use tokio::time::sleep;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
// use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use url::Url;

#[global_allocator]
static A: System = System;

/// A HTTP and SNI HTTPs proxy for expose your local service to the internet.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
#[derive(FromArgs, Debug)]
struct Args {
/// Address of relay server
#[arg(env, long)]
/// address of relay server
#[argh(option)]
connector_addr: Url,

/// Protocol of relay server
#[arg(env, long)]
/// protocol of relay server
#[argh(option)]
connector_protocol: Protocol,

/// Http proxy dest
#[arg(env, long, default_value = "127.0.0.1:8080")]
/// http proxy dest
#[argh(option, default = "SocketAddr::from_str(\"127.0.0.1:8080\").unwrap()")]
http_dest: SocketAddr,

/// Sni-https proxy dest
#[arg(env, long, default_value = "127.0.0.1:8443")]
/// sni-https proxy dest
#[argh(option, default = "SocketAddr::from_str(\"127.0.0.1:8443\").unwrap()")]
https_dest: SocketAddr,

/// Rtsp proxy dest
#[arg(env, long, default_value = "127.0.0.1:554")]
/// rtsp proxy dest
#[argh(option, default = "SocketAddr::from_str(\"127.0.0.1:554\").unwrap()")]
rtsp_dest: SocketAddr,

/// Sni-https proxy dest
#[arg(env, long, default_value = "127.0.0.1:5443")]
/// sni-https proxy dest
#[argh(option, default = "SocketAddr::from_str(\"127.0.0.1:5443\").unwrap()")]
rtsps_dest: SocketAddr,

/// Persistent local key
#[arg(env, long, default_value = "local_key.pem")]
/// persistent local key
#[argh(option, default = "String::from(\"local_key.pem\")")]
local_key: String,

/// Custom quic server cert in base64
#[arg(env, long)]
#[cfg(feature = "quic")]
/// custom quic server cert in base64
#[argh(option)]
custom_quic_cert_base64: Option<String>,

/// Allow connect in insecure mode
#[arg(env, long)]
#[cfg(feature = "quic")]
/// allow connect in insecure mode
#[argh(switch)]
allow_quic_insecure: bool,
}

#[tokio::main]
async fn main() {
let args = Args::parse();
let args: Args = argh::from_env();
//if RUST_LOG env is not set, set it to info
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}
tracing_subscriber::registry().with(fmt::layer()).with(EnvFilter::from_default_env()).init();
let level = match std::env::var("RUST_LOG") {
Ok(v) => LevelFilter::from_str(&v).unwrap_or(LevelFilter::Info),
_ => LevelFilter::Info,
};
PicoLogger::new(level).init();

#[cfg(feature = "quic")]
let server_certs = if let Some(cert) = args.custom_quic_cert_base64 {
Expand Down

0 comments on commit e6047bd

Please sign in to comment.