From 6345c0dcf14e5a659ecc0bf9c34bc642c60dc3b9 Mon Sep 17 00:00:00 2001 From: zonyitoo Date: Thu, 13 Jun 2024 00:00:05 +0800 Subject: [PATCH] feat(server): tokio v1.38 stablized RuntimeMetrics::num_workers --- Cargo.lock | 1 - Cargo.toml | 1 - crates/shadowsocks-service/Cargo.toml | 2 +- crates/shadowsocks-service/src/config.rs | 7 ------- crates/shadowsocks-service/src/manager/server.rs | 14 -------------- crates/shadowsocks-service/src/server/mod.rs | 4 ---- crates/shadowsocks-service/src/server/server.rs | 13 +------------ crates/shadowsocks-service/src/server/udprelay.rs | 11 ++--------- src/service/manager.rs | 5 ----- src/service/server.rs | 5 ----- 10 files changed, 4 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 899120ca2eb8..34323cf3dc44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3184,7 +3184,6 @@ dependencies = [ "log4rs", "mimalloc", "mime", - "num_cpus", "qrcode", "rand", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 9928912025a9..a7fb48c33a72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -239,7 +239,6 @@ rand = "0.8" futures = "0.3" tokio = { version = "1", features = ["rt", "signal"] } -num_cpus = "1.15" ipnet = { version = "2.9", optional = true } diff --git a/crates/shadowsocks-service/Cargo.toml b/crates/shadowsocks-service/Cargo.toml index 8b84a56cb82f..74fcfd851859 100644 --- a/crates/shadowsocks-service/Cargo.toml +++ b/crates/shadowsocks-service/Cargo.toml @@ -144,7 +144,7 @@ rand = { version = "0.8", features = ["small_rng"] } sled = { version = "0.34.7", optional = true } futures = "0.3" -tokio = { version = "1.5", features = [ +tokio = { version = "1.38", features = [ "io-util", "macros", "net", diff --git a/crates/shadowsocks-service/src/config.rs b/crates/shadowsocks-service/src/config.rs index 13026ba8eba8..5301ebdf91d8 100644 --- a/crates/shadowsocks-service/src/config.rs +++ b/crates/shadowsocks-service/src/config.rs @@ -1359,11 +1359,6 @@ pub struct Config { /// This is normally for auto-reloading if implementation supports. pub config_path: Option, - #[doc(hidden)] - /// Workers in runtime - /// It should be replaced with metrics APIs: https://github.com/tokio-rs/tokio/issues/4073 - pub worker_count: usize, - /// OnlineConfiguration (SIP008) /// https://shadowsocks.org/doc/sip008.html #[cfg(feature = "local-online-config")] @@ -1488,8 +1483,6 @@ impl Config { config_path: None, - worker_count: 1, - #[cfg(feature = "local-online-config")] online_config: None, } diff --git a/crates/shadowsocks-service/src/manager/server.rs b/crates/shadowsocks-service/src/manager/server.rs index f5db629dad1a..ae27acee602c 100644 --- a/crates/shadowsocks-service/src/manager/server.rs +++ b/crates/shadowsocks-service/src/manager/server.rs @@ -85,7 +85,6 @@ pub struct ManagerBuilder { acl: Option>, ipv6_first: bool, security: SecurityConfig, - worker_count: usize, } impl ManagerBuilder { @@ -106,7 +105,6 @@ impl ManagerBuilder { acl: None, ipv6_first: false, security: SecurityConfig::default(), - worker_count: 1, } } @@ -156,14 +154,6 @@ impl ManagerBuilder { self.security = security; } - /// Set runtime worker count - /// - /// Should be replaced with tokio's metric API when it is stablized. - /// https://github.com/tokio-rs/tokio/issues/4073 - pub fn set_worker_count(&mut self, worker_count: usize) { - self.worker_count = worker_count; - } - /// Build the manager server instance pub async fn build(self) -> io::Result { let listener = ManagerListener::bind(&self.context, &self.svr_cfg.addr).await?; @@ -178,7 +168,6 @@ impl ManagerBuilder { acl: self.acl, ipv6_first: self.ipv6_first, security: self.security, - worker_count: self.worker_count, listener, }) } @@ -196,7 +185,6 @@ pub struct Manager { acl: Option>, ipv6_first: bool, security: SecurityConfig, - worker_count: usize, listener: ManagerListener, } @@ -293,8 +281,6 @@ impl Manager { server_builder.set_security_config(&self.security); - server_builder.set_worker_count(self.worker_count); - let server_port = server_builder.server_config().addr().port(); let mut servers = self.servers.lock().await; diff --git a/crates/shadowsocks-service/src/server/mod.rs b/crates/shadowsocks-service/src/server/mod.rs index bb4c6b47b4c0..d58ab450134f 100644 --- a/crates/shadowsocks-service/src/server/mod.rs +++ b/crates/shadowsocks-service/src/server/mod.rs @@ -150,10 +150,6 @@ pub async fn run(config: Config) -> io::Result<()> { server_builder.set_ipv6_first(config.ipv6_first); } - if config.worker_count >= 1 { - server_builder.set_worker_count(config.worker_count); - } - server_builder.set_security_config(&config.security); let server = server_builder.build().await?; diff --git a/crates/shadowsocks-service/src/server/server.rs b/crates/shadowsocks-service/src/server/server.rs index f06811aee3ec..2bd0d033714b 100644 --- a/crates/shadowsocks-service/src/server/server.rs +++ b/crates/shadowsocks-service/src/server/server.rs @@ -30,7 +30,6 @@ pub struct ServerBuilder { udp_capacity: Option, manager_addr: Option, accept_opts: AcceptOpts, - worker_count: usize, } impl ServerBuilder { @@ -48,7 +47,6 @@ impl ServerBuilder { udp_capacity: None, manager_addr: None, accept_opts: AcceptOpts::default(), - worker_count: 1, } } @@ -83,14 +81,6 @@ impl ServerBuilder { self.manager_addr = Some(manager_addr); } - /// Set runtime worker count - /// - /// Should be replaced with tokio's metric API when it is stablized. - /// https://github.com/tokio-rs/tokio/issues/4073 - pub fn set_worker_count(&mut self, worker_count: usize) { - self.worker_count = worker_count; - } - /// Get server's configuration pub fn server_config(&self) -> &ServerConfig { &self.svr_cfg @@ -147,7 +137,7 @@ impl ServerBuilder { let mut udp_server = None; if self.svr_cfg.mode().enable_udp() { - let mut server = UdpServer::new( + let server = UdpServer::new( self.context.clone(), self.svr_cfg.clone(), self.udp_expiry_duration, @@ -155,7 +145,6 @@ impl ServerBuilder { self.accept_opts.clone(), ) .await?; - server.set_worker_count(self.worker_count); udp_server = Some(server); } diff --git a/crates/shadowsocks-service/src/server/udprelay.rs b/crates/shadowsocks-service/src/server/udprelay.rs index 0cc3cb45f675..9efbdfcea7d6 100644 --- a/crates/shadowsocks-service/src/server/udprelay.rs +++ b/crates/shadowsocks-service/src/server/udprelay.rs @@ -27,7 +27,7 @@ use shadowsocks::{ }, ServerConfig, }; -use tokio::{sync::mpsc, task::JoinHandle, time}; +use tokio::{runtime::Handle, sync::mpsc, task::JoinHandle, time}; #[cfg(windows)] use windows_sys::Win32::Networking::WinSock::WSAEAFNOSUPPORT; @@ -93,7 +93,6 @@ pub struct UdpServer { keepalive_tx: mpsc::Sender, keepalive_rx: mpsc::Receiver, time_to_live: Duration, - worker_count: usize, listener: Arc, svr_cfg: ServerConfig, } @@ -140,17 +139,11 @@ impl UdpServer { keepalive_tx, keepalive_rx, time_to_live, - worker_count: 1, listener, svr_cfg, }) } - #[inline] - pub(crate) fn set_worker_count(&mut self, worker_count: usize) { - self.worker_count = worker_count; - } - /// Server's configuration pub fn server_config(&self) -> &ServerConfig { &self.svr_cfg @@ -173,7 +166,7 @@ impl UdpServer { let mut orx_opt = None; - let cpus = self.worker_count; + let cpus = Handle::current().metrics().num_workers(); let mut other_receivers = Vec::new(); if cpus > 1 { let (otx, orx) = mpsc::channel((cpus - 1) * 16); diff --git a/src/service/manager.rs b/src/service/manager.rs index b232da305497..0d44a8fad9c9 100644 --- a/src/service/manager.rs +++ b/src/service/manager.rs @@ -500,23 +500,18 @@ pub fn create(matches: &ArgMatches) -> Result<(Runtime, impl Future Builder::new_current_thread(), #[cfg(feature = "multi-threaded")] RuntimeMode::MultiThread => { let mut builder = Builder::new_multi_thread(); if let Some(worker_threads) = service_config.runtime.worker_count { - worker_count = worker_threads; builder.worker_threads(worker_threads); - } else { - worker_count = num_cpus::get(); } builder } }; - config.worker_count = worker_count; let runtime = builder.enable_all().build().expect("create tokio Runtime"); diff --git a/src/service/server.rs b/src/service/server.rs index 4efaacbc1152..bdb01262c286 100644 --- a/src/service/server.rs +++ b/src/service/server.rs @@ -520,23 +520,18 @@ pub fn create(matches: &ArgMatches) -> Result<(Runtime, impl Future Builder::new_current_thread(), #[cfg(feature = "multi-threaded")] RuntimeMode::MultiThread => { let mut builder = Builder::new_multi_thread(); if let Some(worker_threads) = service_config.runtime.worker_count { - worker_count = worker_threads; builder.worker_threads(worker_threads); - } else { - worker_count = num_cpus::get(); } builder } }; - config.worker_count = worker_count; let runtime = builder.enable_all().build().expect("create tokio Runtime");