Skip to content

Commit

Permalink
feat(local-online-config): refactored with local-http client
Browse files Browse the repository at this point in the history
- removed dependency to reqwest (very large)
- apply outbound configurations on sockets
- fixed #1508
  • Loading branch information
zonyitoo committed Jun 11, 2024
1 parent e981e7e commit 87c1557
Show file tree
Hide file tree
Showing 16 changed files with 519 additions and 339 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ service = ["local", "server", "manager"]
winservice = ["service", "windows-service"]

# Enables Hickory-DNS for replacing tokio's builtin DNS resolver
hickory-dns = ["shadowsocks-service/hickory-dns", "reqwest/hickory-dns"]
hickory-dns = ["shadowsocks-service/hickory-dns"]
# Hickory-DNS was renamed from Trust-DNS, keep compatibility.
trust-dns = ["hickory-dns"]
dns-over-tls = ["shadowsocks-service/dns-over-tls"]
Expand Down Expand Up @@ -166,7 +166,6 @@ local-fake-dns = ["local", "shadowsocks-service/local-fake-dns", "ipnet"]
# https://shadowsocks.org/doc/sip008.html
local-online-config = [
"local",
"reqwest",
"mime",
"shadowsocks-service/local-online-config",
]
Expand Down
4 changes: 3 additions & 1 deletion crates/shadowsocks-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ local-tun = ["local", "etherparse", "tun2", "smoltcp"]
local-fake-dns = ["local", "trust-dns", "sled", "bson"]
# sslocal support online URL (SIP008 Online Configuration Delivery)
# https://shadowsocks.org/doc/sip008.html
local-online-config = ["local"]
local-online-config = ["local", "local-http", "mime", "http"]

# Enable Stream Cipher Protocol
# WARN: Stream Cipher Protocol is proved to be insecure
Expand Down Expand Up @@ -157,6 +157,7 @@ libc = "0.2.141"

hyper = { version = "1.3", optional = true, features = ["full"] }
http-body-util = { version = "0.1", optional = true }
http = { version = "1.1", optional = true }

hickory-resolver = { version = "0.24", optional = true, features = [
"serde-config",
Expand All @@ -166,6 +167,7 @@ idna = "1.0"
ipnet = "2.9"
iprange = "0.6"
regex = "1.4"
mime = { version = "0.3", optional = true }

tun2 = { version = "1", optional = true, features = ["async"] }
etherparse = { version = "0.15", optional = true }
Expand Down
43 changes: 43 additions & 0 deletions crates/shadowsocks-service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ struct SSConfig {
#[cfg(feature = "local-online-config")]
#[serde(skip_serializing_if = "Option::is_none")]
version: Option<u32>,

#[cfg(feature = "local-online-config")]
#[serde(skip_serializing_if = "Option::is_none")]
online_config: Option<SSOnlineConfig>,
}

#[derive(Serialize, Deserialize, Debug, Default)]
Expand Down Expand Up @@ -406,6 +410,13 @@ struct SSServerExtConfig {
outbound_bind_interface: Option<String>,
}

#[cfg(feature = "local-online-config")]
#[derive(Serialize, Deserialize, Debug, Default)]
struct SSOnlineConfig {
config_url: String,
update_interval: Option<u64>,
}

/// Server config type
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConfigType {
Expand Down Expand Up @@ -1237,6 +1248,17 @@ impl LocalInstanceConfig {
}
}

/// OnlineConfiguration (SIP008)
/// https://shadowsocks.org/doc/sip008.html
#[cfg(feature = "local-online-config")]
#[derive(Debug, Clone)]
pub struct OnlineConfig {
/// SIP008 URL
pub config_url: String,
/// Update interval, 3600s by default
pub update_interval: Option<Duration>,
}

/// Configuration
#[derive(Clone, Debug)]
pub struct Config {
Expand Down Expand Up @@ -1341,6 +1363,10 @@ pub struct Config {
/// Workers in runtime
/// It should be replaced with metrics APIs: https://github.com/tokio-rs/tokio/issues/4073
pub worker_count: usize,

/// OnlineConfiguration (SIP008)
/// https://shadowsocks.org/doc/sip008.html
pub online_config: Option<OnlineConfig>,
}

/// Configuration parsing error kind
Expand Down Expand Up @@ -1462,6 +1488,8 @@ impl Config {
config_path: None,

worker_count: 1,

online_config: None,
}
}

Expand Down Expand Up @@ -2352,6 +2380,13 @@ impl Config {
nconfig.acl = Some(acl);
}

if let Some(online_config) = config.online_config {
nconfig.online_config = Some(OnlineConfig {
config_url: online_config.config_url,
update_interval: online_config.update_interval.map(Duration::from_secs),
});
}

Ok(nconfig)
}

Expand Down Expand Up @@ -3090,6 +3125,14 @@ impl fmt::Display for Config {
jconf.acl = Some(acl.file_path().to_str().unwrap().to_owned());
}

// OnlineConfig
if let Some(ref online_config) = self.online_config {
jconf.online_config = Some(SSOnlineConfig {
config_url: online_config.config_url.clone(),
update_interval: online_config.update_interval.as_ref().map(Duration::as_secs),
});
}

write!(f, "{}", json5::to_string(&jconf).unwrap())
}
}
Expand Down
118 changes: 96 additions & 22 deletions crates/shadowsocks-service/src/local/http/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,25 @@
use std::{
collections::VecDeque,
fmt::Debug,
future::Future,
io::{self, ErrorKind},
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};

