diff --git a/Cargo.lock b/Cargo.lock index 071badf..07ea059 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1425,8 +1425,8 @@ dependencies = [ [[package]] name = "rstun" -version = "0.4.9" -source = "git+https://github.com/neevek/rstun#f8b777e4b46b5949f894518f582aefebf8e75d6c" +version = "0.4.12" +source = "git+https://github.com/neevek/rstun#9a4e8253813206ab4859dce0ba8047779321f5d9" dependencies = [ "android_logger", "anyhow", diff --git a/src/server.rs b/src/server.rs index b7e8657..6374453 100644 --- a/src/server.rs +++ b/src/server.rs @@ -473,8 +473,8 @@ impl Server { let nread = tokio::time::timeout(Duration::from_secs(2), inbound_stream.read(&mut buffer)) .await - .map_err(|_| ProxyError::BadRequest)? - .map_err(|_| ProxyError::Timeout)?; + .map_err(|_: Elapsed| ProxyError::Timeout)? + .map_err(|_| ProxyError::BadRequest)?; if proxy_handler.is_none() { proxy_handler = Some(Self::create_proxy_handler( @@ -535,7 +535,7 @@ impl Server { }; debug!( - "forward payload to next proxy({outbound_type:?}), {addr} -> {}", + "forward to {outbound_type:?}, {addr} → {}", upstream.unwrap() ); @@ -629,8 +629,8 @@ impl Server { None => { warn!( - "failed to create outbound stream for: {addr} from {}", - inbound_stream.peer_addr().unwrap() + "failed to create outbound stream for: {addr} from {:?}", + inbound_stream.peer_addr() ); proxy_handler .handle_outbound_failure(&mut inbound_stream) @@ -650,13 +650,13 @@ impl Server { ) -> Result, ProxyError> { match dashboard_addr { Some(addr) => { - debug!("dashboard request: {}", inbound_stream.peer_addr().unwrap()); + debug!("dashboard request: {:?}", inbound_stream.peer_addr()); Ok(Self::create_tcp_stream(addr, tcp_nodelay).await) } None => { log::warn!( - "request routing to the proxy server itself is rejected: {}", - inbound_stream.peer_addr().unwrap() + "request routing to the proxy server itself is rejected: {:?}", + inbound_stream.peer_addr() ); Err(ProxyError::BadRequest) } @@ -700,9 +700,18 @@ impl Server { ) -> Result { stats_sender.send(ServerStats::NewConnection).await.ok(); - const BUFFER_SIZE: usize = 4096; - let mut inbound_buffer = [0u8; BUFFER_SIZE]; - let mut outbound_buffer = [0u8; BUFFER_SIZE]; + let in_addr = inbound_stream + .peer_addr() + .map_err(|_| ProxyError::InternalError)?; + let out_addr = outbound_stream + .peer_addr() + .map_err(|_| ProxyError::InternalError)?; + + debug!("sess start: {in_addr:<20} ↔ {out_addr:<20}"); + + const BUFFER_SIZE: usize = 8192; + let mut inbound_buffer = vec![0; BUFFER_SIZE]; + let mut outbound_buffer = vec![0; BUFFER_SIZE]; let (mut inbound_reader, mut inbound_writer) = inbound_stream.split(); let (mut outbound_reader, mut outbound_writer) = outbound_stream.split(); @@ -735,7 +744,7 @@ impl Server { &mut inbound_writer, &mut outbound_buffer, &mut rx_bytes, - &mut inbound_stream_eos, + &mut outbound_stream_eos, ) .await } else { @@ -744,30 +753,26 @@ impl Server { &mut outbound_writer, &mut inbound_buffer, &mut tx_bytes, - &mut outbound_stream_eos, + &mut inbound_stream_eos, ) .await }; match result { - Err(ProxyError::Timeout) | Ok(0) => { - if outbound_stream_eos || inbound_stream_eos { + Ok(0) => { + if inbound_stream_eos && outbound_stream_eos { break; } } + Err(ProxyError::Timeout) => { + debug!("timeout : {in_addr:<20} ↔ {out_addr:<20} | ⟳ {loop_count:<8}| ↑ {tx_bytes:<10} ↓ {rx_bytes:<10}"); + break; + } Err(_) => break, Ok(_) => {} } } - - debug!( - "data [{:<8}] = ↑ {:<10} ↓ {:<10} {} ↔ {}", - loop_count, - tx_bytes, - rx_bytes, - inbound_stream.peer_addr().unwrap(), - outbound_stream.local_addr().unwrap(), - ); + debug!("sess end : {in_addr:<20} ↔ {out_addr:<20} | ⟳ {loop_count:<8}| ↑ {tx_bytes:<10} ↓ {rx_bytes:<10}"); stats_sender .send(ServerStats::Traffic(ProxyTraffic { tx_bytes, rx_bytes })) @@ -794,7 +799,13 @@ impl Server { .map_err(|_: Elapsed| ProxyError::Timeout)? { Ok(0) => { - *eos_flag = true; + if !*eos_flag { + *eos_flag = true; + writer + .shutdown() + .await + .map_err(|_| ProxyError::InternalError)?; + } Ok(0) } Ok(n) => {