Skip to content

Commit

Permalink
Fix load balancer to add per server fwmark for outbound traffic
Browse files Browse the repository at this point in the history
  • Loading branch information
stormynoct authored and zonyitoo committed Apr 14, 2024
1 parent 7b4a0a1 commit a5d71a2
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use tokio::{
time,
};

use crate::config::ServerInstanceConfig;
use crate::local::context::ServiceContext;

use super::{
Expand Down Expand Up @@ -82,8 +83,9 @@ impl PingBalancerBuilder {
}
}

pub fn add_server(&mut self, server: ServerConfig) {
pub fn add_server(&mut self, server: ServerInstanceConfig) {
let ident = ServerIdent::new(
self.context.clone(),
server,
self.max_server_rtt,
self.check_interval * EXPECTED_CHECK_POINTS_IN_CHECK_WINDOW,
Expand Down Expand Up @@ -720,13 +722,14 @@ impl PingBalancer {
}

/// Reset servers in load balancer. Designed for auto-reloading configuration file.
pub async fn reset_servers(&self, servers: Vec<ServerConfig>) -> io::Result<()> {
pub async fn reset_servers(&self, servers: Vec<ServerInstanceConfig>) -> io::Result<()> {
let old_context = self.inner.context.load();

let servers = servers
.into_iter()
.map(|s| {
Arc::new(ServerIdent::new(
old_context.context.clone(),
s,
old_context.max_server_rtt,
old_context.check_interval * EXPECTED_CHECK_POINTS_IN_CHECK_WINDOW,
Expand Down Expand Up @@ -811,7 +814,7 @@ impl PingChecker {
self.context.context(),
self.server.server_config(),
&addr,
self.context.connect_opts_ref(),
self.server.connect_opts_ref(),
)
.await?;
stream.write_all(GET_BODY).await?;
Expand Down Expand Up @@ -851,7 +854,7 @@ impl PingChecker {
self.context.context(),
self.server.server_config(),
&addr,
self.context.connect_opts_ref(),
self.server.connect_opts_ref(),
)
.await?;
stream.write_all(GET_BODY).await?;
Expand Down Expand Up @@ -896,10 +899,12 @@ impl PingChecker {

let addr = Address::SocketAddress(SocketAddr::new(Ipv4Addr::new(8, 8, 8, 8).into(), 53));

let svr_cfg = self.server.server_config();

let client =
ProxySocket::connect_with_opts(self.context.context(), svr_cfg, self.context.connect_opts_ref()).await?;
let client = ProxySocket::connect_with_opts(
self.context.context(),
self.server.server_config(),
self.server.connect_opts_ref(),
)
.await?;

let mut control = UdpSocketControlData::default();
control.client_session_id = rand::random::<u64>();
Expand Down
45 changes: 37 additions & 8 deletions crates/shadowsocks-service/src/local/loadbalancing/server_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,24 @@
use std::{
fmt::{self, Debug},
sync::atomic::{AtomicU32, Ordering},
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};

use shadowsocks::ServerConfig;
use shadowsocks::{
net::ConnectOpts,
ServerConfig,
};
use tokio::sync::Mutex;

use crate::{
config::ServerInstanceConfig,
local::context::ServiceContext,
};

use super::server_stat::{Score, ServerStat};

/// Server's statistic score
Expand Down Expand Up @@ -61,25 +72,43 @@ impl Debug for ServerScore {
pub struct ServerIdent {
tcp_score: ServerScore,
udp_score: ServerScore,
svr_cfg: ServerConfig,
svr_cfg: ServerInstanceConfig,
connect_opts: ConnectOpts,
}

impl ServerIdent {
/// Create a `ServerIdent`
pub fn new(svr_cfg: ServerConfig, max_server_rtt: Duration, check_window: Duration) -> ServerIdent {
pub fn new(
context: Arc<ServiceContext>,
svr_cfg: ServerInstanceConfig,
max_server_rtt: Duration,
check_window: Duration
) -> ServerIdent {
let mut connect_opts = context.connect_opts_ref().clone();

Check warning on line 87 in crates/shadowsocks-service/src/local/loadbalancing/server_data.rs

View workflow job for this annotation

GitHub Actions / clippy-check (macos-latest)

variable does not need to be mutable

warning: variable does not need to be mutable --> crates/shadowsocks-service/src/local/loadbalancing/server_data.rs:87:13 | 87 | let mut connect_opts = context.connect_opts_ref().clone(); | ----^^^^^^^^^^^^ | | | help: remove this `mut` | = note: `#[warn(unused_mut)]` on by default

#[cfg(any(target_os = "linux", target_os = "android"))]
if let Some(fwmark) = svr_cfg.outbound_fwmark {
connect_opts.fwmark = Some(fwmark);
}

ServerIdent {
tcp_score: ServerScore::new(svr_cfg.weight().tcp_weight(), max_server_rtt, check_window),
udp_score: ServerScore::new(svr_cfg.weight().udp_weight(), max_server_rtt, check_window),
tcp_score: ServerScore::new(svr_cfg.config.weight().tcp_weight(), max_server_rtt, check_window),
udp_score: ServerScore::new(svr_cfg.config.weight().udp_weight(), max_server_rtt, check_window),
svr_cfg,
connect_opts,
}
}

pub fn connect_opts_ref(&self) -> &ConnectOpts {
&self.connect_opts
}

pub fn server_config(&self) -> &ServerConfig {
&self.svr_cfg
&self.svr_cfg.config
}

pub fn server_config_mut(&mut self) -> &mut ServerConfig {
&mut self.svr_cfg
&mut self.svr_cfg.config
}

pub fn tcp_score(&self) -> &ServerScore {
Expand Down
2 changes: 1 addition & 1 deletion crates/shadowsocks-service/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl Server {
}

for server in config.server {
balancer_builder.add_server(server.config);
balancer_builder.add_server(server);
}

balancer_builder.build().await?
Expand Down
5 changes: 2 additions & 3 deletions src/service/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,10 +991,9 @@ fn launch_reload_server_task(config_path: PathBuf, balancer: PingBalancer) {
}
};

let servers: Vec<ServerConfig> = config.server.into_iter().map(|s| s.config).collect();
info!("auto-reload {} with {} servers", config_path.display(), servers.len());
info!("auto-reload {} with {} servers", config_path.display(), config.server.len());

if let Err(err) = balancer.reset_servers(servers).await {
if let Err(err) = balancer.reset_servers(config.server).await {
error!("auto-reload {} but found error: {}", config_path.display(), err);
}
}
Expand Down

0 comments on commit a5d71a2

Please sign in to comment.