use hyper::rt::{Sleep, Timer};
use hyper::{
body,
body::{self, Body},
client::conn::{http1, http2},
http::uri::Scheme,
Request,
Response,
Request, Response,
};
use log::{error, trace};
use lru_time_cache::LruCache;
use pin_project::pin_project;
use shadowsocks::relay::Address;
use tokio::sync::Mutex;

Expand All @@ -29,33 +34,96 @@ use super::{

const CONNECTION_EXPIRE_DURATION: Duration = Duration::from_secs(20);

/// HTTPClient API request errors
#[derive(thiserror::Error, Debug)]
pub enum HttpClientError {
/// Errors from hyper
#[error("{0}")]
Hyper(#[from] hyper::Error),
/// std::io::Error
#[error("{0}")]
Io(#[from] io::Error),
}

#[derive(Clone)]
pub struct HttpClient {
#[derive(Clone, Debug)]
pub struct TokioTimer;

impl Timer for TokioTimer {
fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
Box::pin(TokioSleep {
inner: tokio::time::sleep(duration),
})
}

fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
Box::pin(TokioSleep {
inner: tokio::time::sleep_until(deadline.into()),
})
}

fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() {
sleep.reset(new_deadline.into())

Check warning on line 66 in crates/shadowsocks-service/src/local/http/http_client.rs

View workflow job for this annotation

GitHub Actions / clippy macos-latest

useless conversion to the same type: `std::time::Instant`

warning: useless conversion to the same type: `std::time::Instant` --> crates/shadowsocks-service/src/local/http/http_client.rs:66:25 | 66 | sleep.reset(new_deadline.into()) | ^^^^^^^^^^^^^^^^^^^ help: consider removing `.into()`: `new_deadline` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#useless_conversion = note: `#[warn(clippy::useless_conversion)]` on by default

Check warning on line 66 in crates/shadowsocks-service/src/local/http/http_client.rs

View workflow job for this annotation

GitHub Actions / clippy ubuntu-latest

useless conversion to the same type: `std::time::Instant`

warning: useless conversion to the same type: `std::time::Instant` --> crates/shadowsocks-service/src/local/http/http_client.rs:66:25 | 66 | sleep.reset(new_deadline.into()) | ^^^^^^^^^^^^^^^^^^^ help: consider removing `.into()`: `new_deadline` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#useless_conversion = note: `#[warn(clippy::useless_conversion)]` on by default
}
}
}

#[pin_project]
pub(crate) struct TokioSleep {
#[pin]
pub(crate) inner: tokio::time::Sleep,
}

impl Future for TokioSleep {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
}

impl Sleep for TokioSleep {}

impl TokioSleep {
pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
self.project().inner.as_mut().reset(deadline.into());
}
}

/// HTTPClient, supporting HTTP/1.1 and H2, HTTPS.
pub struct HttpClient<B> {
#[allow(clippy::type_complexity)]
cache_conn: Arc<Mutex<LruCache<Address, VecDeque<(HttpConnection, Instant)>>>>,
cache_conn: Arc<Mutex<LruCache<Address, VecDeque<(HttpConnection<B>, Instant)>>>>,
}

impl<B> Clone for HttpClient<B> {
fn clone(&self) -> Self {
HttpClient {
cache_conn: self.cache_conn.clone(),
}
}
}

