Skip to content

Commit

Permalink
add tls handler
Browse files Browse the repository at this point in the history
  • Loading branch information
rucciva committed Oct 17, 2024
1 parent 4253650 commit 8db9283
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 148 deletions.
34 changes: 9 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ http2-native-tls = [
"http-body-util",
"futures-core",
"tokio-util",
"hyper-tls",
"tower-service",
"native-tls",
]
http2-rustls = [
Expand All @@ -72,7 +72,7 @@ http2-rustls = [
"http-body-util",
"futures-core",
"tokio-util",
"hyper-tls",
"tower-service",
"rustls",
]

Expand Down Expand Up @@ -144,7 +144,7 @@ http = { version = "1.1.0", optional = true }
hyper = { version = "1.4.1", optional = true , features = ["client","server","http2"] }
hyper-util = { version = "0.1.9", optional = true , features = ["full"]}
http-body-util = { version = "0.1.2", optional = true }
hyper-tls = { version = "0.6.0", optional = true }
tower-service = { version = "0.3.3", optional = true }
tokio-util = { version = "0.7.9", optional = true, features = ["io"] }
futures-core = { version = "0.3.28", optional = true }
futures-sink = { version = "0.3.28", optional = true }
Expand Down
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ heartbeat_timeout = 40 # Optional. Set to 0 to disable the application-layer hea
retry_interval = 1 # Optional. The interval between retry to connect to the server. Default: 1 second

[client.transport] # The whole block is optional. Specify which transport to use
type = "tcp" # Optional. Possible values: ["tcp", "tls", "noise"]. Default: "tcp"
type = "tcp" # Optional. Possible values: ["tcp", "tls", "noise", "websocket", "http2"]. Default: "tcp"

[client.transport.tcp] # Optional. Also affects `noise` and `tls`
proxy = "socks5://user:[email protected]:1080" # Optional. The proxy used to connect to the server. `http` and `socks5` is supported.
Expand All @@ -131,6 +131,9 @@ remote_public_key = "key_encoded_in_base64" # Optional
[client.transport.websocket] # Necessary if `type` is "websocket"
tls = true # If `true` then it will use settings in `client.transport.tls`

[client.transport.http2] # Necessary if `type` is "http2"
tls = true # If `true` then it will use settings in `client.transport.tls`

[client.services.service1] # A service that needs forwarding. The name `service1` can change arbitrarily, as long as identical to the name in the server's configuration
type = "tcp" # Optional. The protocol that needs forwarding. Possible values: ["tcp", "udp"]. Default: "tcp"
token = "whatever" # Necessary if `client.default_token` not set
Expand Down Expand Up @@ -166,6 +169,9 @@ remote_public_key = "key_encoded_in_base64"
[server.transport.websocket] # Necessary if `type` is "websocket"
tls = true # If `true` then it will use settings in `server.transport.tls`

[server.transport.http2] # Necessary if `type` is "http2"
tls = true # If `true` then it will use settings in `server.transport.tls`

[server.services.service1] # The service name must be identical to the client side
type = "tcp" # Optional. Same as the client `[client.services.X.type]
token = "whatever" # Necessary if `server.default_token` not set
Expand Down
103 changes: 70 additions & 33 deletions src/transport/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,30 @@ use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{self, Context, Poll};

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use futures_core::Stream;
use http::{Method, Request, Response};
use http::{Method, Request, Response, Uri};
use http_body_util::StreamBody;
use hyper::body::{Body, Incoming};
use hyper::server::conn::http2 as Server;
use hyper::service::Service;
use hyper_tls::HttpsConnector;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::connect::{Connected, Connection};
use hyper_util::client::legacy::Client;
use hyper_util::rt::tokio::TokioExecutor;
use hyper_util::rt::tokio::TokioIo;
use tokio::io::{self, AsyncRead, AsyncWrite, ReadBuf, ReadHalf, SimplexStream, WriteHalf};
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
use tokio::net::{TcpListener, ToSocketAddrs};
use tokio::select;
use tokio::sync::Mutex;
use tokio::sync::{broadcast, mpsc};
use tokio_util::io::ReaderStream;

use super::{AddrMaybeCached, SocketOpts, TcpTransport, TlsTransport, Transport};
use super::maybe_tls::{MaybeTLSStream, MaybeTLSTransport};
use super::{AddrMaybeCached, SocketOpts, Transport};
use crate::config::{HTTP2Config, TransportConfig};

#[derive(Debug)]
Expand Down Expand Up @@ -170,25 +170,9 @@ impl Service<Request<Incoming>> for Svc {
}
}

