From 1e043c164458471b68fba24f891ad74a56326cdd Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Wed, 11 Dec 2024 22:20:10 +0000 Subject: [PATCH] updates to match the needs of the integration process --- thread-manager/Cargo.toml | 5 +- .../examples/core_contention_basics.rs | 13 +-- .../core_contention_contending_set.json | 31 ------- .../core_contention_contending_set.toml | 13 +++ .../core_contention_dedicated_set.json | 31 ------- .../core_contention_dedicated_set.toml | 13 +++ .../core_contention_single_runtime.json | 20 ----- .../examples/core_contention_sweep.rs | 10 +-- thread-manager/src/lib.rs | 80 +++++++++++++------ thread-manager/src/native_thread_runtime.rs | 12 +-- thread-manager/src/rayon_runtime.rs | 3 +- thread-manager/src/tokio_runtime.rs | 36 ++------- 12 files changed, 111 insertions(+), 156 deletions(-) delete mode 100644 thread-manager/examples/core_contention_contending_set.json create mode 100644 thread-manager/examples/core_contention_contending_set.toml delete mode 100644 thread-manager/examples/core_contention_dedicated_set.json create mode 100644 thread-manager/examples/core_contention_dedicated_set.toml delete mode 100644 thread-manager/examples/core_contention_single_runtime.json diff --git a/thread-manager/Cargo.toml b/thread-manager/Cargo.toml index 66cf3c8d5e1600..265eb3aafd97fb 100644 --- a/thread-manager/Cargo.toml +++ b/thread-manager/Cargo.toml @@ -14,10 +14,10 @@ publish = false [dependencies] anyhow = { workspace = true } log = { workspace = true } -num_cpus ={ workspace = true } +num_cpus = { workspace = true } rayon = { workspace = true } serde = { workspace = true, features = ["derive"] } -solana-metrics ={ workspace = true } +solana-metrics = { workspace = true } thread-priority = "1.2.0" tokio = { workspace = true, features = ["time", "rt-multi-thread"] } @@ -27,3 +27,4 @@ affinity = "0.1.2" [dev-dependencies] axum = "0.7.9" serde_json = { workspace = true } +toml = { workspace = true } diff --git a/thread-manager/examples/core_contention_basics.rs b/thread-manager/examples/core_contention_basics.rs index 36a750fc872f69..219712df060a68 100644 --- a/thread-manager/examples/core_contention_basics.rs +++ b/thread-manager/examples/core_contention_basics.rs @@ -1,6 +1,6 @@ use std::{ future::IntoFuture, - io::Write, + io::{Read, Write}, net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, time::Duration, @@ -48,11 +48,12 @@ fn main() -> anyhow::Result<()> { println!("Running {exp}"); let mut conffile = PathBuf::from(env!("CARGO_MANIFEST_DIR")); conffile.push(exp); - let conffile = std::fs::File::open(conffile)?; - let cfg: RuntimeManagerConfig = serde_json::from_reader(conffile)?; + let mut buf = String::new(); + std::fs::File::open(conffile)?.read_to_string(&mut buf)?; + let cfg: RuntimeManagerConfig = toml::from_str(&buf)?; //println!("Loaded config {}", serde_json::to_string_pretty(&cfg)?); - let rtm = RuntimeManager::new(cfg).unwrap(); + let rtm = ThreadManager::new(cfg).unwrap(); let tok1 = rtm .get_tokio("axum1") .expect("Expecting runtime named axum1"); @@ -63,10 +64,10 @@ fn main() -> anyhow::Result<()> { let wrk_cores: Vec<_> = (32..64).collect(); let results = std::thread::scope(|s| { s.spawn(|| { - tok1.start(axum_main(8888)); + tok1.tokio.block_on(axum_main(8888)); }); s.spawn(|| { - tok2.start(axum_main(8889)); + tok2.tokio.block_on(axum_main(8889)); }); let jh = s.spawn(|| run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap()); jh.join().expect("WRK crashed!") diff --git a/thread-manager/examples/core_contention_contending_set.json b/thread-manager/examples/core_contention_contending_set.json deleted file mode 100644 index 1225cc8e494b0f..00000000000000 --- a/thread-manager/examples/core_contention_contending_set.json +++ /dev/null @@ -1,31 +0,0 @@ -{ - "tokio_configs": { - "tokio1": { - "worker_threads": 8, - "max_blocking_threads": 1, - "priority": 0, - "core_allocation": { - "DedicatedCoreSet": { - "min": 0, - "max": 8 - } - } - }, - "tokio2": { - "worker_threads": 8, - "max_blocking_threads": 1, - "priority": 0, - "core_allocation": { - "DedicatedCoreSet": { - "min": 0, - "max": 8 - } - } - } - }, - "tokio_runtime_mapping": { - "axum2": "tokio2", - "axum1": "tokio1" - }, - "native_configs": {} -} diff --git a/thread-manager/examples/core_contention_contending_set.toml b/thread-manager/examples/core_contention_contending_set.toml new file mode 100644 index 00000000000000..e383987a5a432c --- /dev/null +++ b/thread-manager/examples/core_contention_contending_set.toml @@ -0,0 +1,13 @@ +[native_configs] + +[rayon_configs] + +[tokio_configs.axum1] +worker_threads = 8 +max_blocking_threads = 1 +core_allocation.DedicatedCoreSet = { min = 0, max = 8 } + +[tokio_configs.axum2] +worker_threads = 8 +max_blocking_threads = 1 +core_allocation.DedicatedCoreSet = { min = 0, max = 8 } diff --git a/thread-manager/examples/core_contention_dedicated_set.json b/thread-manager/examples/core_contention_dedicated_set.json deleted file mode 100644 index 4e9c76170cf7cf..00000000000000 --- a/thread-manager/examples/core_contention_dedicated_set.json +++ /dev/null @@ -1,31 +0,0 @@ -{ - "tokio_configs": { - "tokio1": { - "worker_threads": 4, - "max_blocking_threads": 1, - "priority": 0, - "core_allocation": { - "DedicatedCoreSet": { - "min": 0, - "max": 4 - } - } - }, - "tokio2": { - "worker_threads": 4, - "max_blocking_threads": 1, - "priority": 0, - "core_allocation": { - "DedicatedCoreSet": { - "min": 4, - "max": 8 - } - } - } - }, - "tokio_runtime_mapping": { - "axum2": "tokio2", - "axum1": "tokio1" - }, - "native_configs": {} -} diff --git a/thread-manager/examples/core_contention_dedicated_set.toml b/thread-manager/examples/core_contention_dedicated_set.toml new file mode 100644 index 00000000000000..a82af7d9f5fd47 --- /dev/null +++ b/thread-manager/examples/core_contention_dedicated_set.toml @@ -0,0 +1,13 @@ +[native_configs] + +[rayon_configs] + +[tokio_configs.axum1] +worker_threads = 4 +max_blocking_threads = 1 +core_allocation.DedicatedCoreSet = { min = 0, max = 4 } + +[tokio_configs.axum2] +worker_threads = 4 +max_blocking_threads = 1 +core_allocation.DedicatedCoreSet = { min = 4, max = 8 } diff --git a/thread-manager/examples/core_contention_single_runtime.json b/thread-manager/examples/core_contention_single_runtime.json deleted file mode 100644 index 42d743a188cc35..00000000000000 --- a/thread-manager/examples/core_contention_single_runtime.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "tokio_configs": { - "tokio1": { - "worker_threads": 8, - "max_blocking_threads": 1, - "priority": 0, - "core_allocation": { - "DedicatedCoreSet": { - "min": 0, - "max": 8 - } - } - } - }, - "tokio_runtime_mapping": { - "axum2": "tokio1", - "axum1": "tokio1" - }, - "native_configs": {} -} diff --git a/thread-manager/examples/core_contention_sweep.rs b/thread-manager/examples/core_contention_sweep.rs index f160ddf3886d4e..51ba4c08e714bd 100644 --- a/thread-manager/examples/core_contention_sweep.rs +++ b/thread-manager/examples/core_contention_sweep.rs @@ -112,7 +112,7 @@ fn main() -> anyhow::Result<()> { println!("Running {core_cnt} cores under {regime:?}"); let (tok1, tok2) = match regime { Regime::Shared => { - rtm = RuntimeManager::new(make_config_shared(core_cnt)).unwrap(); + rtm = ThreadManager::new(make_config_shared(core_cnt)).unwrap(); ( rtm.get_tokio("axum1") .expect("Expecting runtime named axum1"), @@ -121,7 +121,7 @@ fn main() -> anyhow::Result<()> { ) } Regime::Dedicated => { - rtm = RuntimeManager::new(make_config_dedicated(core_cnt)).unwrap(); + rtm = ThreadManager::new(make_config_dedicated(core_cnt)).unwrap(); ( rtm.get_tokio("axum1") .expect("Expecting runtime named axum1"), @@ -130,7 +130,7 @@ fn main() -> anyhow::Result<()> { ) } Regime::Single => { - rtm = RuntimeManager::new(make_config_shared(core_cnt)).unwrap(); + rtm = ThreadManager::new(make_config_shared(core_cnt)).unwrap(); ( rtm.get_tokio("axum1") .expect("Expecting runtime named axum1"), @@ -143,7 +143,7 @@ fn main() -> anyhow::Result<()> { let wrk_cores: Vec<_> = (32..64).collect(); let results = std::thread::scope(|s| { s.spawn(|| { - tok1.start(axum_main(8888)); + tok1.tokio.spawn(axum_main(8888)); }); let jh = match regime { Regime::Single => s.spawn(|| { @@ -151,7 +151,7 @@ fn main() -> anyhow::Result<()> { }), _ => { s.spawn(|| { - tok2.start(axum_main(8889)); + tok2.tokio.spawn(axum_main(8889)); }); s.spawn(|| { run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap() diff --git a/thread-manager/src/lib.rs b/thread-manager/src/lib.rs index b6a88374a42825..c348ef525baa05 100644 --- a/thread-manager/src/lib.rs +++ b/thread-manager/src/lib.rs @@ -10,7 +10,7 @@ pub mod rayon_runtime; pub mod tokio_runtime; pub use { - native_thread_runtime::{NativeConfig, NativeThreadRuntime}, + native_thread_runtime::{JoinHandle, NativeConfig, NativeThreadRuntime}, policy::CoreAllocation, rayon_runtime::{RayonConfig, RayonRuntime}, tokio_runtime::{TokioConfig, TokioRuntime}, @@ -18,7 +18,7 @@ pub use { pub type ConstString = Box; #[derive(Default, Debug)] -pub struct RuntimeManager { +pub struct ThreadManager { pub tokio_runtimes: HashMap, pub tokio_runtime_mapping: HashMap, @@ -44,7 +44,7 @@ pub struct RuntimeManagerConfig { pub default_core_allocation: CoreAllocation, } -impl RuntimeManager { +impl ThreadManager { pub fn get_native(&self, name: &str) -> Option<&NativeThreadRuntime> { let n = self.native_runtime_mapping.get(name)?; self.native_thread_runtimes.get(n) @@ -64,36 +64,50 @@ impl RuntimeManager { Ok(chosen_cores_mask) } - pub fn new(config: RuntimeManagerConfig) -> anyhow::Result { - let mut core_allocations = HashMap::>::new(); - Self::set_process_affinity(&config)?; - let mut manager = Self::default(); + /// Populates mappings with copies of config names, overrides as appropriate + fn populate_mappings(&mut self, config: &RuntimeManagerConfig) { + //TODO: this should probably be cleaned up with a macro at some point... - //TODO: this should probably be cleaned up at some point... - for (k, v) in config.tokio_runtime_mapping.iter() { - manager - .tokio_runtime_mapping - .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str()); + for name in config.native_configs.keys() { + self.native_runtime_mapping + .insert(name.clone().into_boxed_str(), name.clone().into_boxed_str()); } for (k, v) in config.native_runtime_mapping.iter() { - manager - .native_runtime_mapping + self.native_runtime_mapping .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str()); } - for (k, v) in config.rayon_runtime_mapping.iter() { - manager - .rayon_runtime_mapping + + for name in config.tokio_configs.keys() { + self.tokio_runtime_mapping + .insert(name.clone().into_boxed_str(), name.clone().into_boxed_str()); + } + for (k, v) in config.tokio_runtime_mapping.iter() { + self.tokio_runtime_mapping .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str()); } + for name in config.rayon_configs.keys() { + self.rayon_runtime_mapping + .insert(name.clone().into_boxed_str(), name.clone().into_boxed_str()); + } + for (k, v) in config.rayon_runtime_mapping.iter() { + self.rayon_runtime_mapping + .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str()); + } + } + pub fn new(config: RuntimeManagerConfig) -> anyhow::Result { + let mut core_allocations = HashMap::>::new(); + Self::set_process_affinity(&config)?; + let mut manager = Self::default(); + manager.populate_mappings(&config); for (name, cfg) in config.native_configs.iter() { - let nrt = NativeThreadRuntime::new(cfg.clone()); + let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone()); manager .native_thread_runtimes .insert(name.clone().into_boxed_str(), nrt); } for (name, cfg) in config.rayon_configs.iter() { - let rrt = RayonRuntime::new(cfg.clone())?; + let rrt = RayonRuntime::new(name.clone(), cfg.clone())?; manager .rayon_runtimes .insert(name.clone().into_boxed_str(), rrt); @@ -117,10 +131,30 @@ impl RuntimeManager { #[cfg(test)] mod tests { use { - crate::{CoreAllocation, NativeConfig, RayonConfig, RuntimeManager, RuntimeManagerConfig}, - std::collections::HashMap, + crate::{CoreAllocation, NativeConfig, RayonConfig, RuntimeManagerConfig, ThreadManager}, + std::{collections::HashMap, io::Read}, }; + #[test] + fn configtest() { + let experiments = [ + "examples/core_contention_dedicated_set.toml", + "examples/core_contention_contending_set.toml", + ]; + + for exp in experiments { + println!("Loading config {exp}"); + let mut conffile = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); + conffile.push(exp); + let mut buf = String::new(); + std::fs::File::open(conffile) + .unwrap() + .read_to_string(&mut buf) + .unwrap(); + let cfg: RuntimeManagerConfig = toml::from_str(&buf).unwrap(); + println!("{:?}", cfg); + } + } // Nobody runs Agave on windows, and on Mac we can not set mask affinity without patching external crate #[cfg(target_os = "linux")] fn validate_affinity(expect_cores: &[usize], error_msg: &str) { @@ -147,7 +181,7 @@ mod tests { ..Default::default() }; - let rtm = RuntimeManager::new(conf).unwrap(); + let rtm = ThreadManager::new(conf).unwrap(); let r = rtm.get_native("test").unwrap(); let t2 = r @@ -199,7 +233,7 @@ mod tests { ..Default::default() }; - let rtm = RuntimeManager::new(conf).unwrap(); + let rtm = ThreadManager::new(conf).unwrap(); let r = rtm.get_native("test").unwrap(); let t2 = r diff --git a/thread-manager/src/native_thread_runtime.rs b/thread-manager/src/native_thread_runtime.rs index 9653e68290a3d0..6e2925508d1e99 100644 --- a/thread-manager/src/native_thread_runtime.rs +++ b/thread-manager/src/native_thread_runtime.rs @@ -15,7 +15,6 @@ pub struct NativeConfig { pub core_allocation: CoreAllocation, pub max_threads: usize, pub priority: u8, - pub name_base: String, pub stack_size_bytes: usize, } @@ -26,7 +25,6 @@ impl Default for NativeConfig { max_threads: 10, priority: 0, stack_size_bytes: 2 * 1024 * 1024, - name_base: "thread".to_owned(), } } } @@ -36,6 +34,7 @@ pub struct NativeThreadRuntime { pub id_count: AtomicUsize, pub running_count: Arc, pub config: NativeConfig, + pub name: String, } pub struct JoinHandle { @@ -44,7 +43,7 @@ pub struct JoinHandle { } impl JoinHandle { - fn join_inner(&mut self) -> Result> { + fn join_inner(&mut self) -> std::thread::Result { match self.std_handle.take() { Some(jh) => { let result = jh.join(); @@ -58,7 +57,7 @@ impl JoinHandle { } } - pub fn join(mut self) -> Result> { + pub fn join(mut self) -> std::thread::Result { self.join_inner() } @@ -80,11 +79,12 @@ impl Drop for JoinHandle { } impl NativeThreadRuntime { - pub fn new(cfg: NativeConfig) -> Self { + pub fn new(name: String, cfg: NativeConfig) -> Self { Self { id_count: AtomicUsize::new(0), running_count: Arc::new(AtomicUsize::new(0)), config: cfg, + name, } } pub fn spawn(&self, f: F) -> anyhow::Result> @@ -103,7 +103,7 @@ impl NativeThreadRuntime { let chosen_cores_mask = Mutex::new(self.config.core_allocation.as_core_mask_vector()); let n = self.id_count.fetch_add(1, Ordering::Relaxed); let jh = std::thread::Builder::new() - .name(format!("{}-{}", &self.config.name_base, n)) + .name(format!("{}-{}", &self.name, n)) .stack_size(self.config.stack_size_bytes) .spawn(move || { apply_policy(&core_alloc, priority, &chosen_cores_mask); diff --git a/thread-manager/src/rayon_runtime.rs b/thread-manager/src/rayon_runtime.rs index f1a106a4453657..b731bd83051bcb 100644 --- a/thread-manager/src/rayon_runtime.rs +++ b/thread-manager/src/rayon_runtime.rs @@ -36,13 +36,14 @@ pub struct RayonRuntime { } impl RayonRuntime { - pub fn new(config: RayonConfig) -> anyhow::Result { + pub fn new(name: String, config: RayonConfig) -> anyhow::Result { let policy = config.core_allocation.clone(); let chosen_cores_mask = Mutex::new(policy.as_core_mask_vector()); let priority = config.priority; let spawned_threads = AtomicI64::new(0); let rayon_pool = rayon::ThreadPoolBuilder::new() .num_threads(config.worker_threads) + .thread_name(move |i| format!("{}_{}", &name, i)) .start_handler(move |_idx| { let rc = spawned_threads.fetch_add(1, Ordering::Relaxed); datapoint_info!("thread-manager-rayon", ("threads-spawned", rc, i64),); diff --git a/thread-manager/src/tokio_runtime.rs b/thread-manager/src/tokio_runtime.rs index 63da2b2f0ebc4b..3e0682b8c46bd3 100644 --- a/thread-manager/src/tokio_runtime.rs +++ b/thread-manager/src/tokio_runtime.rs @@ -2,12 +2,9 @@ use { crate::policy::{apply_policy, CoreAllocation}, serde::{Deserialize, Serialize}, solana_metrics::datapoint_info, - std::{ - future::Future, - sync::{ - atomic::{AtomicI64, AtomicUsize, Ordering}, - Arc, Mutex, - }, + std::sync::{ + atomic::{AtomicI64, AtomicUsize, Ordering}, + Arc, Mutex, }, thread_priority::ThreadExt, }; @@ -69,13 +66,13 @@ impl ThreadCounters { #[derive(Debug)] pub struct TokioRuntime { - pub(crate) tokio: tokio::runtime::Runtime, + pub tokio: tokio::runtime::Runtime, pub config: TokioConfig, pub counters: Arc, } impl TokioRuntime { - pub(crate) fn new(name: String, cfg: TokioConfig) -> anyhow::Result { + pub fn new(name: String, cfg: TokioConfig) -> anyhow::Result { let num_workers = if cfg.worker_threads == 0 { num_cpus::get() } else { @@ -84,10 +81,6 @@ impl TokioRuntime { let chosen_cores_mask = cfg.core_allocation.as_core_mask_vector(); let base_name = name.clone(); - println!( - "Assigning {:?} to runtime {}", - &chosen_cores_mask, &base_name - ); let mut builder = match num_workers { 1 => tokio::runtime::Builder::new_current_thread(), _ => { @@ -140,23 +133,4 @@ impl TokioRuntime { counters, }) } - /* This is bad idea... - pub fn spawn(&self, fut: F)->::Output - where F: Future - { - self.tokio.spawn(fut) - } - pub fn spawn_blocking(&self, fut: F)->::Output - where F: Future - { - self.spawn(fut) - } - */ - pub fn start(&self, fut: F) -> F::Output - where - F: Future, - { - // the thread that calls block_on does not need its affinity messed with here - self.tokio.block_on(fut) - } }