Skip to content

Commit

Permalink
updates to match the needs of the integration process
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Pyattaev committed Dec 11, 2024
1 parent 309350b commit 1e043c1
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 156 deletions.
5 changes: 3 additions & 2 deletions thread-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ publish = false
[dependencies]
anyhow = { workspace = true }
log = { workspace = true }
num_cpus ={ workspace = true }
num_cpus = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true, features = ["derive"] }
solana-metrics ={ workspace = true }
solana-metrics = { workspace = true }
thread-priority = "1.2.0"
tokio = { workspace = true, features = ["time", "rt-multi-thread"] }

Expand All @@ -27,3 +27,4 @@ affinity = "0.1.2"
[dev-dependencies]
axum = "0.7.9"
serde_json = { workspace = true }
toml = { workspace = true }
13 changes: 7 additions & 6 deletions thread-manager/examples/core_contention_basics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
future::IntoFuture,
io::Write,
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
Expand Down Expand Up @@ -48,11 +48,12 @@ fn main() -> anyhow::Result<()> {
println!("Running {exp}");
let mut conffile = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
conffile.push(exp);
let conffile = std::fs::File::open(conffile)?;
let cfg: RuntimeManagerConfig = serde_json::from_reader(conffile)?;
let mut buf = String::new();
std::fs::File::open(conffile)?.read_to_string(&mut buf)?;
let cfg: RuntimeManagerConfig = toml::from_str(&buf)?;
//println!("Loaded config {}", serde_json::to_string_pretty(&cfg)?);

let rtm = RuntimeManager::new(cfg).unwrap();
let rtm = ThreadManager::new(cfg).unwrap();
let tok1 = rtm
.get_tokio("axum1")
.expect("Expecting runtime named axum1");
Expand All @@ -63,10 +64,10 @@ fn main() -> anyhow::Result<()> {
let wrk_cores: Vec<_> = (32..64).collect();
let results = std::thread::scope(|s| {
s.spawn(|| {
tok1.start(axum_main(8888));
tok1.tokio.block_on(axum_main(8888));
});
s.spawn(|| {
tok2.start(axum_main(8889));
tok2.tokio.block_on(axum_main(8889));
});
let jh = s.spawn(|| run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap());
jh.join().expect("WRK crashed!")
Expand Down
31 changes: 0 additions & 31 deletions thread-manager/examples/core_contention_contending_set.json

This file was deleted.

13 changes: 13 additions & 0 deletions thread-manager/examples/core_contention_contending_set.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[native_configs]

[rayon_configs]

[tokio_configs.axum1]
worker_threads = 8
max_blocking_threads = 1
core_allocation.DedicatedCoreSet = { min = 0, max = 8 }

[tokio_configs.axum2]
worker_threads = 8
max_blocking_threads = 1
core_allocation.DedicatedCoreSet = { min = 0, max = 8 }
31 changes: 0 additions & 31 deletions thread-manager/examples/core_contention_dedicated_set.json

This file was deleted.

13 changes: 13 additions & 0 deletions thread-manager/examples/core_contention_dedicated_set.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[native_configs]

[rayon_configs]

[tokio_configs.axum1]
worker_threads = 4
max_blocking_threads = 1
core_allocation.DedicatedCoreSet = { min = 0, max = 4 }

[tokio_configs.axum2]
worker_threads = 4
max_blocking_threads = 1
core_allocation.DedicatedCoreSet = { min = 4, max = 8 }
20 changes: 0 additions & 20 deletions thread-manager/examples/core_contention_single_runtime.json

This file was deleted.

10 changes: 5 additions & 5 deletions thread-manager/examples/core_contention_sweep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ fn main() -> anyhow::Result<()> {
println!("Running {core_cnt} cores under {regime:?}");
let (tok1, tok2) = match regime {
Regime::Shared => {
rtm = RuntimeManager::new(make_config_shared(core_cnt)).unwrap();
rtm = ThreadManager::new(make_config_shared(core_cnt)).unwrap();
(
rtm.get_tokio("axum1")
.expect("Expecting runtime named axum1"),
Expand All @@ -121,7 +121,7 @@ fn main() -> anyhow::Result<()> {
)
}
Regime::Dedicated => {
rtm = RuntimeManager::new(make_config_dedicated(core_cnt)).unwrap();
rtm = ThreadManager::new(make_config_dedicated(core_cnt)).unwrap();
(
rtm.get_tokio("axum1")
.expect("Expecting runtime named axum1"),
Expand All @@ -130,7 +130,7 @@ fn main() -> anyhow::Result<()> {
)
}
Regime::Single => {
rtm = RuntimeManager::new(make_config_shared(core_cnt)).unwrap();
rtm = ThreadManager::new(make_config_shared(core_cnt)).unwrap();
(
rtm.get_tokio("axum1")
.expect("Expecting runtime named axum1"),
Expand All @@ -143,15 +143,15 @@ fn main() -> anyhow::Result<()> {
let wrk_cores: Vec<_> = (32..64).collect();
let results = std::thread::scope(|s| {
s.spawn(|| {
tok1.start(axum_main(8888));
tok1.tokio.spawn(axum_main(8888));
});
let jh = match regime {
Regime::Single => s.spawn(|| {
run_wrk(&[8888, 8888], &wrk_cores, wrk_cores.len(), 1000).unwrap()
}),
_ => {
s.spawn(|| {
tok2.start(axum_main(8889));
tok2.tokio.spawn(axum_main(8889));
});
s.spawn(|| {
run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap()
Expand Down
80 changes: 57 additions & 23 deletions thread-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ pub mod rayon_runtime;
pub mod tokio_runtime;

pub use {
native_thread_runtime::{NativeConfig, NativeThreadRuntime},
native_thread_runtime::{JoinHandle, NativeConfig, NativeThreadRuntime},
policy::CoreAllocation,
rayon_runtime::{RayonConfig, RayonRuntime},
tokio_runtime::{TokioConfig, TokioRuntime},
};
pub type ConstString = Box<str>;

#[derive(Default, Debug)]
pub struct RuntimeManager {
pub struct ThreadManager {
pub tokio_runtimes: HashMap<ConstString, TokioRuntime>,
pub tokio_runtime_mapping: HashMap<ConstString, ConstString>,

Expand All @@ -44,7 +44,7 @@ pub struct RuntimeManagerConfig {
pub default_core_allocation: CoreAllocation,
}

impl RuntimeManager {
impl ThreadManager {
pub fn get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
let n = self.native_runtime_mapping.get(name)?;
self.native_thread_runtimes.get(n)
Expand All @@ -64,36 +64,50 @@ impl RuntimeManager {
Ok(chosen_cores_mask)
}

pub fn new(config: RuntimeManagerConfig) -> anyhow::Result<Self> {
let mut core_allocations = HashMap::<ConstString, Vec<usize>>::new();
Self::set_process_affinity(&config)?;
let mut manager = Self::default();
/// Populates mappings with copies of config names, overrides as appropriate
fn populate_mappings(&mut self, config: &RuntimeManagerConfig) {
//TODO: this should probably be cleaned up with a macro at some point...

//TODO: this should probably be cleaned up at some point...
for (k, v) in config.tokio_runtime_mapping.iter() {
manager
.tokio_runtime_mapping
.insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
for name in config.native_configs.keys() {
self.native_runtime_mapping
.insert(name.clone().into_boxed_str(), name.clone().into_boxed_str());
}
for (k, v) in config.native_runtime_mapping.iter() {
manager
.native_runtime_mapping
self.native_runtime_mapping
.insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
}
for (k, v) in config.rayon_runtime_mapping.iter() {
manager
.rayon_runtime_mapping

for name in config.tokio_configs.keys() {
self.tokio_runtime_mapping
.insert(name.clone().into_boxed_str(), name.clone().into_boxed_str());
}
for (k, v) in config.tokio_runtime_mapping.iter() {
self.tokio_runtime_mapping
.insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
}

for name in config.rayon_configs.keys() {
self.rayon_runtime_mapping
.insert(name.clone().into_boxed_str(), name.clone().into_boxed_str());
}
for (k, v) in config.rayon_runtime_mapping.iter() {
self.rayon_runtime_mapping
.insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
}
}
pub fn new(config: RuntimeManagerConfig) -> anyhow::Result<Self> {
let mut core_allocations = HashMap::<ConstString, Vec<usize>>::new();
Self::set_process_affinity(&config)?;
let mut manager = Self::default();
manager.populate_mappings(&config);
for (name, cfg) in config.native_configs.iter() {
let nrt = NativeThreadRuntime::new(cfg.clone());
let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone());
manager
.native_thread_runtimes
.insert(name.clone().into_boxed_str(), nrt);
}
for (name, cfg) in config.rayon_configs.iter() {
let rrt = RayonRuntime::new(cfg.clone())?;
let rrt = RayonRuntime::new(name.clone(), cfg.clone())?;
manager
.rayon_runtimes
.insert(name.clone().into_boxed_str(), rrt);
Expand All @@ -117,10 +131,30 @@ impl RuntimeManager {
#[cfg(test)]
mod tests {
use {
crate::{CoreAllocation, NativeConfig, RayonConfig, RuntimeManager, RuntimeManagerConfig},
std::collections::HashMap,
crate::{CoreAllocation, NativeConfig, RayonConfig, RuntimeManagerConfig, ThreadManager},
std::{collections::HashMap, io::Read},
};

#[test]
fn configtest() {
let experiments = [
"examples/core_contention_dedicated_set.toml",
"examples/core_contention_contending_set.toml",
];

for exp in experiments {
println!("Loading config {exp}");
let mut conffile = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
conffile.push(exp);
let mut buf = String::new();
std::fs::File::open(conffile)
.unwrap()
.read_to_string(&mut buf)
.unwrap();
let cfg: RuntimeManagerConfig = toml::from_str(&buf).unwrap();
println!("{:?}", cfg);
}
}
// Nobody runs Agave on windows, and on Mac we can not set mask affinity without patching external crate
#[cfg(target_os = "linux")]
fn validate_affinity(expect_cores: &[usize], error_msg: &str) {
Expand All @@ -147,7 +181,7 @@ mod tests {
..Default::default()
};

let rtm = RuntimeManager::new(conf).unwrap();
let rtm = ThreadManager::new(conf).unwrap();
let r = rtm.get_native("test").unwrap();

let t2 = r
Expand Down Expand Up @@ -199,7 +233,7 @@ mod tests {
..Default::default()
};

let rtm = RuntimeManager::new(conf).unwrap();
let rtm = ThreadManager::new(conf).unwrap();
let r = rtm.get_native("test").unwrap();

let t2 = r
Expand Down
Loading

0 comments on commit 1e043c1

Please sign in to comment.