Skip to content

Commit

Permalink
fix: increase agent quic keep alive for reduce server load, added ben…
Browse files Browse the repository at this point in the history
…chmark clients sample (#30)
  • Loading branch information
giangndm authored Jun 8, 2024
1 parent c1a320b commit b2ff5ce
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 38 deletions.
7 changes: 7 additions & 0 deletions crates/agent/benchmark_quic.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
cargo run --release --example benchmark_clients -- \
--connector-protocol quic \
--connector-addr https://127.0.0.1:13001 \
--http-dest 127.0.0.1:8080 \
--https-dest 127.0.0.1:8443 \
--allow-quic-insecure \
$@
7 changes: 7 additions & 0 deletions crates/agent/benchmark_tcp.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
cargo run --example benchmark_clients -- \
--connector-protocol tcp \
--connector-addr tcp://local.ha.8xff.io:13001 \
--http-dest 127.0.0.1:8080 \
--https-dest 127.0.0.1:8443 \
--allow-quic-insecure \
$@
175 changes: 175 additions & 0 deletions crates/agent/examples/benchmark_clients.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
use std::{
net::SocketAddr,
time::{Duration, Instant},
};

use atm0s_reverse_proxy_agent::{
run_tunnel_connection, Connection, Protocol, QuicConnection, SubConnection, TcpConnection,
};
use base64::{engine::general_purpose::URL_SAFE, Engine as _};
use clap::Parser;
use futures::{AsyncRead, AsyncWrite};
use protocol_ed25519::AgentLocalKey;
use rustls::pki_types::CertificateDer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use url::Url;

/// A benchmark util for simulating multiple clients connect to relay server
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Address of relay server
#[arg(env, long)]
connector_addr: Url,

/// Protocol of relay server
#[arg(env, long)]
connector_protocol: Protocol,

/// Http proxy dest
#[arg(env, long, default_value = "127.0.0.1:8080")]
http_dest: SocketAddr,

/// Sni-https proxy dest
#[arg(env, long, default_value = "127.0.0.1:8443")]
https_dest: SocketAddr,

/// Custom quic server cert in base64
#[arg(env, long)]
custom_quic_cert_base64: Option<String>,

/// Allow connect in insecure mode
#[arg(env, long)]
allow_quic_insecure: bool,

/// clients
#[arg(env, long)]
clients: usize,

/// wait time between connect action
#[arg(env, long, default_value_t = 1000)]
connect_wait_ms: u64,
}

#[async_std::main]
async fn main() {
let args = Args::parse();
rustls::crypto::ring::default_provider()
.install_default()
.expect("should install ring as default");

//if RUST_LOG env is not set, set it to info
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "warn");
}
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(tracing_subscriber::EnvFilter::from_default_env())
.init();

for client in 0..args.clients {
let args2 = args.clone();
async_std::task::spawn(async move {
async_std::task::spawn_local(connect(client, args2));
});
async_std::task::sleep(Duration::from_millis(args.connect_wait_ms)).await;
}

loop {
async_std::task::sleep(Duration::from_millis(1000)).await;
}
}

async fn connect(client: usize, args: Args) {
let default_tunnel_cert_buf = include_bytes!("../../../certs/tunnel.cert");
let default_tunnel_cert = CertificateDer::from(default_tunnel_cert_buf.to_vec());

let server_certs = if let Some(cert) = args.custom_quic_cert_base64 {
vec![CertificateDer::from(
URL_SAFE
.decode(&cert)
.expect("Custom cert should in base64 format")
.to_vec(),
)]
} else {
vec![default_tunnel_cert]
};
let agent_signer = AgentLocalKey::random();

loop {
log::info!(
"Connecting to connector... {:?} addr: {}",
args.connector_protocol,
args.connector_addr
);
let started = Instant::now();
match args.connector_protocol {
Protocol::Tcp => {
match TcpConnection::new(args.connector_addr.clone(), &agent_signer).await {
Ok(conn) => {
log::info!(
"Connected to connector via tcp with res {:?}",
conn.response()
);
println!("{client} connected after {:?}", started.elapsed());
run_connection_loop(conn, args.http_dest, args.https_dest).await;
}
Err(e) => {
log::error!("Connect to connector via tcp error: {}", e);
}
}
}
Protocol::Quic => {
match QuicConnection::new(
args.connector_addr.clone(),
&agent_signer,
&server_certs,
args.allow_quic_insecure,
)
.await
{
Ok(conn) => {
log::info!(
"Connected to connector via quic with res {:?}",
conn.response()
);
println!("{client} connected after {:?}", started.elapsed());
run_connection_loop(conn, args.http_dest, args.https_dest).await;
}
Err(e) => {
log::error!("Connect to connector via quic error: {}", e);
}
}
}
}
//TODO exponential backoff
async_std::task::sleep(std::time::Duration::from_secs(1)).await;
}
}

