Skip to content

Commit

Permalink
fix: quic_listener will stuck if have huge of waiting incoming conns …
Browse files Browse the repository at this point in the history
…and cause timeout (#31)
  • Loading branch information
giangndm authored Jun 10, 2024
1 parent b2ff5ce commit 92c2a7e
Showing 1 changed file with 54 additions and 55 deletions.
109 changes: 54 additions & 55 deletions crates/relayer/src/agent_listener/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,74 @@ use std::{
error::Error, fmt::Debug, marker::PhantomData, net::SocketAddr, sync::Arc, time::Duration,
};

use async_std::channel::Receiver;
use protocol::key::ClusterValidator;
use quinn::{Endpoint, RecvStream, SendStream, ServerConfig};
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use serde::de::DeserializeOwned;

use super::{AgentConnection, AgentListener, AgentSubConnection};

pub struct AgentQuicListener<VALIDATE: ClusterValidator<REQ>, REQ: DeserializeOwned + Debug> {
endpoint: Endpoint,
cluster_validator: VALIDATE,
pub struct AgentQuicListener<REQ: DeserializeOwned + Debug> {
rx: Receiver<AgentQuicConnection>,
_tmp: PhantomData<REQ>,
}

impl<VALIDATE: ClusterValidator<REQ>, REQ: DeserializeOwned + Debug>
AgentQuicListener<VALIDATE, REQ>
{
pub async fn new(
impl<REQ: DeserializeOwned + Debug> AgentQuicListener<REQ> {
pub async fn new<VALIDATE: 'static + ClusterValidator<REQ>>(
addr: SocketAddr,
cluster_validator: VALIDATE,
priv_key: PrivatePkcs8KeyDer<'static>,
cert: CertificateDer<'static>,
) -> Self {
log::info!("AgentQuicListener::new {}", addr);
let endpoint =
make_server_endpoint(addr, priv_key, cert).expect("Should make server endpoint");
let (tx, rx) = async_std::channel::bounded(100);
async_std::task::spawn_local(async move {
let endpoint =
make_server_endpoint(addr, priv_key, cert).expect("Should make server endpoint");
let cluster_validator = Arc::new(cluster_validator);
while let Some(incoming_conn) = endpoint.accept().await {
let cluster_validator = cluster_validator.clone();
let tx = tx.clone();
async_std::task::spawn_local(async move {
log::info!(
"[AgentQuicListener] On incoming from {}",
incoming_conn.remote_address()
);
let conn: quinn::Connection = match incoming_conn.await {
Ok(conn) => conn,
Err(e) => {
log::error!("[AgentQuicListener] incomming conn error {}", e);
return;
}
};
log::info!(
"[AgentQuicListener] new conn from {}",
conn.remote_address()
);
match Self::process_incoming_conn(cluster_validator, conn).await {
Ok(connection) => {
log::info!("new connection {}", connection.domain());
if let Err(e) = tx.send(connection).await {
log::error!("send new connection to main loop error {:?}", e);
}
}
Err(e) => {
log::error!("process_incoming_conn error: {}", e);
}
}
});
}
});

Self {
endpoint,
cluster_validator,
rx,
_tmp: Default::default(),
}
}

async fn process_incoming_conn(
&self,
async fn process_incoming_conn<VALIDATE: ClusterValidator<REQ>>(
cluster_validator: Arc<VALIDATE>,
conn: quinn::Connection,
) -> Result<AgentQuicConnection, Box<dyn Error>> {
let (mut send, mut recv) = conn.accept_bi().await?;
Expand All @@ -46,19 +79,17 @@ impl<VALIDATE: ClusterValidator<REQ>, REQ: DeserializeOwned + Debug>
.await?
.ok_or::<Box<dyn Error>>("No incomming data".into())?;

match self.cluster_validator.validate_connect_req(&buf[..buf_len]) {
Ok(request) => match self.cluster_validator.generate_domain(&request) {
match cluster_validator.validate_connect_req(&buf[..buf_len]) {
Ok(request) => match cluster_validator.generate_domain(&request) {
Ok(domain) => {
log::info!("register request domain {}", domain);
let res_buf = self.cluster_validator.sign_response_res(&request, None);
let res_buf = cluster_validator.sign_response_res(&request, None);
send.write_all(&res_buf).await?;
Ok(AgentQuicConnection { domain, conn })
}
Err(e) => {
log::error!("invalid register request {:?}, error {}", request, e);
let res_buf = self
.cluster_validator
.sign_response_res(&request, Some(e.clone()));
let res_buf = cluster_validator.sign_response_res(&request, Some(e.clone()));
send.write_all(&res_buf).await?;
Err(e.into())
}
Expand All @@ -72,44 +103,12 @@ impl<VALIDATE: ClusterValidator<REQ>, REQ: DeserializeOwned + Debug>
}

#[async_trait::async_trait]
impl<
VALIDATE: ClusterValidator<REQ> + Send + Sync,
REQ: DeserializeOwned + Send + Sync + Debug,
> AgentListener<AgentQuicConnection, AgentQuicSubConnection, RecvStream, SendStream>
for AgentQuicListener<VALIDATE, REQ>
impl<REQ: DeserializeOwned + Send + Sync + Debug>
AgentListener<AgentQuicConnection, AgentQuicSubConnection, RecvStream, SendStream>
for AgentQuicListener<REQ>
{
async fn recv(&mut self) -> Result<AgentQuicConnection, Box<dyn Error>> {
loop {
let incoming_conn = self
.endpoint
.accept()
.await
.ok_or::<Box<dyn Error>>("Cannot accept".into())?;
log::info!(
"[AgentQuicListener] On incoming from {}",
incoming_conn.remote_address()
);
let conn: quinn::Connection = match incoming_conn.await {
Ok(conn) => conn,
Err(e) => {
log::error!("[AgentQuicListener] incomming conn error {}", e);
continue;
}
};
log::info!(
"[AgentQuicListener] new conn from {}",
conn.remote_address()
);
match self.process_incoming_conn(conn).await {
Ok(connection) => {
log::info!("new connection {}", connection.domain());
return Ok(connection);
}
Err(e) => {
log::error!("process_incoming_conn error: {}", e);
}
}
}
self.rx.recv().await.map_err(|e| e.into())
}
}

Expand Down

0 comments on commit 92c2a7e

Please sign in to comment.