diff --git a/Cargo.toml b/Cargo.toml index 1d49bba..22840aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,12 +15,14 @@ protocol-ed25519 = { path = "crates/protocol_ed25519", package = "atm0s-reverse- log = "0.4" tokio-yamux = "0.3" clap = "4.4" +argh = "=0.1.13" # small cli async-trait = "0.1" tokio = "1" httparse = "1.8" tls-parser = "0.12" rtsp-types = "0.1" tracing-subscriber = "0.3" +picolog = "1.0" atm0s-sdn = "0.2" serde = "1.0" bincode = "1.3" diff --git a/bin/agent/Cargo.toml b/bin/agent/Cargo.toml index e958e31..d41bc65 100644 --- a/bin/agent/Cargo.toml +++ b/bin/agent/Cargo.toml @@ -13,19 +13,27 @@ protocol-ed25519 = { workspace = true, optional = true } tokio = { workspace = true, features = ["full"] } futures = { version = "0.3" } async-trait = { workspace = true } -clap = { workspace = true, features = ["derive", "env"] } log = { workspace = true } -tracing-subscriber = { workspace = true, features = ["env-filter", "std"], optional = true } -tokio-yamux = { workspace = true } bincode = { workspace = true } serde = { workspace = true, features = ["derive"] } -quinn = { workspace = true, features = ["ring", "runtime-tokio", "futures-io"] } -rustls = { workspace = true, features = ["ring", "std"] } url = { workspace = true } -base64 = { workspace = true } -thiserror = { workspace = true } anyhow = { workspace = true } +thiserror = { workspace = true, optional = true } + +# for binary build +picolog = { workspace = true, optional = true } +argh = { workspace = true, optional = true } + +# for tcp protocol +tokio-yamux = { workspace = true, optional = true } + +# for quic protocol +quinn = { workspace = true, features = ["ring", "runtime-tokio", "futures-io"], optional = true } +rustls = { workspace = true, features = ["ring", "std"], optional = true } +base64 = { workspace = true, optional = true } [features] -default = ["binary"] -binary = ["protocol-ed25519", "tracing-subscriber"] +default = ["binary", "tcp"] +binary = ["protocol-ed25519", "argh", "picolog"] +tcp = ["tokio-yamux"] +quic = ["quinn", "rustls", "base64", "thiserror"] \ No newline at end of file diff --git a/bin/agent/examples/benchmark_clients.rs b/bin/agent/examples/benchmark_clients.rs index 1c9caa0..ec9b685 100644 --- a/bin/agent/examples/benchmark_clients.rs +++ b/bin/agent/examples/benchmark_clients.rs @@ -1,69 +1,79 @@ +use std::str::FromStr; use std::{ net::SocketAddr, sync::Arc, time::{Duration, Instant}, }; -use atm0s_reverse_proxy_agent::{run_tunnel_connection, Connection, Protocol, QuicConnection, ServiceRegistry, SimpleServiceRegistry, SubConnection, TcpConnection}; +use argh::FromArgs; +#[cfg(feature = "quic")] +use atm0s_reverse_proxy_agent::QuicConnection; +#[cfg(feature = "tcp")] +use atm0s_reverse_proxy_agent::TcpConnection; +use atm0s_reverse_proxy_agent::{run_tunnel_connection, Connection, Protocol, ServiceRegistry, SimpleServiceRegistry, SubConnection}; +#[cfg(feature = "quic")] use base64::{engine::general_purpose::URL_SAFE, Engine as _}; -use clap::Parser; +use log::LevelFilter; +use picolog::PicoLogger; +#[cfg(feature = "quic")] use protocol::DEFAULT_TUNNEL_CERT; use protocol_ed25519::AgentLocalKey; +#[cfg(feature = "quic")] use rustls::pki_types::CertificateDer; use tokio::time::sleep; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use url::Url; /// A benchmark util for simulating multiple clients connect to relay server -#[derive(Parser, Debug, Clone)] -#[command(author, version, about, long_about = None)] +#[derive(FromArgs, Debug, Clone)] struct Args { - /// Address of relay server - #[arg(env, long)] + /// address of relay server + #[argh(option)] connector_addr: Url, - /// Protocol of relay server - #[arg(env, long)] + /// protocol of relay server + #[argh(option)] connector_protocol: Protocol, - /// Http proxy dest - #[arg(env, long, default_value = "127.0.0.1:8080")] + /// http proxy dest + #[argh(option, default = "SocketAddr::from_str(\"127.0.0.1:8080\").unwrap()")] http_dest: SocketAddr, - /// Sni-https proxy dest - #[arg(env, long, default_value = "127.0.0.1:8443")] + /// sni-https proxy dest + #[argh(option, default = "SocketAddr::from_str(\"127.0.0.1:8443\").unwrap()")] https_dest: SocketAddr, - /// Custom quic server cert in base64 - #[arg(env, long)] + #[cfg(feature = "quic")] + /// custom quic server cert in base64 + #[argh(option)] custom_quic_cert_base64: Option, - /// Allow connect in insecure mode - #[arg(env, long)] + #[cfg(feature = "quic")] + /// allow connect in insecure mode + #[argh(option)] allow_quic_insecure: bool, /// clients - #[arg(env, long)] + #[argh(option)] clients: usize, /// wait time between connect action - #[arg(env, long, default_value_t = 1000)] + #[argh(option, default = "1000")] connect_wait_ms: u64, } #[tokio::main] async fn main() { - let args = Args::parse(); + let args: Args = argh::from_env(); + + #[cfg(feature = "quic")] rustls::crypto::ring::default_provider().install_default().expect("should install ring as default"); //if RUST_LOG env is not set, set it to info - if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "warn"); - } - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer()) - .with(tracing_subscriber::EnvFilter::from_default_env()) - .init(); + let level = match std::env::var("RUST_LOG") { + Ok(v) => LevelFilter::from_str(&v).unwrap_or(LevelFilter::Info), + _ => LevelFilter::Info, + }; + PicoLogger::new(level).init(); let registry = SimpleServiceRegistry::new(args.http_dest, args.https_dest); let registry = Arc::new(registry); @@ -83,8 +93,10 @@ async fn main() { } async fn connect(client: usize, args: Args, registry: Arc) { + #[cfg(feature = "quic")] let default_tunnel_cert = CertificateDer::from(DEFAULT_TUNNEL_CERT.to_vec()); + #[cfg(feature = "quic")] let server_certs = if let Some(cert) = args.custom_quic_cert_base64 { vec![CertificateDer::from(URL_SAFE.decode(&cert).expect("Custom cert should in base64 format").to_vec())] } else { @@ -96,6 +108,7 @@ async fn connect(client: usize, args: Args, registry: Arc) log::info!("Connecting to connector... {:?} addr: {}", args.connector_protocol, args.connector_addr); let started = Instant::now(); match args.connector_protocol { + #[cfg(feature = "tcp")] Protocol::Tcp => match TcpConnection::new(args.connector_addr.clone(), &agent_signer).await { Ok(conn) => { log::info!("Connected to connector via tcp with res {:?}", conn.response()); @@ -106,6 +119,7 @@ async fn connect(client: usize, args: Args, registry: Arc) log::error!("Connect to connector via tcp error: {e}"); } }, + #[cfg(feature = "quic")] Protocol::Quic => match QuicConnection::new(args.connector_addr.clone(), &agent_signer, &server_certs, args.allow_quic_insecure).await { Ok(conn) => { log::info!("Connected to connector via quic with res {:?}", conn.response()); diff --git a/bin/agent/src/connection.rs b/bin/agent/src/connection.rs index d6b293d..7b79619 100644 --- a/bin/agent/src/connection.rs +++ b/bin/agent/src/connection.rs @@ -1,20 +1,37 @@ //! Tunnel is a trait that defines the interface for a tunnel which connect to connector port of relayer. -use std::fmt::Debug; +use std::{fmt::Debug, str::FromStr}; -use clap::ValueEnum; use protocol::stream::TunnelStream; use tokio::io::{AsyncRead, AsyncWrite}; +#[cfg(feature = "quic")] pub mod quic; + +#[cfg(feature = "tcp")] pub mod tcp; -#[derive(ValueEnum, Debug, Clone)] +#[derive(Debug, Clone)] pub enum Protocol { + #[cfg(feature = "tcp")] Tcp, + #[cfg(feature = "quic")] Quic, } +impl FromStr for Protocol { + type Err = &'static str; + fn from_str(s: &str) -> Result { + match s { + #[cfg(feature = "tcp")] + "tcp" | "TCP" => Ok(Protocol::Tcp), + #[cfg(feature = "quic")] + "quic" | "QUIC" => Ok(Protocol::Quic), + _ => Err("invalid protocol"), + } + } +} + pub trait SubConnection: AsyncRead + AsyncWrite + Unpin + Send + Sync {} impl SubConnection for TunnelStream {} diff --git a/bin/agent/src/lib.rs b/bin/agent/src/lib.rs index 78e099f..b0fe2d7 100644 --- a/bin/agent/src/lib.rs +++ b/bin/agent/src/lib.rs @@ -4,12 +4,12 @@ use protocol::cluster::{wait_object, AgentTunnelRequest}; mod connection; mod local_tunnel; +#[cfg(feature = "quic")] +pub use connection::quic::{QuicConnection, QuicSubConnection}; +#[cfg(feature = "tcp")] +pub use connection::tcp::{TcpConnection, TcpSubConnection}; -pub use connection::{ - quic::{QuicConnection, QuicSubConnection}, - tcp::{TcpConnection, TcpSubConnection}, - Connection, Protocol, SubConnection, -}; +pub use connection::{Connection, Protocol, SubConnection}; pub use local_tunnel::{registry::SimpleServiceRegistry, LocalTunnel, ServiceRegistry}; use tokio::{io::copy_bidirectional, net::TcpStream}; diff --git a/bin/agent/src/main.rs b/bin/agent/src/main.rs index 7ea759e..af7c4b3 100644 --- a/bin/agent/src/main.rs +++ b/bin/agent/src/main.rs @@ -1,77 +1,93 @@ +use std::str::FromStr; use std::{alloc::System, net::SocketAddr, sync::Arc}; -use atm0s_reverse_proxy_agent::{run_tunnel_connection, Connection, Protocol, QuicConnection, ServiceRegistry, SimpleServiceRegistry, SubConnection, TcpConnection}; +use log::LevelFilter; +use picolog::PicoLogger; + +#[cfg(feature = "quic")] +use atm0s_reverse_proxy_agent::QuicConnection; +#[cfg(feature = "tcp")] +use atm0s_reverse_proxy_agent::TcpConnection; +use atm0s_reverse_proxy_agent::{run_tunnel_connection, Connection, Protocol, ServiceRegistry, SimpleServiceRegistry, SubConnection}; +#[cfg(feature = "quic")] use base64::{engine::general_purpose::URL_SAFE, Engine as _}; -use clap::Parser; -use protocol::{services::SERVICE_RTSP, DEFAULT_TUNNEL_CERT}; + +use argh::FromArgs; +use protocol::services::SERVICE_RTSP; +#[cfg(feature = "quic")] +use protocol::DEFAULT_TUNNEL_CERT; use protocol_ed25519::AgentLocalKey; +#[cfg(feature = "quic")] use rustls::pki_types::CertificateDer; use tokio::time::sleep; -use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; +// use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; use url::Url; #[global_allocator] static A: System = System; /// A HTTP and SNI HTTPs proxy for expose your local service to the internet. -#[derive(Parser, Debug)] -#[command(author, version, about, long_about = None)] +#[derive(FromArgs, Debug)] struct Args { - /// Address of relay server - #[arg(env, long)] + /// address of relay server + #[argh(option)] connector_addr: Url, - /// Protocol of relay server - #[arg(env, long)] + /// protocol of relay server + #[argh(option)] connector_protocol: Protocol, - /// Http proxy dest - #[arg(env, long, default_value = "127.0.0.1:8080")] + /// http proxy dest + #[argh(option, default = "SocketAddr::from_str(\"127.0.0.1:8080\").unwrap()")] http_dest: SocketAddr, - /// Sni-https proxy dest - #[arg(env, long, default_value = "127.0.0.1:8443")] + /// sni-https proxy dest + #[argh(option, default = "SocketAddr::from_str(\"127.0.0.1:8443\").unwrap()")] https_dest: SocketAddr, - /// Rtsp proxy dest - #[arg(env, long, default_value = "127.0.0.1:554")] + /// rtsp proxy dest + #[argh(option, default = "SocketAddr::from_str(\"127.0.0.1:554\").unwrap()")] rtsp_dest: SocketAddr, - /// Sni-https proxy dest - #[arg(env, long, default_value = "127.0.0.1:5443")] + /// sni-https proxy dest + #[argh(option, default = "SocketAddr::from_str(\"127.0.0.1:5443\").unwrap()")] rtsps_dest: SocketAddr, - /// Persistent local key - #[arg(env, long, default_value = "local_key.pem")] + /// persistent local key + #[argh(option, default = "String::from(\"local_key.pem\")")] local_key: String, - /// Custom quic server cert in base64 - #[arg(env, long)] + #[cfg(feature = "quic")] + /// custom quic server cert in base64 + #[argh(option)] custom_quic_cert_base64: Option, - /// Allow connect in insecure mode - #[arg(env, long)] + #[cfg(feature = "quic")] + /// allow connect in insecure mode + #[argh(switch)] allow_quic_insecure: bool, } #[tokio::main] async fn main() { - let args = Args::parse(); + let args: Args = argh::from_env(); + //if RUST_LOG env is not set, set it to info + let level = match std::env::var("RUST_LOG") { + Ok(v) => LevelFilter::from_str(&v).unwrap_or(LevelFilter::Info), + _ => LevelFilter::Info, + }; + PicoLogger::new(level).init(); + #[cfg(feature = "quic")] let server_certs = if let Some(cert) = args.custom_quic_cert_base64 { vec![CertificateDer::from(URL_SAFE.decode(cert).expect("Custom cert should in base64 format").to_vec())] } else { vec![CertificateDer::from(DEFAULT_TUNNEL_CERT.to_vec())] }; + #[cfg(feature = "quic")] rustls::crypto::ring::default_provider().install_default().expect("should install ring as default"); - //if RUST_LOG env is not set, set it to info - if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "info"); - } - tracing_subscriber::registry().with(fmt::layer()).with(EnvFilter::from_default_env()).init(); - //read local_key from file first, if not exist, create a new one and save to file let agent_signer = match std::fs::read_to_string(&args.local_key) { Ok(local_key) => match AgentLocalKey::from_pem(&local_key) { @@ -110,6 +126,7 @@ async fn main() { loop { log::info!("Connecting to connector... {:?} addr: {}", args.connector_protocol, args.connector_addr); match args.connector_protocol { + #[cfg(feature = "tcp")] Protocol::Tcp => match TcpConnection::new(args.connector_addr.clone(), &agent_signer).await { Ok(conn) => { log::info!("Connected to connector via tcp with res {:?}", conn.response()); @@ -119,6 +136,7 @@ async fn main() { log::error!("Connect to connector via tcp error: {e}"); } }, + #[cfg(feature = "quic")] Protocol::Quic => match QuicConnection::new(args.connector_addr.clone(), &agent_signer, &server_certs, args.allow_quic_insecure).await { Ok(conn) => { log::info!("Connected to connector via quic with res {:?}", conn.response());