From 35b0e49fd6347041543376c68780026d0e623867 Mon Sep 17 00:00:00 2001 From: zonyitoo Date: Sun, 26 Nov 2023 23:22:05 +0800 Subject: [PATCH] refactored http implementation and migrate to hyper 1.0 --- Cargo.lock | 32 +- crates/shadowsocks-service/Cargo.toml | 6 +- .../src/local/http/client_cache.rs | 45 -- .../src/local/http/connector.rs | 87 ---- .../src/local/http/dispatcher.rs | 406 ------------------ .../src/local/http/http_client.rs | 269 +++++++++++- .../src/local/http/http_service.rs | 339 +++++++++++++++ .../src/local/http/http_stream.rs | 12 - .../src/local/http/http_tls/mod.rs | 17 - .../src/local/http/http_tls/native_tls.rs | 91 ---- .../src/local/http/http_tls/rustls.rs | 82 ---- .../shadowsocks-service/src/local/http/mod.rs | 14 +- .../src/local/http/server.rs | 168 ++++---- .../src/local/http/tokio_rt.rs | 151 +++++++ .../src/local/http/utils.rs | 96 ++++- 15 files changed, 949 insertions(+), 866 deletions(-) delete mode 100644 crates/shadowsocks-service/src/local/http/client_cache.rs delete mode 100644 crates/shadowsocks-service/src/local/http/connector.rs delete mode 100644 crates/shadowsocks-service/src/local/http/dispatcher.rs create mode 100644 crates/shadowsocks-service/src/local/http/http_service.rs delete mode 100644 crates/shadowsocks-service/src/local/http/http_tls/mod.rs delete mode 100644 crates/shadowsocks-service/src/local/http/http_tls/native_tls.rs delete mode 100644 crates/shadowsocks-service/src/local/http/http_tls/rustls.rs create mode 100644 crates/shadowsocks-service/src/local/http/tokio_rt.rs diff --git a/Cargo.lock b/Cargo.lock index b46d5056aac1..5201418fcf40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1198,6 +1198,19 @@ dependencies = [ "http 1.0.0", ] +[[package]] +name = "http-body-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.8.0" @@ -2685,6 +2698,7 @@ dependencies = [ "etherparse", "futures", "hickory-resolver", + "http-body-util", "hyper 1.0.1", "idna", "ipnet", @@ -2709,7 +2723,6 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls", - "tower", "tun", "webpki-roots", "windows-sys 0.52.0", @@ -3082,23 +3095,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tower" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" -dependencies = [ - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] -name = "tower-layer" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" - [[package]] name = "tower-service" version = "0.3.2" diff --git a/crates/shadowsocks-service/Cargo.toml b/crates/shadowsocks-service/Cargo.toml index 0c8631f42990..743b8cfec5ec 100644 --- a/crates/shadowsocks-service/Cargo.toml +++ b/crates/shadowsocks-service/Cargo.toml @@ -67,7 +67,7 @@ local-dns-relay = ["local-dns"] # Currently is only used in Android local-flow-stat = ["local"] # Enable HTTP protocol for sslocal -local-http = ["local", "hyper", "tower"] +local-http = ["local", "hyper", "http-body-util"] local-http-native-tls = ["local-http", "tokio-native-tls", "native-tls"] local-http-native-tls-vendored = [ "local-http-native-tls", @@ -144,8 +144,8 @@ async-trait = "0.1" socket2 = { version = "0.5", features = ["all"] } libc = "0.2.141" -hyper = { version = "1.0.1", optional = true, features = ["full"] } -tower = { version = "0.4", optional = true } +hyper = { version = "1.0", optional = true, features = ["full"] } +http-body-util = { version = "0.1", optional = true } hickory-resolver = { version = "0.24", optional = true, features = [ "serde-config", diff --git a/crates/shadowsocks-service/src/local/http/client_cache.rs b/crates/shadowsocks-service/src/local/http/client_cache.rs deleted file mode 100644 index 8d90f830ed32..000000000000 --- a/crates/shadowsocks-service/src/local/http/client_cache.rs +++ /dev/null @@ -1,45 +0,0 @@ -//! Cached HTTP client for remote server - -use std::sync::Arc; - -use hyper::{Body, Client}; -use lru_time_cache::LruCache; -use shadowsocks::config::ServerAddr; -use tokio::sync::Mutex; - -use crate::local::{context::ServiceContext, loadbalancing::ServerIdent}; - -use super::{connector::Connector, http_client::ProxyHttpClient}; - -/// Cached HTTP client for remote servers -pub struct ProxyClientCache { - context: Arc, - cache: Mutex>, -} - -impl ProxyClientCache { - pub fn new(context: Arc) -> ProxyClientCache { - ProxyClientCache { - context, - cache: Mutex::new(LruCache::with_capacity(5)), - } - } - - pub async fn get_connected(&self, server: &Arc) -> ProxyHttpClient { - let server_config = server.server_config(); - - let mut cache = self.cache.lock().await; - if let Some(client) = cache.get(server_config.addr()) { - return client.clone(); - } - - // Create a new client - let client = Client::builder() - .http1_preserve_header_case(true) - .http1_title_case_headers(true) - .build::<_, Body>(Connector::new(self.context.clone(), Some(server.clone()))); - cache.insert(server_config.addr().clone(), client.clone()); - - client - } -} diff --git a/crates/shadowsocks-service/src/local/http/connector.rs b/crates/shadowsocks-service/src/local/http/connector.rs deleted file mode 100644 index 414595e1383e..000000000000 --- a/crates/shadowsocks-service/src/local/http/connector.rs +++ /dev/null @@ -1,87 +0,0 @@ -//! HTTP Client connector - -use std::{ - future::Future, - io::{self, ErrorKind}, - pin::Pin, - sync::Arc, - task::{self, Poll}, -}; - -use futures::{future::BoxFuture, FutureExt}; -use hyper::Uri; -use log::error; -use pin_project::pin_project; -use tower::Service; - -use crate::local::{context::ServiceContext, loadbalancing::ServerIdent, net::AutoProxyClientStream}; - -use super::{http_stream::ProxyHttpStream, utils::host_addr}; - -#[derive(Clone)] -pub struct Connector { - context: Arc, - server: Option>, -} - -impl Connector { - pub fn new(context: Arc, server: Option>) -> Connector { - Connector { context, server } - } -} - -impl Service for Connector { - type Error = io::Error; - type Future = Connecting; - type Response = ProxyHttpStream; - - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, dst: Uri) -> Self::Future { - let context = self.context.clone(); - let server = self.server.clone(); - Connecting { - fut: async move { - let is_https = dst.scheme_str() == Some("https"); - match host_addr(&dst) { - None => { - use std::io::Error; - error!("HTTP target URI must be a valid address, but found: {}", dst); - let err = Error::new(ErrorKind::Other, "URI must be a valid Address"); - Err(err) - } - Some(addr) => { - let s = match server { - Some(ser) => AutoProxyClientStream::connect_proxied(context, ser.as_ref(), addr).await?, - None => AutoProxyClientStream::connect_bypassed(context, addr).await?, - }; - - if is_https { - let host = dst.host().unwrap().trim_start_matches('[').trim_start_matches(']'); - ProxyHttpStream::connect_https(s, host).await - } else { - Ok(ProxyHttpStream::connect_http(s)) - } - } - } - } - .boxed(), - } - } -} - -#[pin_project] -pub struct Connecting { - #[pin] - fut: BoxFuture<'static, io::Result>, -} - -impl Future for Connecting { - type Output = io::Result; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - self.project().fut.poll(cx) - } -} diff --git a/crates/shadowsocks-service/src/local/http/dispatcher.rs b/crates/shadowsocks-service/src/local/http/dispatcher.rs deleted file mode 100644 index 11b461fff888..000000000000 --- a/crates/shadowsocks-service/src/local/http/dispatcher.rs +++ /dev/null @@ -1,406 +0,0 @@ -//! HTTP Service Dispatcher - -use std::{io, net::SocketAddr, str::FromStr, sync::Arc}; - -use hyper::{ - header::{GetAll, HeaderValue}, - http::uri::{Authority, Scheme}, - upgrade, - Body, - HeaderMap, - Method, - Request, - Response, - StatusCode, - Uri, - Version, -}; -use log::{debug, error, trace}; - -use shadowsocks::relay::socks5::Address; - -use crate::local::{ - context::ServiceContext, - loadbalancing::PingBalancer, - net::{AutoProxyClientStream, AutoProxyIo}, - utils::{establish_tcp_tunnel, establish_tcp_tunnel_bypassed}, -}; - -use super::{ - client_cache::ProxyClientCache, - http_client::{BypassHttpClient, HttpClientEnum}, - utils::{authority_addr, host_addr}, -}; - -pub struct HttpDispatcher { - context: Arc, - req: Request, - balancer: PingBalancer, - client_addr: SocketAddr, - bypass_client: BypassHttpClient, - proxy_client_cache: Arc, -} - -impl HttpDispatcher { - pub fn new( - context: Arc, - req: Request, - balancer: PingBalancer, - client_addr: SocketAddr, - bypass_client: BypassHttpClient, - proxy_client_cache: Arc, - ) -> HttpDispatcher { - HttpDispatcher { - context, - req, - balancer, - client_addr, - bypass_client, - proxy_client_cache, - } - } - - pub async fn dispatch(mut self) -> io::Result> { - trace!("request {} {:?}", self.client_addr, self.req); - - // Parse URI - // - // Proxy request URI must contains a host - let host = match host_addr(self.req.uri()) { - None => { - if self.req.uri().authority().is_some() { - // URI has authority but invalid - error!( - "HTTP {} URI {} doesn't have a valid host", - self.req.method(), - self.req.uri() - ); - return make_bad_request(); - } else { - trace!( - "HTTP {} URI {} doesn't have a valid host", - self.req.method(), - self.req.uri() - ); - } - - match get_addr_from_header(&mut self.req) { - Ok(h) => h, - Err(()) => return make_bad_request(), - } - } - Some(h) => h, - }; - - if Method::CONNECT == self.req.method() { - // Establish a TCP tunnel - // https://tools.ietf.org/html/draft-luotonen-web-proxy-tunneling-01 - - debug!("HTTP CONNECT {}", host); - - // Connect to Shadowsocks' remote - // - // FIXME: What STATUS should I return for connection error? - let mut server_opt = None; - let mut stream = if self.balancer.is_empty() { - AutoProxyClientStream::connect_bypassed(self.context, &host).await? - } else { - let server = self.balancer.best_tcp_server(); - - let stream = AutoProxyClientStream::connect(self.context, server.as_ref(), &host).await?; - server_opt = Some(server); - - stream - }; - - debug!( - "CONNECT relay connected {} <-> {} ({})", - self.client_addr, - host, - if stream.is_bypassed() { "bypassed" } else { "proxied" } - ); - - // Upgrade to a TCP tunnel - // - // Note: only after client received an empty body with STATUS_OK can the - // connection be upgraded, so we can't return a response inside - // `on_upgrade` future. - let req = self.req; - let client_addr = self.client_addr; - tokio::spawn(async move { - match upgrade::on(req).await { - Ok(mut upgraded) => { - trace!("CONNECT tunnel upgrade success, {} <-> {}", client_addr, host); - - let _ = match server_opt { - Some(server) => { - establish_tcp_tunnel( - server.server_config(), - &mut upgraded, - &mut stream, - client_addr, - &host, - ) - .await - } - None => establish_tcp_tunnel_bypassed(&mut upgraded, &mut stream, client_addr, &host).await, - }; - } - Err(e) => { - error!( - "failed to upgrade TCP tunnel {} <-> {}, error: {}", - client_addr, host, e - ); - } - } - }); - - // Connection established - let resp = Response::builder().body(Body::empty()).unwrap(); - - Ok(resp) - } else { - let method = self.req.method().clone(); - let version = self.req.version(); - debug!("HTTP {} {} {:?}", method, host, version); - - // Check if client wants us to keep long connection - let conn_keep_alive = check_keep_alive(version, self.req.headers(), true); - - // Remove non-forwardable headers - clear_hop_headers(self.req.headers_mut()); - - // Set keep-alive for connection with remote - set_conn_keep_alive(version, self.req.headers_mut(), conn_keep_alive); - let client = if self.balancer.is_empty() || self.context.check_target_bypassed(&host).await { - trace!("bypassed {} -> {} {:?}", self.client_addr, host, self.req); - HttpClientEnum::Bypass(self.bypass_client) - } else { - trace!("proxied {} -> {} {:?}", self.client_addr, host, self.req); - - // Keep connections for clients in ServerScore::client - // client instance is kept for Keep-Alive connections - let server = self.balancer.best_tcp_server(); - HttpClientEnum::Proxy(self.proxy_client_cache.get_connected(&server).await) - }; - - let mut res = match client.send(self.req).await { - Ok(res) => res, - Err(err) => { - error!( - "HTTP {} {} <-> {} relay failed, error: {}", - method, self.client_addr, host, err - ); - - let mut resp = Response::new(Body::from(format!("relay failed to {host}"))); - *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - return Ok(resp); - } - }; - - trace!("received {} <- {} {:?}", self.client_addr, host, res); - - let res_keep_alive = conn_keep_alive && check_keep_alive(res.version(), res.headers(), false); - - // Clear unforwardable headers - clear_hop_headers(res.headers_mut()); - - if res.version() != version { - // Reset version to matches req's version - trace!("response version {:?} => {:?}", res.version(), version); - *res.version_mut() = version; - } - - // Set Connection header - set_conn_keep_alive(res.version(), res.headers_mut(), res_keep_alive); - - trace!("response {} <- {} {:?}", self.client_addr, host, res); - - debug!("HTTP {} relay {} <-> {} finished", method, self.client_addr, host); - - Ok(res) - } - } -} - -fn make_bad_request() -> io::Result> { - let mut resp = Response::new(Body::empty()); - *resp.status_mut() = StatusCode::BAD_REQUEST; - Ok(resp) -} - -fn get_keep_alive_val(values: GetAll) -> Option { - let mut conn_keep_alive = None; - for value in values { - if let Ok(value) = value.to_str() { - if value.eq_ignore_ascii_case("close") { - conn_keep_alive = Some(false); - } else { - for part in value.split(',') { - let part = part.trim(); - if part.eq_ignore_ascii_case("keep-alive") { - conn_keep_alive = Some(true); - break; - } - } - } - } - } - conn_keep_alive -} - -fn check_keep_alive(version: Version, headers: &HeaderMap, check_proxy: bool) -> bool { - // HTTP/1.1, HTTP/2, HTTP/3 keeps alive by default - let mut conn_keep_alive = !matches!(version, Version::HTTP_09 | Version::HTTP_10); - - if check_proxy { - // Modern browsers will send Proxy-Connection instead of Connection - // for HTTP/1.0 proxies which blindly forward Connection to remote - // - // https://tools.ietf.org/html/rfc7230#appendix-A.1.2 - if let Some(b) = get_keep_alive_val(headers.get_all("Proxy-Connection")) { - conn_keep_alive = b - } - } - - // Connection will replace Proxy-Connection - // - // But why client sent both Connection and Proxy-Connection? That's not standard! - if let Some(b) = get_keep_alive_val(headers.get_all("Connection")) { - conn_keep_alive = b - } - - conn_keep_alive -} - -fn get_extra_headers(headers: GetAll) -> Vec { - let mut extra_headers = Vec::new(); - for connection in headers { - if let Ok(conn) = connection.to_str() { - // close is a command instead of a header - if conn.eq_ignore_ascii_case("close") { - continue; - } - for header in conn.split(',') { - let header = header.trim(); - extra_headers.push(header.to_owned()); - } - } - } - extra_headers -} - -fn clear_hop_headers(headers: &mut HeaderMap) { - // Clear headers indicated by Connection and Proxy-Connection - let mut extra_headers = get_extra_headers(headers.get_all("Connection")); - extra_headers.extend(get_extra_headers(headers.get_all("Proxy-Connection"))); - - for header in extra_headers { - while headers.remove(&header).is_some() {} - } - - // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection - const HOP_BY_HOP_HEADERS: [&str; 9] = [ - "Keep-Alive", - "Transfer-Encoding", - "TE", - "Connection", - "Trailer", - "Upgrade", - "Proxy-Authorization", - "Proxy-Authenticate", - "Proxy-Connection", // Not standard, but many implementations do send this header - ]; - - for header in &HOP_BY_HOP_HEADERS { - while headers.remove(*header).is_some() {} - } -} - -fn set_conn_keep_alive(version: Version, headers: &mut HeaderMap, keep_alive: bool) { - match version { - Version::HTTP_09 | Version::HTTP_10 => { - // HTTP/1.0 close connection by default - if keep_alive { - headers.insert("Connection", HeaderValue::from_static("keep-alive")); - } - } - _ => { - // HTTP/1.1, HTTP/2, HTTP/3 keep-alive connection by default - if !keep_alive { - headers.insert("Connection", HeaderValue::from_static("close")); - } - } - } -} - -fn get_addr_from_header(req: &mut Request) -> Result { - // Try to be compatible as a transparent HTTP proxy - match req.headers().get("Host") { - Some(hhost) => match hhost.to_str() { - Ok(shost) => { - match Authority::from_str(shost) { - Ok(authority) => match authority_addr(req.uri().scheme_str(), &authority) { - Some(host) => { - trace!("HTTP {} URI {} got host from header: {}", req.method(), req.uri(), host); - - // Reassemble URI - let mut parts = req.uri().clone().into_parts(); - if parts.scheme.is_none() { - // Use http as default. - parts.scheme = Some(Scheme::HTTP); - } - parts.authority = Some(authority); - - // Replaces URI - *req.uri_mut() = Uri::from_parts(parts).expect("Reassemble URI failed"); - - debug!("reassembled URI from \"Host\", {}", req.uri()); - - Ok(host) - } - None => { - error!( - "HTTP {} URI {} \"Host\" header invalid, value: {}", - req.method(), - req.uri(), - shost - ); - - Err(()) - } - }, - Err(..) => { - error!( - "HTTP {} URI {} \"Host\" header is not an Authority, value: {:?}", - req.method(), - req.uri(), - hhost - ); - - Err(()) - } - } - } - Err(..) => { - error!( - "HTTP {} URI {} \"Host\" header invalid encoding, value: {:?}", - req.method(), - req.uri(), - hhost - ); - - Err(()) - } - }, - None => { - error!( - "HTTP {} URI doesn't have valid host and missing the \"Host\" header, URI: {}", - req.method(), - req.uri() - ); - - Err(()) - } - } -} diff --git a/crates/shadowsocks-service/src/local/http/http_client.rs b/crates/shadowsocks-service/src/local/http/http_client.rs index ed56e6ea7693..b2c13151e3b5 100644 --- a/crates/shadowsocks-service/src/local/http/http_client.rs +++ b/crates/shadowsocks-service/src/local/http/http_client.rs @@ -1,22 +1,269 @@ //! HTTP Client -use hyper::{client::ResponseFuture, Body, Client, Request}; +use std::{ + collections::VecDeque, + io::{self, ErrorKind}, + sync::Arc, + time::{Duration, Instant}, +}; -use super::connector::Connector; +use hyper::{ + body, + client::conn::{http1, http2}, + http::uri::Scheme, + Request, + Response, +}; +use log::{error, trace}; +use lru_time_cache::LruCache; +use shadowsocks::relay::Address; +use tokio::sync::Mutex; -pub type ProxyHttpClient = Client; -pub type BypassHttpClient = Client; +use crate::local::{context::ServiceContext, loadbalancing::PingBalancer, net::AutoProxyClientStream}; -pub enum HttpClientEnum { - Proxy(ProxyHttpClient), - Bypass(BypassHttpClient), +use super::{ + http_stream::ProxyHttpStream, + tokio_rt::{TokioExecutor, TokioIo}, + utils::{check_keep_alive, connect_host, host_addr}, +}; + +const CONNECTION_EXPIRE_DURATION: Duration = Duration::from_secs(20); + +#[derive(thiserror::Error, Debug)] +pub enum HttpClientError { + #[error("{0}")] + Hyper(#[from] hyper::Error), + #[error("{0}")] + Io(#[from] io::Error), +} + +#[derive(Clone)] +pub struct HttpClient { + cache_conn: Arc>>>, +} + +impl HttpClient { + pub fn new() -> HttpClient { + HttpClient { + cache_conn: Arc::new(Mutex::new(LruCache::with_expiry_duration(CONNECTION_EXPIRE_DURATION))), + } + } + + #[inline] + pub async fn send_request( + &self, + context: Arc, + req: Request, + balancer: &PingBalancer, + ) -> Result, HttpClientError> { + let host = match host_addr(req.uri()) { + Some(h) => h, + None => panic!("URI missing host: {}", req.uri()), + }; + + // 1. Check if there is an available client + // + // FIXME: If the cached connection is closed unexpectly, this request will fail immediately. + if let Some(c) = self.get_cached_connection(&host).await { + trace!("HTTP client for host: {} taken from cache", host); + match self.send_request_conn(host, c, req).await { + Ok(o) => return Ok(o), + Err(err) => return Err(err.into()), + } + } + + // 2. If no. Make a new connection + let scheme = match req.uri().scheme() { + Some(s) => s, + None => &Scheme::HTTP, + }; + + let domain = req + .uri() + .host() + .unwrap() + .trim_start_matches('[') + .trim_start_matches(']'); + let c = match HttpConnection::connect(context.clone(), scheme, host.clone(), domain, balancer).await { + Ok(c) => c, + Err(err) => { + error!("failed to connect to host: {}, error: {}", host, err); + return Err(err.into()); + } + }; + + self.send_request_conn(host, c, req).await.map_err(Into::into) + } + + async fn get_cached_connection(&self, host: &Address) -> Option { + if let Some(q) = self.cache_conn.lock().await.get_mut(&host) { + while let Some((c, inst)) = q.pop_front() { + let now = Instant::now(); + if now - inst >= CONNECTION_EXPIRE_DURATION { + continue; + } + if c.is_closed() { + continue; + } + return Some(c); + } + } + None + } + + async fn send_request_conn( + &self, + host: Address, + mut c: HttpConnection, + req: Request, + ) -> hyper::Result> { + trace!("HTTP making request to host: {}, request: {:?}", host, req); + let response = c.send_request(req).await?; + trace!("HTTP received response from host: {}, response: {:?}", host, response); + + // Check keep-alive + if check_keep_alive(response.version(), response.headers(), false) { + trace!( + "HTTP connection keep-alive for host: {}, response: {:?}", + host, + response + ); + self.cache_conn + .lock() + .await + .entry(host) + .or_insert_with(VecDeque::new) + .push_back((c, Instant::now())); + } + + Ok(response) + } +} + +enum HttpConnection { + Http1(http1::SendRequest), + Http2(http2::SendRequest), } -impl HttpClientEnum { - pub fn send(&self, req: Request) -> ResponseFuture { +impl HttpConnection { + async fn connect( + context: Arc, + scheme: &Scheme, + host: Address, + domain: &str, + balancer: &PingBalancer, + ) -> io::Result { + if *scheme != Scheme::HTTP && *scheme != Scheme::HTTPS { + return Err(io::Error::new(ErrorKind::InvalidInput, "invalid scheme")); + } + + let (stream, _) = connect_host(context, &host, balancer).await?; + + if *scheme == Scheme::HTTP { + HttpConnection::connect_http_http1(scheme, host, stream).await + } else if *scheme == Scheme::HTTPS { + HttpConnection::connect_https(scheme, host, domain, stream).await + } else { + unreachable!() + } + } + + async fn connect_http_http1( + scheme: &Scheme, + host: Address, + stream: AutoProxyClientStream, + ) -> io::Result { + trace!( + "HTTP making new HTTP/1.1 connection to host: {}, scheme: {}", + host, + scheme + ); + + let stream = ProxyHttpStream::connect_http(stream); + + // HTTP/1.x + let (send_request, connection) = match http1::Builder::new() + .preserve_header_case(true) + .title_case_headers(true) + .handshake(TokioIo::new(stream)) + .await + { + Ok(s) => s, + Err(err) => return Err(io::Error::new(ErrorKind::Other, err)), + }; + + tokio::spawn(async move { + if let Err(err) = connection.await { + error!("HTTP/1.x connection to host: {} aborted with error: {}", host, err); + } + }); + + Ok(HttpConnection::Http1(send_request)) + } + + async fn connect_https( + scheme: &Scheme, + host: Address, + domain: &str, + stream: AutoProxyClientStream, + ) -> io::Result { + trace!("HTTP making new TLS connection to host: {}, scheme: {}", host, scheme); + + // TLS handshake, check alpn for h2 support. + let stream = ProxyHttpStream::connect_https(stream, domain).await?; + + if stream.negotiated_http2() { + // H2 connnection + let (send_request, connection) = match http2::Builder::new(TokioExecutor) + .keep_alive_interval(Duration::from_secs(15)) + .handshake(TokioIo::new(stream)) + .await + { + Ok(s) => s, + Err(err) => return Err(io::Error::new(ErrorKind::Other, err)), + }; + + tokio::spawn(async move { + if let Err(err) = connection.await { + error!("HTTP/2 TLS connection to host: {} aborted with error: {}", host, err); + } + }); + + Ok(HttpConnection::Http2(send_request)) + } else { + // HTTP/1.x TLS + let (send_request, connection) = match http1::Builder::new() + .preserve_header_case(true) + .title_case_headers(true) + .handshake(TokioIo::new(stream)) + .await + { + Ok(s) => s, + Err(err) => return Err(io::Error::new(ErrorKind::Other, err)), + }; + + tokio::spawn(async move { + if let Err(err) = connection.await { + error!("HTTP/1.x TLS connection to host: {} aborted with error: {}", host, err); + } + }); + + Ok(HttpConnection::Http1(send_request)) + } + } + + #[inline] + pub async fn send_request(&mut self, req: Request) -> hyper::Result> { + match self { + HttpConnection::Http1(r) => r.send_request(req).await, + HttpConnection::Http2(r) => r.send_request(req).await, + } + } + + pub fn is_closed(&self) -> bool { match self { - HttpClientEnum::Proxy(c) => c.request(req), - HttpClientEnum::Bypass(b) => b.request(req), + HttpConnection::Http1(r) => r.is_closed(), + HttpConnection::Http2(r) => r.is_closed(), } } } diff --git a/crates/shadowsocks-service/src/local/http/http_service.rs b/crates/shadowsocks-service/src/local/http/http_service.rs new file mode 100644 index 000000000000..70569b9887b9 --- /dev/null +++ b/crates/shadowsocks-service/src/local/http/http_service.rs @@ -0,0 +1,339 @@ +//! Shadowsocks HTTP Proxy server dispatcher + +use std::{net::SocketAddr, str::FromStr, sync::Arc}; + +use bytes::Bytes; +use http_body_util::{combinators::BoxBody, BodyExt}; +use hyper::{ + body, + header::{self, HeaderValue}, + http::uri::{Authority, Scheme}, + HeaderMap, + Method, + Request, + Response, + StatusCode, + Uri, + Version, +}; +use log::{debug, error, trace}; +use shadowsocks::relay::Address; + +use crate::local::{ + context::ServiceContext, + http::{http_client::HttpClientError, tokio_rt::TokioIo}, + loadbalancing::PingBalancer, + net::AutoProxyIo, + utils::{establish_tcp_tunnel, establish_tcp_tunnel_bypassed}, +}; + +use super::{ + http_client::HttpClient, + utils::{authority_addr, check_keep_alive, connect_host, host_addr}, +}; + +pub struct HttpService { + context: Arc, + peer_addr: SocketAddr, + http_client: HttpClient, + balancer: PingBalancer, +} + +impl HttpService { + pub fn new( + context: Arc, + peer_addr: SocketAddr, + http_client: HttpClient, + balancer: PingBalancer, + ) -> HttpService { + HttpService { + context, + peer_addr, + http_client, + balancer, + } + } + + pub async fn serve_connection( + self, + mut req: Request, + ) -> hyper::Result>> { + trace!("request {} {:?}", self.peer_addr, req); + + // Parse URI + // + // Proxy request URI must contains a host + let host = match host_addr(req.uri()) { + None => { + if req.uri().authority().is_some() { + // URI has authority but invalid + error!("HTTP {} URI {} doesn't have a valid host", req.method(), req.uri()); + return make_bad_request(); + } else { + trace!("HTTP {} URI {} doesn't have a valid host", req.method(), req.uri()); + } + + match get_addr_from_header(&mut req) { + Ok(h) => h, + Err(()) => return make_bad_request(), + } + } + Some(h) => h, + }; + + if req.method() == Method::CONNECT { + // Establish a TCP tunnel + // https://tools.ietf.org/html/draft-luotonen-web-proxy-tunneling-01 + + debug!("HTTP CONNECT {}", host); + + // Connect to Shadowsocks' remote + // + // FIXME: What STATUS should I return for connection error? + let (mut stream, server_opt) = match connect_host(self.context, &host, &self.balancer).await { + Ok(s) => s, + Err(err) => { + error!("failed to CONNECT host: {}, error: {}", host, err); + return make_internal_server_error(); + } + }; + + debug!( + "CONNECT relay connected {} <-> {} ({})", + self.peer_addr, + host, + if stream.is_bypassed() { "bypassed" } else { "proxied" } + ); + + let client_addr = self.peer_addr; + tokio::spawn(async move { + match hyper::upgrade::on(req).await { + Ok(upgraded) => { + trace!("CONNECT tunnel upgrade success, {} <-> {}", client_addr, host); + + let mut upgraded_io = TokioIo::new(upgraded); + + let _ = match server_opt { + Some(server) => { + establish_tcp_tunnel( + server.server_config(), + &mut upgraded_io, + &mut stream, + client_addr, + &host, + ) + .await + } + None => { + establish_tcp_tunnel_bypassed(&mut upgraded_io, &mut stream, client_addr, &host).await + } + }; + } + Err(err) => { + error!("failed to upgrade CONNECT request, error: {}", err); + } + } + }); + + return Ok(Response::new(empty_body())); + } + + // Traditional HTTP Proxy request + + let method = req.method().clone(); + let version = req.version(); + debug!("HTTP {} {} {:?}", method, host, version); + + // Check if client wants us to keep long connection + let conn_keep_alive = check_keep_alive(version, req.headers(), true); + + // Remove non-forwardable headers + clear_hop_headers(req.headers_mut()); + + // Set keep-alive for connection with remote + set_conn_keep_alive(version, req.headers_mut(), conn_keep_alive); + + let mut res = match self.http_client.send_request(self.context, req, &self.balancer).await { + Ok(resp) => resp, + Err(HttpClientError::Hyper(e)) => return Err(e), + Err(HttpClientError::Io(err)) => { + error!("failed to make request to host: {}, error: {}", host, err); + return make_internal_server_error(); + } + }; + + trace!("received {} <- {} {:?}", self.peer_addr, host, res); + + let res_keep_alive = conn_keep_alive && check_keep_alive(res.version(), res.headers(), false); + + // Clear unforwardable headers + clear_hop_headers(res.headers_mut()); + + if res.version() != version { + // Reset version to matches req's version + trace!("response version {:?} => {:?}", res.version(), version); + *res.version_mut() = version; + } + + // Set Connection header + set_conn_keep_alive(res.version(), res.headers_mut(), res_keep_alive); + + trace!("response {} <- {} {:?}", self.peer_addr, host, res); + + debug!("HTTP {} relay {} <-> {} finished", method, self.peer_addr, host); + + Ok(res.map(|b| b.boxed())) + } +} + +fn empty_body() -> BoxBody { + http_body_util::Empty::::new() + .map_err(|never| match never {}) + .boxed() +} + +fn make_bad_request() -> Result>, hyper::Error> { + Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(empty_body()) + .unwrap()) +} + +fn make_internal_server_error() -> Result>, hyper::Error> { + Ok(Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(empty_body()) + .unwrap()) +} + +fn get_extra_headers(headers: header::GetAll) -> Vec { + let mut extra_headers = Vec::new(); + for connection in headers { + if let Ok(conn) = connection.to_str() { + // close is a command instead of a header + if conn.eq_ignore_ascii_case("close") { + continue; + } + for header in conn.split(',') { + let header = header.trim(); + extra_headers.push(header.to_owned()); + } + } + } + extra_headers +} + +fn clear_hop_headers(headers: &mut HeaderMap) { + // Clear headers indicated by Connection and Proxy-Connection + let mut extra_headers = get_extra_headers(headers.get_all("Connection")); + extra_headers.extend(get_extra_headers(headers.get_all("Proxy-Connection"))); + + for header in extra_headers { + while headers.remove(&header).is_some() {} + } + + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection + const HOP_BY_HOP_HEADERS: [&str; 9] = [ + "Keep-Alive", + "Transfer-Encoding", + "TE", + "Connection", + "Trailer", + "Upgrade", + "Proxy-Authorization", + "Proxy-Authenticate", + "Proxy-Connection", // Not standard, but many implementations do send this header + ]; + + for header in &HOP_BY_HOP_HEADERS { + while headers.remove(*header).is_some() {} + } +} + +fn set_conn_keep_alive(version: Version, headers: &mut HeaderMap, keep_alive: bool) { + match version { + Version::HTTP_09 | Version::HTTP_10 => { + // HTTP/1.0 close connection by default + if keep_alive { + headers.insert("Connection", HeaderValue::from_static("keep-alive")); + } + } + _ => { + // HTTP/1.1, HTTP/2, HTTP/3 keep-alive connection by default + if !keep_alive { + headers.insert("Connection", HeaderValue::from_static("close")); + } + } + } +} + +fn get_addr_from_header(req: &mut Request) -> Result { + // Try to be compatible as a transparent HTTP proxy + match req.headers().get("Host") { + Some(hhost) => match hhost.to_str() { + Ok(shost) => { + match Authority::from_str(shost) { + Ok(authority) => match authority_addr(req.uri().scheme_str(), &authority) { + Some(host) => { + trace!("HTTP {} URI {} got host from header: {}", req.method(), req.uri(), host); + + // Reassemble URI + let mut parts = req.uri().clone().into_parts(); + if parts.scheme.is_none() { + // Use http as default. + parts.scheme = Some(Scheme::HTTP); + } + parts.authority = Some(authority); + + // Replaces URI + *req.uri_mut() = Uri::from_parts(parts).expect("Reassemble URI failed"); + + debug!("reassembled URI from \"Host\", {}", req.uri()); + + Ok(host) + } + None => { + error!( + "HTTP {} URI {} \"Host\" header invalid, value: {}", + req.method(), + req.uri(), + shost + ); + + Err(()) + } + }, + Err(..) => { + error!( + "HTTP {} URI {} \"Host\" header is not an Authority, value: {:?}", + req.method(), + req.uri(), + hhost + ); + + Err(()) + } + } + } + Err(..) => { + error!( + "HTTP {} URI {} \"Host\" header invalid encoding, value: {:?}", + req.method(), + req.uri(), + hhost + ); + + Err(()) + } + }, + None => { + error!( + "HTTP {} URI doesn't have valid host and missing the \"Host\" header, URI: {}", + req.method(), + req.uri() + ); + + Err(()) + } + } +} diff --git a/crates/shadowsocks-service/src/local/http/http_stream.rs b/crates/shadowsocks-service/src/local/http/http_stream.rs index 1d4d9e43ca81..398fd2fe6c19 100644 --- a/crates/shadowsocks-service/src/local/http/http_stream.rs +++ b/crates/shadowsocks-service/src/local/http/http_stream.rs @@ -6,7 +6,6 @@ use std::{ task::{self, Poll}, }; -use hyper::client::connect::{Connected, Connection}; use pin_project::pin_project; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; @@ -178,14 +177,3 @@ impl AsyncWrite for ProxyHttpStream { forward_call!(self, poll_shutdown, cx) } } - -impl Connection for ProxyHttpStream { - fn connected(&self) -> Connected { - let conn = Connected::new(); - if self.negotiated_http2() { - conn.negotiated_h2() - } else { - conn - } - } -} diff --git a/crates/shadowsocks-service/src/local/http/http_tls/mod.rs b/crates/shadowsocks-service/src/local/http/http_tls/mod.rs deleted file mode 100644 index 7f4bffbcddf8..000000000000 --- a/crates/shadowsocks-service/src/local/http/http_tls/mod.rs +++ /dev/null @@ -1,17 +0,0 @@ -//! TLS support for HTTP local (HTTPS) -//! -//! Choosing TLS library by `local-http-rustls` and `local-http-native-tls` - -#![allow(dead_code)] // For TlsAcceptor - -#[cfg(feature = "local-http-native-tls")] -pub mod native_tls; - -#[cfg(feature = "local-http-native-tls")] -pub use self::native_tls::TlsStream; - -#[cfg(feature = "local-http-rustls")] -pub mod rustls; - -#[cfg(feature = "local-http-rustls")] -pub use self::rustls::TlsStream; diff --git a/crates/shadowsocks-service/src/local/http/http_tls/native_tls.rs b/crates/shadowsocks-service/src/local/http/http_tls/native_tls.rs deleted file mode 100644 index 3e6474c58ce5..000000000000 --- a/crates/shadowsocks-service/src/local/http/http_tls/native_tls.rs +++ /dev/null @@ -1,91 +0,0 @@ -//! TLS support by [native-tls](https://crates.io/crates/native-tls) - -use std::{ - future::Future, - io, - net::SocketAddr, - pin::Pin, - task::{self, Poll}, -}; - -use futures::ready; -use hyper::server::conn::AddrStream; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; - -enum TlsStreamState { - Handshaking( - Pin< - Box< - dyn Future, native_tls::Error>> - + Send - + 'static, - >, - >, - ), - Streaming(tokio_native_tls::TlsStream), -} - -pub struct TlsStream { - state: TlsStreamState, - remote_addr: SocketAddr, -} - -impl TlsStream { - pub fn remote_addr(&self) -> SocketAddr { - self.remote_addr - } -} - -macro_rules! forward_stream_method { - ($self:expr, $cx:expr, $method:ident $(, $param:expr)*) => {{ - let this = $self.get_mut(); - - loop { - match this.state { - TlsStreamState::Handshaking(ref mut accept_fut) => { - let fut = accept_fut.as_mut(); - match ready!(fut.poll($cx)) { - Ok(stream) => { - this.state = TlsStreamState::Streaming(stream); - } - Err(err) => { - let err = io::Error::new(io::ErrorKind::Other, format!("tls handshake: {}", err)); - return Poll::Ready(Err(err)); - } - } - } - TlsStreamState::Streaming(ref mut stream) => { - return Pin::new(stream).$method($cx, $($param),*); - } - } - } - }}; -} - -impl AsyncRead for TlsStream { - fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - forward_stream_method!(self, cx, poll_read, buf) - } -} - -impl AsyncWrite for TlsStream { - fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { - forward_stream_method!(self, cx, poll_write, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let this = self.get_mut(); - match this.state { - TlsStreamState::Handshaking(..) => Poll::Ready(Ok(())), - TlsStreamState::Streaming(ref mut stream) => Pin::new(stream).poll_flush(cx), - } - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let this = self.get_mut(); - match this.state { - TlsStreamState::Handshaking(..) => Poll::Ready(Ok(())), - TlsStreamState::Streaming(ref mut stream) => Pin::new(stream).poll_shutdown(cx), - } - } -} diff --git a/crates/shadowsocks-service/src/local/http/http_tls/rustls.rs b/crates/shadowsocks-service/src/local/http/http_tls/rustls.rs deleted file mode 100644 index 5e3fb39085bb..000000000000 --- a/crates/shadowsocks-service/src/local/http/http_tls/rustls.rs +++ /dev/null @@ -1,82 +0,0 @@ -//! TLS support by [rustls](https://crates.io/crates/rustls) - -use std::{ - future::Future, - io, - net::SocketAddr, - pin::Pin, - task::{self, Poll}, -}; - -use futures::ready; -use hyper::server::conn::AddrStream; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; - -enum TlsStreamState { - Handshaking(tokio_rustls::Accept), - Streaming(tokio_rustls::server::TlsStream), -} - -pub struct TlsStream { - state: TlsStreamState, - remote_addr: SocketAddr, -} - -impl TlsStream { - pub fn remote_addr(&self) -> SocketAddr { - self.remote_addr - } -} - -macro_rules! forward_stream_method { - ($self:expr, $cx:expr, $method:ident $(, $param:expr)*) => {{ - let this = $self.get_mut(); - - loop { - match this.state { - TlsStreamState::Handshaking(ref mut accept_fut) => { - match ready!(Pin::new(accept_fut).poll($cx)) { - Ok(stream) => { - this.state = TlsStreamState::Streaming(stream); - } - Err(err) => { - let err = io::Error::new(io::ErrorKind::Other, format!("tls handshake: {}", err)); - return Poll::Ready(Err(err)); - } - } - } - TlsStreamState::Streaming(ref mut stream) => { - return Pin::new(stream).$method($cx, $($param),*); - } - } - } - }}; -} - -impl AsyncRead for TlsStream { - fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - forward_stream_method!(self, cx, poll_read, buf) - } -} - -impl AsyncWrite for TlsStream { - fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { - forward_stream_method!(self, cx, poll_write, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let this = self.get_mut(); - match this.state { - TlsStreamState::Handshaking(..) => Poll::Ready(Ok(())), - TlsStreamState::Streaming(ref mut stream) => Pin::new(stream).poll_flush(cx), - } - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let this = self.get_mut(); - match this.state { - TlsStreamState::Handshaking(..) => Poll::Ready(Ok(())), - TlsStreamState::Streaming(ref mut stream) => Pin::new(stream).poll_shutdown(cx), - } - } -} diff --git a/crates/shadowsocks-service/src/local/http/mod.rs b/crates/shadowsocks-service/src/local/http/mod.rs index be38ff69d3e8..564742f79b24 100644 --- a/crates/shadowsocks-service/src/local/http/mod.rs +++ b/crates/shadowsocks-service/src/local/http/mod.rs @@ -1,12 +1,12 @@ -//! Shadowsocks HTTP Local Server +//! Shadowsocks Local HTTP proxy server +//! +//! https://www.ietf.org/rfc/rfc2068.txt -pub use self::server::{Http, HttpBuilder}; +pub use self::server::{Http, HttpBuilder, HttpConnectionHandler}; -mod client_cache; -mod connector; -mod dispatcher; mod http_client; +mod http_service; mod http_stream; -mod http_tls; -mod server; +pub mod server; +mod tokio_rt; mod utils; diff --git a/crates/shadowsocks-service/src/local/http/server.rs b/crates/shadowsocks-service/src/local/http/server.rs index bf975609cfab..eee5a7d65476 100644 --- a/crates/shadowsocks-service/src/local/http/server.rs +++ b/crates/shadowsocks-service/src/local/http/server.rs @@ -1,32 +1,22 @@ -//! Shadowsocks Local HTTP(S) Server +//! Shadowsocks Local HTTP proxy server +//! +//! https://www.ietf.org/rfc/rfc2068.txt -use std::{ - convert::Infallible, - io::{self, ErrorKind}, - net::SocketAddr, - sync::Arc, -}; +use std::{io, net::SocketAddr, sync::Arc, time::Duration}; -use hyper::{ - server::conn::AddrStream, - service::{make_service_fn, service_fn}, - Body, - Client, - Request, - Server, -}; -use log::{error, info}; +use hyper::{server::conn::http1, service}; +use log::{error, info, trace}; use shadowsocks::{config::ServerAddr, net::TcpListener}; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + time, +}; use crate::local::{ - context::ServiceContext, - http::connector::Connector, - loadbalancing::PingBalancer, - net::tcp::listener::create_standard_tcp_listener, - LOCAL_DEFAULT_KEEPALIVE_TIMEOUT, + context::ServiceContext, loadbalancing::PingBalancer, net::tcp::listener::create_standard_tcp_listener, }; -use super::{client_cache::ProxyClientCache, dispatcher::HttpDispatcher}; +use super::{http_client::HttpClient, http_service::HttpService, tokio_rt::TokioIo}; /// HTTP Local server builder pub struct HttpBuilder { @@ -83,11 +73,10 @@ impl HttpBuilder { } } - let proxy_client_cache = Arc::new(ProxyClientCache::new(self.context.clone())); + // let proxy_client_cache = Arc::new(ProxyClientCache::new(self.context.clone())); Ok(Http { context: self.context, - proxy_client_cache, listener, balancer: self.balancer, }) @@ -97,7 +86,6 @@ impl HttpBuilder { /// HTTP Local server pub struct Http { context: Arc, - proxy_client_cache: Arc, listener: TcpListener, balancer: PingBalancer, } @@ -110,72 +98,82 @@ impl Http { /// Run server pub async fn run(self) -> io::Result<()> { - let bypass_client = Client::builder() - .http1_preserve_header_case(true) - .http1_title_case_headers(true) - .build::<_, Body>(Connector::new(self.context.clone(), None)); - - let context = self.context.clone(); - let proxy_client_cache = self.proxy_client_cache.clone(); - let balancer = self.balancer; - let make_service = make_service_fn(|socket: &AddrStream| { - let client_addr = socket.remote_addr(); - let balancer = balancer.clone(); - let bypass_client = bypass_client.clone(); - let context = context.clone(); - let proxy_client_cache = proxy_client_cache.clone(); - - async move { - Ok::<_, Infallible>(service_fn(move |req: Request| { - HttpDispatcher::new( - context.clone(), - req, - balancer.clone(), - client_addr, - bypass_client.clone(), - proxy_client_cache.clone(), - ) - .dispatch() - })) - } - }); + // https://www.ietf.org/rfc/rfc2068.txt + // HTTP Proxy is based on HTTP/1.1 + + info!( + "shadowsocks HTTP listening on {}", + self.listener.local_addr().expect("http local_addr") + ); - let server = { - let listener = self.listener.into_inner().into_std()?; - let builder = match Server::from_tcp(listener) { - Ok(builder) => builder, + let handler = HttpConnectionHandler::new(self.context, self.balancer); + + loop { + let (stream, peer_addr) = match self.listener.accept().await { + Ok(s) => s, Err(err) => { - error!("hyper server from std::net::TcpListener error: {}", err); - let err = io::Error::new(ErrorKind::InvalidInput, err); - return Err(err); + error!("failed to accept HTTP clients, err: {}", err); + time::sleep(Duration::from_secs(1)).await; + continue; } }; - builder - .http1_only(true) // HTTP Proxy protocol only defined in HTTP 1.x - .http1_preserve_header_case(true) - .http1_title_case_headers(true) - .tcp_sleep_on_accept_errors(true) - .tcp_keepalive( - self.context - .accept_opts() - .tcp - .keepalive - .or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT)), - ) - .tcp_nodelay(self.context.accept_opts().tcp.nodelay) - .serve(make_service) - }; - - info!("shadowsocks HTTP listening on {}", server.local_addr()); - - if let Err(err) = server.await { - use std::io::Error; - - error!("hyper server exited with error: {}", err); - return Err(Error::new(ErrorKind::Other, err)); + trace!("HTTP accepted client from {}", peer_addr); + tokio::spawn(handler.clone().serve_connection(stream, peer_addr)); + } + } +} + +/// HTTP Proxy handler for `accept()`ed HTTP clients +/// +/// It should be created once and then `clone()` for every individual TCP connections +#[derive(Clone)] +pub struct HttpConnectionHandler { + context: Arc, + balancer: PingBalancer, + http_client: HttpClient, +} + +impl HttpConnectionHandler { + /// Create a new Handler + pub fn new(context: Arc, balancer: PingBalancer) -> HttpConnectionHandler { + HttpConnectionHandler { + context, + balancer, + http_client: HttpClient::new(), } + } - Ok(()) + /// Handle a TCP HTTP connection + pub async fn serve_connection(self, stream: S, peer_addr: SocketAddr) + where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + let HttpConnectionHandler { + context, + balancer, + http_client, + } = self; + + let io = TokioIo::new(stream); + + // NOTE: Some stupid clients requires HTTP header keys to be case-sensitive. + // For example: Nintendo Switch + if let Err(err) = http1::Builder::new() + .keep_alive(true) + .title_case_headers(true) + .preserve_header_case(true) + .serve_connection( + io, + service::service_fn(move |req| { + HttpService::new(context.clone(), peer_addr, http_client.clone(), balancer.clone()) + .serve_connection(req) + }), + ) + .with_upgrades() + .await + { + error!("failed to serve HTTP connection, error: {}", err); + } } } diff --git a/crates/shadowsocks-service/src/local/http/tokio_rt.rs b/crates/shadowsocks-service/src/local/http/tokio_rt.rs new file mode 100644 index 000000000000..cdb86d3f78d2 --- /dev/null +++ b/crates/shadowsocks-service/src/local/http/tokio_rt.rs @@ -0,0 +1,151 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project::pin_project; + +#[derive(Debug)] +#[pin_project] +pub struct TokioIo { + #[pin] + inner: T, +} + +impl TokioIo { + pub fn new(inner: T) -> Self { + Self { inner } + } + + // pub fn inner(self) -> T { + // self.inner + // } +} + +impl hyper::rt::Read for TokioIo +where + T: tokio::io::AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + let n = unsafe { + let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); + match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } +} + +impl hyper::rt::Write for TokioIo +where + T: tokio::io::AsyncWrite, +{ + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + tokio::io::AsyncWrite::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +impl tokio::io::AsyncRead for TokioIo +where + T: hyper::rt::Read, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + tbuf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + // let init = tbuf.initialized().len(); + let filled = tbuf.filled().len(); + let sub_filled = unsafe { + let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); + + match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { + Poll::Ready(Ok(())) => buf.filled().len(), + other => return other, + } + }; + + let n_filled = filled + sub_filled; + // At least sub_filled bytes had to have been initialized. + let n_init = sub_filled; + unsafe { + tbuf.assume_init(n_init); + tbuf.set_filled(n_filled); + } + + Poll::Ready(Ok(())) + } +} + +impl tokio::io::AsyncWrite for TokioIo +where + T: hyper::rt::Write, +{ + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + hyper::rt::Write::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + hyper::rt::Write::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + hyper::rt::Write::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + hyper::rt::Write::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +#[derive(Clone)] +pub struct TokioExecutor; + +impl hyper::rt::Executor for TokioExecutor +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + fn execute(&self, future: F) { + tokio::spawn(future); + } +} diff --git a/crates/shadowsocks-service/src/local/http/utils.rs b/crates/shadowsocks-service/src/local/http/utils.rs index 694636e06a9c..32ae7b908cc4 100644 --- a/crates/shadowsocks-service/src/local/http/utils.rs +++ b/crates/shadowsocks-service/src/local/http/utils.rs @@ -1,10 +1,27 @@ //! HTTP Utilities -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::{ + io, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + sync::Arc, +}; -use hyper::{http::uri::Authority, Uri}; +use hyper::{ + header::{self, HeaderValue}, + http::uri::Authority, + HeaderMap, + Uri, + Version, +}; +use log::error; use shadowsocks::relay::socks5::Address; +use crate::local::{ + context::ServiceContext, + loadbalancing::{PingBalancer, ServerIdent}, + net::AutoProxyClientStream, +}; + pub fn authority_addr(scheme_str: Option<&str>, authority: &Authority) -> Option
{ // RFC7230 indicates that we should ignore userinfo // https://tools.ietf.org/html/rfc7230#section-5.3.3 @@ -53,3 +70,78 @@ pub fn host_addr(uri: &Uri) -> Option
{ Some(authority) => authority_addr(uri.scheme_str(), authority), } } + +fn get_keep_alive_val(values: header::GetAll) -> Option { + let mut conn_keep_alive = None; + for value in values { + if let Ok(value) = value.to_str() { + if value.eq_ignore_ascii_case("close") { + conn_keep_alive = Some(false); + } else { + for part in value.split(',') { + let part = part.trim(); + if part.eq_ignore_ascii_case("keep-alive") { + conn_keep_alive = Some(true); + break; + } + } + } + } + } + conn_keep_alive +} + +pub fn check_keep_alive(version: Version, headers: &HeaderMap, check_proxy: bool) -> bool { + // HTTP/1.1, HTTP/2, HTTP/3 keeps alive by default + let mut conn_keep_alive = !matches!(version, Version::HTTP_09 | Version::HTTP_10); + + if check_proxy { + // Modern browsers will send Proxy-Connection instead of Connection + // for HTTP/1.0 proxies which blindly forward Connection to remote + // + // https://tools.ietf.org/html/rfc7230#appendix-A.1.2 + if let Some(b) = get_keep_alive_val(headers.get_all("Proxy-Connection")) { + conn_keep_alive = b + } + } + + // Connection will replace Proxy-Connection + // + // But why client sent both Connection and Proxy-Connection? That's not standard! + if let Some(b) = get_keep_alive_val(headers.get_all("Connection")) { + conn_keep_alive = b + } + + conn_keep_alive +} + +pub async fn connect_host( + context: Arc, + host: &Address, + balancer: &PingBalancer, +) -> io::Result<(AutoProxyClientStream, Option>)> { + if balancer.is_empty() { + match AutoProxyClientStream::connect_bypassed(context, host).await { + Ok(s) => Ok((s, None)), + Err(err) => { + error!("failed to connect host {} bypassed, err: {}", host, err); + Err(err) + } + } + } else { + let server = balancer.best_tcp_server(); + + match AutoProxyClientStream::connect(context, server.as_ref(), host).await { + Ok(s) => Ok((s, Some(server))), + Err(err) => { + error!( + "failed to connect host {} proxied, svr_cfg: {}, error: {}", + host, + server.server_config().addr(), + err + ); + Err(err) + } + } + } +}