From 8d20b10e6376428d290e116565f0714c6c2d51ea Mon Sep 17 00:00:00 2001 From: Dmitry Zolotukhin Date: Sat, 13 Jul 2024 22:35:37 +0200 Subject: [PATCH] Cleaned up some code. Do not send response to client until connection is live. --- Cargo.toml | 2 +- src/network.rs | 52 ++++++++++++++++++++++++++++++++++++++++++-------- src/socks.rs | 50 ------------------------------------------------ 3 files changed, 45 insertions(+), 59 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 12d43fc..a69997b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ rust-version = "1.77" log = { version = "0.4", default-features = false } tokio = { version = "1.38", default-features = false, features = ["rt", "io-util", "signal", "net", "time", "sync"] } tokio-native-tls = { version = "0.3", default-features = false } -smoltcp = { version = "0.11", default-features = false, features = ["std", "async", "medium-ip", "proto-ipv4", "proto-ipv6", "socket-tcp"] } +smoltcp = { version = "0.11", default-features = false, features = ["std", "medium-ip", "proto-ipv4", "proto-ipv6", "socket-tcp"] } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } [profile.release] diff --git a/src/network.rs b/src/network.rs index 6fbb47b..7303893 100644 --- a/src/network.rs +++ b/src/network.rs @@ -17,6 +17,8 @@ pub struct Network<'a> { iface: iface::Interface, sockets: iface::SocketSet<'a>, bridges: HashMap, + opening_connections: + HashMap>>, cmd_sender: mpsc::Sender, cmd_receiver: mpsc::Receiver, } @@ -54,6 +56,7 @@ impl Network<'_> { iface, sockets, bridges: HashMap::new(), + opening_connections: HashMap::new(), cmd_sender, cmd_receiver, }) @@ -120,6 +123,42 @@ impl Network<'_> { } } } + + for (handle, response) in self.opening_connections.iter_mut() { + let socket = self.sockets.get::(*handle); + let result = match socket.state() { + tcp::State::Closed | tcp::State::TimeWait | tcp::State::Closing => { + Some(Err("Socket is closed".into())) + } + tcp::State::SynSent | tcp::State::SynReceived => { + // Not ready. + None + } + _ => { + let response = match socket.local_endpoint() { + Some(endpoint) => { + (*handle, SocketAddr::from((endpoint.addr, endpoint.port))) + } + None => (*handle, SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)), + }; + Some(Ok(response)) + } + }; + let result = if let Some(result) = result { + result + } else { + println!("Socket state is {}", socket.state()); + continue; + }; + let response = if let Some(response) = response.take() { + response + } else { + continue; + }; + let _ = response.send(result); + } + self.opening_connections + .retain(|_, response| response.is_some()); } pub fn create_command_sender(&self) -> mpsc::Sender { @@ -143,14 +182,9 @@ impl Network<'_> { return; } - let local_addr = match socket.local_endpoint() { - Some(endpoint) => SocketAddr::from((endpoint.addr, endpoint.port)), - None => SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), local_port), - }; - // TODO: use poll_fn until connection becomes fully available. - // See https://github.com/embassy-rs/embassy/blob/main/embassy-net/src/tcp.rs#L233 for more details. let socket_handle = self.sockets.add(socket); - let _ = response.send(Ok((socket_handle, local_addr))); + self.opening_connections + .insert(socket_handle, Some(response)); } Command::Bridge(socket_handle, reader, writer) => { let socket_tunnel = SocketTunnel { @@ -188,10 +222,12 @@ struct SocketTunnel { impl SocketTunnel {} +type SocketConnectionResult = Result<(iface::SocketHandle, SocketAddr), NetworkError>; + pub enum Command { Connect( std::net::SocketAddr, - oneshot::Sender>, + oneshot::Sender, ), Bridge( iface::SocketHandle, diff --git a/src/socks.rs b/src/socks.rs index 5ce3854..48fcee1 100644 --- a/src/socks.rs +++ b/src/socks.rs @@ -100,57 +100,7 @@ impl SocksConnection { Ok(_) => Ok(()), Err(_) => Err("Command channel closed".into()), } - //tokio::io::copy(&mut self.reader, &mut socket).await?; - //let (sender, receiver) = oneshot::channel(); - //let write_command = network::Command::Write(socket.handle(), self.reader, todo!()); - /* - let rt = runtime::Handle::current(); - rt.spawn(SocksConnection::tunnel_to_destination( - self.command_bridge.clone(), - reader, - socket.handle(), - )); - - let mut request = String::new(); - while self.reader.read_line(&mut request).await? > 0 { - println!("Received request {}", request); - request.clear(); - } - */ - } - - /* - async fn tunnel_to_destination( - command_bridge: mpsc::Sender, - mut reader: BufReader, - socket_handle: smoltcp::iface::SocketHandle, - ) -> Result<(), SocksError> { - let mut buffer = None; - const BUFFER_SIZE: usize = 1024; - loop { - let (sender, receiver) = oneshot::channel(); - let mut buf = if let Some(buf) = buffer.take() { - buf - } else { - Vec::with_capacity(BUFFER_SIZE) - }; - let current_size = buf.len(); - buf.resize(buf.capacity(), 0u8); - - let bytes_read = reader.read(&mut buf[current_size..]).await?; - buf.truncate(current_size + bytes_read); - let write_command = network::Command::Write(socket_handle, buf, sender); - let _ = command_bridge.send(write_command).await; - match receiver.await { - Ok(Ok(buf)) => buffer = Some(buf), - Ok(Err(_err)) => { - return Err("Error writing into socket".into()); - } - Err(_) => return Ok(()), - } - } } - */ async fn perform_handshake( &mut self,