impl HttpClient {
pub fn new() -> HttpClient {
impl<B> HttpClient<B>
where
B: Body + Send + Unpin + Debug + 'static,
B::Data: Send,
B::Error: Into<Box<dyn ::std::error::Error + Send + Sync>>,
{
/// Create a new HttpClient
pub fn new() -> HttpClient<B> {
HttpClient {
cache_conn: Arc::new(Mutex::new(LruCache::with_expiry_duration(CONNECTION_EXPIRE_DURATION))),
}
}

Check warning on line 118 in crates/shadowsocks-service/src/local/http/http_client.rs

View workflow job for this annotation

GitHub Actions / clippy macos-latest

you should consider adding a `Default` implementation for `HttpClient<B>`

warning: you should consider adding a `Default` implementation for `HttpClient<B>` --> crates/shadowsocks-service/src/local/http/http_client.rs:114:5 | 114 | / pub fn new() -> HttpClient<B> { 115 | | HttpClient { 116 | | cache_conn: Arc::new(Mutex::new(LruCache::with_expiry_duration(CONNECTION_EXPIRE_DURATION))), 117 | | } 118 | | } | |_____^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#new_without_default = note: `#[warn(clippy::new_without_default)]` on by default help: try adding this | 107 + impl<B> Default for HttpClient<B> 108 + where 109 + B: Body + Send + Unpin + Debug + 'static, 110 + B::Data: Send, 111 + B::Error: Into<Box<dyn ::std::error::Error + Send + Sync>>, 112 + { 113 + fn default() -> Self { 114 + Self::new() 115 + } 116 + } |

Check warning on line 118 in crates/shadowsocks-service/src/local/http/http_client.rs

View workflow job for this annotation

GitHub Actions / clippy ubuntu-latest

you should consider adding a `Default` implementation for `HttpClient<B>`

warning: you should consider adding a `Default` implementation for `HttpClient<B>` --> crates/shadowsocks-service/src/local/http/http_client.rs:114:5 | 114 | / pub fn new() -> HttpClient<B> { 115 | | HttpClient { 116 | | cache_conn: Arc::new(Mutex::new(LruCache::with_expiry_duration(CONNECTION_EXPIRE_DURATION))), 117 | | } 118 | | } | |_____^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#new_without_default = note: `#[warn(clippy::new_without_default)]` on by default help: try adding this | 107 + impl<B> Default for HttpClient<B> 108 + where 109 + B: Body + Send + Unpin + Debug + 'static, 110 + B::Data: Send, 111 + B::Error: Into<Box<dyn ::std::error::Error + Send + Sync>>, 112 + { 113 + fn default() -> Self { 114 + Self::new() 115 + } 116 + } |

/// Make HTTP requests
#[inline]
pub async fn send_request(
&self,
context: Arc<ServiceContext>,
req: Request<body::Incoming>,
balancer: &PingBalancer,
req: Request<B>,
balancer: Option<&PingBalancer>,
) -> Result<Response<body::Incoming>, HttpClientError> {
let host = match host_addr(req.uri()) {
Some(h) => h,
Expand Down Expand Up @@ -96,7 +164,7 @@ impl HttpClient {
self.send_request_conn(host, c, req).await.map_err(Into::into)
}

async fn get_cached_connection(&self, host: &Address) -> Option<HttpConnection> {
async fn get_cached_connection(&self, host: &Address) -> Option<HttpConnection<B>> {
if let Some(q) = self.cache_conn.lock().await.get_mut(host) {
while let Some((c, inst)) = q.pop_front() {
let now = Instant::now();
Expand All @@ -115,8 +183,8 @@ impl HttpClient {
async fn send_request_conn(
&self,
host: Address,
mut c: HttpConnection,
req: Request<body::Incoming>,
mut c: HttpConnection<B>,
req: Request<B>,
) -> hyper::Result<Response<body::Incoming>> {
trace!("HTTP making request to host: {}, request: {:?}", host, req);
let response = c.send_request(req).await?;
Expand All @@ -141,19 +209,24 @@ impl HttpClient {
}
}

enum HttpConnection {
Http1(http1::SendRequest<body::Incoming>),
Http2(http2::SendRequest<body::Incoming>),
enum HttpConnection<B> {
Http1(http1::SendRequest<B>),
Http2(http2::SendRequest<B>),
}

impl HttpConnection {
impl<B> HttpConnection<B>
where
B: Body + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Box<dyn ::std::error::Error + Send + Sync>>,
{
async fn connect(
context: Arc<ServiceContext>,
scheme: &Scheme,
host: Address,
domain: &str,
balancer: &PingBalancer,
) -> io::Result<HttpConnection> {
balancer: Option<&PingBalancer>,
) -> io::Result<HttpConnection<B>> {
if *scheme != Scheme::HTTP && *scheme != Scheme::HTTPS {
return Err(io::Error::new(ErrorKind::InvalidInput, "invalid scheme"));
}
Expand All @@ -173,7 +246,7 @@ impl HttpConnection {
scheme: &Scheme,
host: Address,
stream: AutoProxyClientStream,
) -> io::Result<HttpConnection> {
) -> io::Result<HttpConnection<B>> {
trace!(
"HTTP making new HTTP/1.1 connection to host: {}, scheme: {}",
host,
Expand Down Expand Up @@ -207,7 +280,7 @@ impl HttpConnection {
host: Address,
domain: &str,
stream: AutoProxyClientStream,
) -> io::Result<HttpConnection> {
) -> io::Result<HttpConnection<B>> {
trace!("HTTP making new TLS connection to host: {}, scheme: {}", host, scheme);

// TLS handshake, check alpn for h2 support.
Expand All @@ -216,6 +289,7 @@ impl HttpConnection {
if stream.negotiated_http2() {
// H2 connnection
let (send_request, connection) = match http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.keep_alive_interval(Duration::from_secs(15))
.handshake(TokioIo::new(stream))
.await
Expand Down Expand Up @@ -254,7 +328,7 @@ impl HttpConnection {
}

#[inline]
pub async fn send_request(&mut self, req: Request<body::Incoming>) -> hyper::Result<Response<body::Incoming>> {
pub async fn send_request(&mut self, req: Request<B>) -> hyper::Result<Response<body::Incoming>> {
match self {
HttpConnection::Http1(r) => r.send_request(req).await,
HttpConnection::Http2(r) => r.send_request(req).await,
Expand Down
Loading

0 comments on commit 87c1557

Please sign in to comment.