Skip to content

Commit

Permalink
support timeout and close fd in time to avoid FIN_WAIT2
Browse files Browse the repository at this point in the history
  • Loading branch information
neevek committed May 17, 2024
1 parent bac2770 commit 131d32d
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 45 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "omnip"
version = "0.4.20"
version = "0.4.21"
edition = "2021"

[lib]
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub enum ProxyError {
IPv6NotSupported, // not supported by Socks4
InternalError,
BadRequest,
Timeout,
PayloadTooLarge,
BadGateway(anyhow::Error),
Disconnected(anyhow::Error),
Expand Down Expand Up @@ -559,8 +560,7 @@ pub mod android {
proxy_rules_file,
jthreads as usize,
false,
true,
jtcpNoDelay as bool,
jtcpNoDelay != 0,
) {
Ok(config) => config,
Err(e) => {
Expand Down
182 changes: 141 additions & 41 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
QuicServerConfig,
};
use anyhow::{anyhow, bail, Context, Result};
use log::{debug, error, info};
use log::{debug, error, info, warn};
use notify::event::ModifyKind;
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use rs_utilities::dns::{
Expand All @@ -28,11 +28,14 @@ use std::path::Path;
use std::str::{self, FromStr};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::task::JoinHandle;
use tokio::time::error::Elapsed;

use tokio::{
io::AsyncReadExt,
io::AsyncWriteExt,
net::{TcpListener, TcpStream},
};

Expand Down Expand Up @@ -344,7 +347,7 @@ impl Server {

loop {
match proxy_listener.accept().await {
Ok((inbound_stream, addr)) => {
Ok((inbound_stream, _addr)) => {
let psp = psp.clone();
let (prefer_upstream, upstream, dns_resolver) =
copy_inner_state!(self, prefer_upstream, upstream, dns_resolver);
Expand Down Expand Up @@ -373,26 +376,31 @@ impl Server {
.await
.ok();
}
return;
}

Self::process_stream(
inbound_stream,
psp,
upstream,
prefer_upstream,
dns_resolver.unwrap(),
)
.await
.map_err(|e| match e {
ProxyError::BadRequest | ProxyError::BadGateway(_) => {
error!("err: {e:?}");
} else {
match Self::process_stream(
inbound_stream,
psp,
upstream,
prefer_upstream,
dns_resolver.unwrap(),
)
.await
{
Ok(()) => {}
Err(ProxyError::BadRequest) => {
error!("BadRequest");
}
Err(ProxyError::BadGateway(e)) => {
error!("BadGateway: {e:?}");
}
Err(ProxyError::Timeout) => {
error!("Timeout");
}
Err(e) => {
error!("generic error: {e:?}");
}
}
_ => {}
})
.ok();

debug!("connection closed: {addr}");
}
});
}

Expand Down Expand Up @@ -466,7 +474,7 @@ impl Server {
tokio::time::timeout(Duration::from_secs(2), inbound_stream.read(&mut buffer))
.await
.map_err(|_| ProxyError::BadRequest)?
.map_err(|_| ProxyError::BadRequest)?;
.map_err(|_| ProxyError::Timeout)?;

if proxy_handler.is_none() {
proxy_handler = Some(Self::create_proxy_handler(
Expand Down Expand Up @@ -527,8 +535,8 @@ impl Server {
};

debug!(
"forward payload to next proxy({:?}), {addr} -> {:?}",
outbound_type, upstream
"forward payload to next proxy({outbound_type:?}), {addr} -> {}",
upstream.unwrap()
);

outbound_stream =
Expand Down Expand Up @@ -620,6 +628,10 @@ impl Server {
}

None => {
warn!(
"failed to create outbound stream for: {addr} from {}",
inbound_stream.peer_addr().unwrap()
);
proxy_handler
.handle_outbound_failure(&mut inbound_stream)
.await?;
Expand Down Expand Up @@ -682,31 +694,119 @@ impl Server {
}

async fn start_stream_transfer(
mut a_stream: TcpStream,
mut b_stream: TcpStream,
mut inbound_stream: TcpStream,
mut outbound_stream: TcpStream,
stats_sender: &Sender<ServerStats>,
) -> Result<ProxyTraffic, ProxyError> {
stats_sender.send(ServerStats::NewConnection).await.ok();
let result = match tokio::io::copy_bidirectional(&mut a_stream, &mut b_stream).await {
Ok((tx_bytes, rx_bytes)) => {
debug!(
"transfer, out:{tx_bytes}, in:{rx_bytes}, {} <-> {:?}",
a_stream.local_addr().unwrap(),
b_stream.local_addr().unwrap(),
);

stats_sender
.send(ServerStats::Traffic(ProxyTraffic { tx_bytes, rx_bytes }))
.await
.ok();
const BUFFER_SIZE: usize = 4096;
let mut inbound_buffer = [0u8; BUFFER_SIZE];
let mut outbound_buffer = [0u8; BUFFER_SIZE];
let (mut inbound_reader, mut inbound_writer) = inbound_stream.split();
let (mut outbound_reader, mut outbound_writer) = outbound_stream.split();

let mut tx_bytes = 0u64;
let mut rx_bytes = 0u64;
let mut inbound_stream_eos = false;
let mut outbound_stream_eos = false;
let mut loop_count = 0;

loop {
loop_count += 1;
let result = if !inbound_stream_eos && !outbound_stream_eos {
tokio::select! {
result = Self::transfer_data_with_timeout(
&mut inbound_reader,
&mut outbound_writer,
&mut inbound_buffer,
&mut tx_bytes,
&mut inbound_stream_eos) => result,
result = Self::transfer_data_with_timeout(
&mut outbound_reader,
&mut inbound_writer,
&mut outbound_buffer,
&mut rx_bytes,
&mut outbound_stream_eos) => result,
}
} else if !outbound_stream_eos {
Self::transfer_data_with_timeout(
&mut outbound_reader,
&mut inbound_writer,
&mut outbound_buffer,
&mut rx_bytes,
&mut inbound_stream_eos,
)
.await
} else {
Self::transfer_data_with_timeout(
&mut inbound_reader,
&mut outbound_writer,
&mut inbound_buffer,
&mut tx_bytes,
&mut outbound_stream_eos,
)
.await
};

Ok(ProxyTraffic { rx_bytes, tx_bytes })
match result {
Err(ProxyError::Timeout) | Ok(0) => {
if outbound_stream_eos || inbound_stream_eos {
break;
}
}
Err(_) => break,
Ok(_) => {}
}
Err(e) => Err(ProxyError::Disconnected(anyhow!(e))),
};
}

debug!(
"data [{:<8}] = ↑ {:<10} ↓ {:<10} {} ↔ {}",
loop_count,
tx_bytes,
rx_bytes,
inbound_stream.peer_addr().unwrap(),
outbound_stream.local_addr().unwrap(),
);

stats_sender
.send(ServerStats::Traffic(ProxyTraffic { tx_bytes, rx_bytes }))
.await
.ok();

stats_sender.send(ServerStats::CloseConnection).await.ok();
result
Ok(ProxyTraffic { rx_bytes, tx_bytes })
}

async fn transfer_data_with_timeout<R, W>(
reader: &mut R,
writer: &mut W,
buffer: &mut [u8],
out_bytes: &mut u64,
eos_flag: &mut bool,
) -> Result<usize, ProxyError>
where
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
match tokio::time::timeout(Duration::from_secs(15), reader.read(buffer))
.await
.map_err(|_: Elapsed| ProxyError::Timeout)?
{
Ok(0) => {
*eos_flag = true;
Ok(0)
}
Ok(n) => {
*out_bytes += n as u64;
writer
.write_all(&buffer[..n])
.await
.map_err(|_| ProxyError::InternalError)?;
Ok(n)
}
Err(_) => Err(ProxyError::InternalError), // Connection mostly reset by peer
}
}

fn collect_and_report_server_stats(&self, mut stats_receiver: Receiver<ServerStats>) {
Expand Down

0 comments on commit 131d32d

Please sign in to comment.