#[derive(Debug)]
enum SubTransport {
Secure(TlsTransport),
Insecure(TcpTransport),
}

impl SubTransport {
async fn accept(&self, a: &TcpListener) -> anyhow::Result<(TcpStream, SocketAddr)> {
match self {
SubTransport::Insecure(ref t) => t.accept(a),
SubTransport::Secure(ref t) => t.accept(a),
}
.await
}
}

async fn start_http_server(
listener: TcpListener,
transport: Arc<SubTransport>,
transport: Arc<MaybeTLSTransport>,
req_sender: mpsc::Sender<anyhow::Result<(SocketAddr, Incoming, mpsc::Sender<OutgoingSimplex>)>>,
stop_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<()> {
Expand All @@ -199,7 +183,10 @@ async fn start_http_server(
return Ok(());
}

conn = transport.as_ref().accept(&listener) => {
conn = match transport.as_ref() {
MaybeTLSTransport::No(t) => t.accept(&listener),
MaybeTLSTransport::Yes(t) => t.accept(&listener),
} => {
if let Err(err) = conn {
if let Err(err)= req_sender.send(Err(err)).await {
eprintln!("Error sending error message: {}", err);
Expand Down Expand Up @@ -236,12 +223,63 @@ async fn start_http_server(
}
}

impl Connection for MaybeTLSStream {
fn connected(&self) -> Connected {
let connected = Connected::new();
if let (Ok(remote_addr), Ok(local_addr)) = (
self.get_tcpstream().peer_addr(),
self.get_tcpstream().local_addr(),
) {
connected.extra((remote_addr, local_addr))
} else {
connected
}
}
}

#[derive(Clone)]
struct MaybeTLSConnector {
sub: Arc<MaybeTLSTransport>,
}

impl tower_service::Service<Uri> for MaybeTLSConnector {
type Response = TokioIo<MaybeTLSStream>;
type Error = anyhow::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, u: Uri) -> Self::Future {
let sub = self.sub.clone();
let future = async move {
let addr = match (u.host(), u.port()) {
(Some(host), Some(port)) => format!("{}:{}", host, port),
_ => String::from(""),
};
let addr = AddrMaybeCached::new(addr.as_str());
match sub.as_ref() {
MaybeTLSTransport::No(t) => match t.connect(&addr).await {
Err(err) => Err(err),
Ok(s) => Ok(TokioIo::new(MaybeTLSStream::No(s))),
},
MaybeTLSTransport::Yes(t) => match t.connect(&addr).await {
Err(err) => Err(err),
Ok(s) => Ok(TokioIo::new(MaybeTLSStream::Yes(s))),
},
}
};
Box::pin(future)
}
}

#[derive(Debug)]
pub struct HTTP2Transport {
cfg: HTTP2Config,

server_transport: Arc<SubTransport>,
client: Client<HttpsConnector<HttpConnector>, StreamBody<OutgoingSimplex>>,
sub_transport: Arc<MaybeTLSTransport>,
client: Client<MaybeTLSConnector, StreamBody<OutgoingSimplex>>,

stop_sender: broadcast::Sender<()>,
stop_receiver: broadcast::Receiver<()>,
Expand All @@ -264,17 +302,16 @@ impl Transport for HTTP2Transport {
.ok_or_else(|| anyhow!("Missing http2 config"))?;

let (stop_sender, stop_receiver) = broadcast::channel(1);
let server_transport = Arc::new(match cfg.tls {
true => SubTransport::Secure(TlsTransport::new(config)?),
false => SubTransport::Insecure(TcpTransport::new(config)?),
});
let sub_transport = Arc::new(MaybeTLSTransport::new(cfg.tls, config)?);
let client = Client::builder(TokioExecutor::new())
.http2_only(true)
.build(HttpsConnector::new());
.build(MaybeTLSConnector {
sub: sub_transport.clone(),
});

Ok(HTTP2Transport {
cfg: cfg.clone(),
server_transport,
sub_transport,
client,
stop_sender,
stop_receiver,
Expand All @@ -293,7 +330,7 @@ impl Transport for HTTP2Transport {
anyhow::Result<(SocketAddr, Incoming, mpsc::Sender<OutgoingSimplex>)>,
>(1);
let req_receiver = Arc::new(Mutex::new(req_receiver));
let sub_transport = self.server_transport.clone();
let sub_transport = self.sub_transport.clone();
let stop_receiver = self.stop_receiver.resubscribe();
tokio::spawn(start_http_server(
listener,
Expand Down
Loading

0 comments on commit 8db9283

Please sign in to comment.