async fn run_connection_loop<S, R, W>(
mut connection: impl Connection<S, R, W>,
http_dest: SocketAddr,
https_dest: SocketAddr,
) where
S: SubConnection<R, W> + 'static,
R: AsyncRead + Send + Unpin + 'static,
W: AsyncWrite + Send + Unpin + 'static,
{
loop {
match connection.recv().await {
Ok(sub_connection) => {
log::info!("recv sub_connection");
async_std::task::spawn_local(run_tunnel_connection(
sub_connection,
http_dest,
https_dest,
));
}
Err(e) => {
log::error!("recv sub_connection error: {}", e);
break;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
RUST_LOG=debug cargo run -- \
RUST_LOG=info cargo run -- \
--connector-protocol quic \
--connector-addr https://local.ha.8xff.io:13001 \
--connector-addr https://127.0.0.1:13001 \
--http-dest 127.0.0.1:8080 \
--https-dest 127.0.0.1:8443 \
--allow-quic-insecure
6 changes: 6 additions & 0 deletions crates/agent/node_local_tcp.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
RUST_LOG=info cargo run -- \
--connector-protocol tcp \
--connector-addr tcp://127.0.0.1:13001 \
--http-dest 127.0.0.1:8080 \
--https-dest 127.0.0.1:8443 \
--allow-quic-insecure
29 changes: 17 additions & 12 deletions crates/agent/src/connection/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ fn configure_client(
};

let mut transport = TransportConfig::default();
transport.keep_alive_interval(Some(Duration::from_secs(3)));
transport.keep_alive_interval(Some(Duration::from_secs(15)));
transport.max_idle_timeout(Some(
Duration::from_secs(30)
.try_into()
.expect("Should config timeout"),
));
config.transport_config(Arc::new(transport));
Ok(config)
}
Expand All @@ -128,18 +133,18 @@ impl SkipServerVerification {
impl ServerCertVerifier for SkipServerVerification {
fn verify_tls12_signature(
&self,
message: &[u8],
cert: &CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}

fn verify_tls13_signature(
&self,
message: &[u8],
cert: &CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
Expand All @@ -150,11 +155,11 @@ impl ServerCertVerifier for SkipServerVerification {

fn verify_server_cert(
&self,
end_entity: &CertificateDer<'_>,
intermediates: &[CertificateDer<'_>],
server_name: &rustls::pki_types::ServerName<'_>,
ocsp_response: &[u8],
now: rustls::pki_types::UnixTime,
_end_entity: &CertificateDer<'_>,
_intermediates: &[CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}
Expand Down
32 changes: 30 additions & 2 deletions crates/agent/src/connection/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
error::Error,
net::{SocketAddr, ToSocketAddrs},
net::ToSocketAddrs,
pin::Pin,
task::{Context, Poll},
};
Expand Down Expand Up @@ -74,7 +74,15 @@ impl<RES: Send + Sync>
for TcpConnection<RES>
{
async fn create_outgoing(&mut self) -> Result<TcpSubConnection, Box<dyn Error>> {
todo!()
// TODO fix create_sub_connection issue. I don't know why yamux is success full create at agent
// but relay don't received any connection
let mux_client = OpenStreamsClient {
connection: &mut self.conn,
};
match mux_client.await {
Ok(stream) => Ok(TcpSubConnection::new(stream)),
Err(e) => Err(e.into()),
}
}

async fn recv(&mut self) -> Result<TcpSubConnection, Box<dyn Error>> {
Expand Down Expand Up @@ -115,3 +123,23 @@ where
}
}
}

#[derive(Debug)]
pub struct OpenStreamsClient<'a, T> {
connection: &'a mut yamux::Connection<T>,
}

impl<'a, T> Future for OpenStreamsClient<'a, T>
where
T: AsyncRead + AsyncWrite + Unpin + std::fmt::Debug,
{
type Output = yamux::Result<yamux::Stream>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
match this.connection.poll_new_outbound(cx) {
Poll::Ready(stream) => return Poll::Ready(stream),
Poll::Pending => Poll::Pending,
}
}
}
2 changes: 1 addition & 1 deletion crates/relayer/run_local_node2.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ cargo run -- \
--root-domain local.ha.8xff.io \
--sdn-node-id 2 \
--sdn-port 50002 \
--sdn-seeds '1@/ip4/127.0.0.1/udp/50001'
--sdn-seeds '1@/ip4/127.0.0.1/udp/50001'
9 changes: 8 additions & 1 deletion crates/relayer/src/agent_listener/quic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{error::Error, fmt::Debug, marker::PhantomData, net::SocketAddr, sync::Arc};
use std::{
error::Error, fmt::Debug, marker::PhantomData, net::SocketAddr, sync::Arc, time::Duration,
};

use protocol::key::ClusterValidator;
use quinn::{Endpoint, RecvStream, SendStream, ServerConfig};
Expand Down Expand Up @@ -164,6 +166,11 @@ fn configure_server(
let mut server_config = ServerConfig::with_single_cert(cert_chain, priv_key.into())?;
let transport_config = Arc::get_mut(&mut server_config.transport).unwrap();
transport_config.max_concurrent_uni_streams(0_u8.into());
transport_config.max_idle_timeout(Some(
Duration::from_secs(30)
.try_into()
.expect("Should config timeout"),
));

Ok(server_config)
}
Loading

0 comments on commit b2ff5ce

Please sign in to comment.