Skip to content

Commit

Permalink
ensure tcp connections always be closed with timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
neevek committed May 19, 2024
1 parent 131d32d commit f132b34
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 27 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 36 additions & 25 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,8 @@ impl Server {
let nread =
tokio::time::timeout(Duration::from_secs(2), inbound_stream.read(&mut buffer))
.await
.map_err(|_| ProxyError::BadRequest)?
.map_err(|_| ProxyError::Timeout)?;
.map_err(|_: Elapsed| ProxyError::Timeout)?
.map_err(|_| ProxyError::BadRequest)?;

if proxy_handler.is_none() {
proxy_handler = Some(Self::create_proxy_handler(
Expand Down Expand Up @@ -535,7 +535,7 @@ impl Server {
};

debug!(
"forward payload to next proxy({outbound_type:?}), {addr} -> {}",
"forward to {outbound_type:?}, {addr} {}",
upstream.unwrap()
);

Expand Down Expand Up @@ -629,8 +629,8 @@ impl Server {

None => {
warn!(
"failed to create outbound stream for: {addr} from {}",
inbound_stream.peer_addr().unwrap()
"failed to create outbound stream for: {addr} from {:?}",
inbound_stream.peer_addr()
);
proxy_handler
.handle_outbound_failure(&mut inbound_stream)
Expand All @@ -650,13 +650,13 @@ impl Server {
) -> Result<Option<TcpStream>, ProxyError> {
match dashboard_addr {
Some(addr) => {
debug!("dashboard request: {}", inbound_stream.peer_addr().unwrap());
debug!("dashboard request: {:?}", inbound_stream.peer_addr());
Ok(Self::create_tcp_stream(addr, tcp_nodelay).await)
}
None => {
log::warn!(
"request routing to the proxy server itself is rejected: {}",
inbound_stream.peer_addr().unwrap()
"request routing to the proxy server itself is rejected: {:?}",
inbound_stream.peer_addr()
);
Err(ProxyError::BadRequest)
}
Expand Down Expand Up @@ -700,9 +700,18 @@ impl Server {
) -> Result<ProxyTraffic, ProxyError> {
stats_sender.send(ServerStats::NewConnection).await.ok();

const BUFFER_SIZE: usize = 4096;
let mut inbound_buffer = [0u8; BUFFER_SIZE];
let mut outbound_buffer = [0u8; BUFFER_SIZE];
let in_addr = inbound_stream
.peer_addr()
.map_err(|_| ProxyError::InternalError)?;
let out_addr = outbound_stream
.peer_addr()
.map_err(|_| ProxyError::InternalError)?;

debug!("sess start: {in_addr:<20} ↔ {out_addr:<20}");

const BUFFER_SIZE: usize = 8192;
let mut inbound_buffer = vec![0; BUFFER_SIZE];
let mut outbound_buffer = vec![0; BUFFER_SIZE];
let (mut inbound_reader, mut inbound_writer) = inbound_stream.split();
let (mut outbound_reader, mut outbound_writer) = outbound_stream.split();

Expand Down Expand Up @@ -735,7 +744,7 @@ impl Server {
&mut inbound_writer,
&mut outbound_buffer,
&mut rx_bytes,
&mut inbound_stream_eos,
&mut outbound_stream_eos,
)
.await
} else {
Expand All @@ -744,30 +753,26 @@ impl Server {
&mut outbound_writer,
&mut inbound_buffer,
&mut tx_bytes,
&mut outbound_stream_eos,
&mut inbound_stream_eos,
)
.await
};

match result {
Err(ProxyError::Timeout) | Ok(0) => {
if outbound_stream_eos || inbound_stream_eos {
Ok(0) => {
if inbound_stream_eos && outbound_stream_eos {
break;
}
}
Err(ProxyError::Timeout) => {
debug!("timeout : {in_addr:<20} ↔ {out_addr:<20} | ⟳ {loop_count:<8}| ↑ {tx_bytes:<10} ↓ {rx_bytes:<10}");
break;
}
Err(_) => break,
Ok(_) => {}
}
}

debug!(
"data [{:<8}] = ↑ {:<10} ↓ {:<10} {} ↔ {}",
loop_count,
tx_bytes,
rx_bytes,
inbound_stream.peer_addr().unwrap(),
outbound_stream.local_addr().unwrap(),
);
debug!("sess end : {in_addr:<20} ↔ {out_addr:<20} | ⟳ {loop_count:<8}| ↑ {tx_bytes:<10} ↓ {rx_bytes:<10}");

stats_sender
.send(ServerStats::Traffic(ProxyTraffic { tx_bytes, rx_bytes }))
Expand All @@ -794,7 +799,13 @@ impl Server {
.map_err(|_: Elapsed| ProxyError::Timeout)?
{
Ok(0) => {
*eos_flag = true;
if !*eos_flag {
*eos_flag = true;
writer
.shutdown()
.await
.map_err(|_| ProxyError::InternalError)?;
}
Ok(0)
}
Ok(n) => {
Expand Down

0 comments on commit f132b34

Please sign in to comment.