diff --git a/.bleep b/.bleep index becb0a18..a7047957 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -deb3c5409e938ec9c7d0da9b7a2d331eabbb2cd5 \ No newline at end of file +7d3baa7e49e9b5c7d76775971c9f57f604209f38 \ No newline at end of file diff --git a/pingora-core/Cargo.toml b/pingora-core/Cargo.toml index 76d239cb..a885e6bf 100644 --- a/pingora-core/Cargo.toml +++ b/pingora-core/Cargo.toml @@ -35,13 +35,13 @@ http = { workspace = true } log = { workspace = true } h2 = { workspace = true } lru = { workspace = true } -nix = "0.24" +nix = "~0.24.3" structopt = "0.3" once_cell = { workspace = true } serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.8" libc = "0.2.70" -chrono = { version = "0.4", features = ["alloc"], default-features = false } +chrono = { version = "~0.4.31", features = ["alloc"], default-features = false } thread_local = "1.0" prometheus = "0.13" daemonize = "0.5.0" diff --git a/pingora-core/src/protocols/http/v1/client.rs b/pingora-core/src/protocols/http/v1/client.rs index 7881e597..46045447 100644 --- a/pingora-core/src/protocols/http/v1/client.rs +++ b/pingora-core/src/protocols/http/v1/client.rs @@ -293,7 +293,7 @@ impl HttpSession { InvalidHTTPHeader, format!("buf: {:?}", String::from_utf8_lossy(&buf)), e, - ) + ); } } } @@ -428,10 +428,34 @@ impl HttpSession { is_buf_keepalive(self.get_header(header::CONNECTION).map(|v| v.as_bytes())) } - // `Keep-Alive: timeout=5, max=1000` => 5, 1000 + /// `Keep-Alive: timeout=5, max=1000` => 5, 1000 + /// This is defined in the spec below; it is not part of any RFC, so + /// its behavior differs across implementations. + /// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Keep-Alive fn get_keepalive_values(&self) -> (Option<u64>, Option<usize>) { - // TODO: implement this parsing - (None, None) + let Some(keep_alive_header) = self.get_header("Keep-Alive") else { + return (None, None); + }; + + let Ok(header_value) = str::from_utf8(keep_alive_header.as_bytes()) else { + return (None, None); + }; + + let mut timeout = None; + let mut max = None; + + for param in header_value.split(',') { + let mut parts = param.splitn(2, '=').map(|s| s.trim()); + match (parts.next(), parts.next()) { + (Some("timeout"), Some(timeout_value)) => { + timeout = timeout_value.trim().parse().ok() + } + (Some("max"), Some(max_value)) => max = max_value.trim().parse().ok(), + _ => {} + } + } + + (timeout, max) }
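For reference, the parsing strategy above (split the header value on ',', split each parameter once on '=', then trim both sides before parsing) can be exercised outside of `HttpSession`. A minimal standalone sketch, where `parse_keepalive` is a hypothetical free function and not part of the pingora API:

    fn parse_keepalive(header_value: &str) -> (Option<u64>, Option<usize>) {
        let mut timeout = None;
        let mut max = None;
        // split into "timeout=5" / " max=1000" parameters, then split each once on '='
        for param in header_value.split(',') {
            let mut parts = param.splitn(2, '=').map(|s| s.trim());
            match (parts.next(), parts.next()) {
                (Some("timeout"), Some(v)) => timeout = v.parse().ok(),
                (Some("max"), Some(v)) => max = v.parse().ok(),
                _ => {} // unknown or malformed parameters are ignored
            }
        }
        (timeout, max)
    }

    fn main() {
        assert_eq!(parse_keepalive("timeout=5, max=1000"), (Some(5), Some(1000)));
        assert_eq!(parse_keepalive(" max = 1000 "), (None, Some(1000)));
        assert_eq!(parse_keepalive("a=b"), (None, None));
    }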
/// Close the connection abruptly. This allows signaling the server that the connection is closed @@ -532,7 +556,7 @@ impl HttpSession { } fn init_req_body_writer(&mut self, header: &RequestHeader) { - if self.is_upgrade_req() { + if is_upgrade_req(header) { self.body_writer.init_http10(); } else { self.init_body_writer_comm(&header.headers) @@ -892,6 +916,26 @@ mod tests_stream { } } + #[tokio::test] + async fn init_body_for_upgraded_req() { + use crate::protocols::http::v1::body::BodyMode; + + let wire = + b"GET / HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: WS\r\nContent-Length: 0\r\n\r\n"; + let mock_io = Builder::new().write(wire).build(); + let mut http_stream = HttpSession::new(Box::new(mock_io)); + let mut new_request = RequestHeader::build("GET", b"/", None).unwrap(); + new_request.insert_header("Connection", "Upgrade").unwrap(); + new_request.insert_header("Upgrade", "WS").unwrap(); + // Content-Length is ignored when Upgrade is present + new_request.insert_header("Content-Length", "0").unwrap(); + let _ = http_stream + .write_request_header(Box::new(new_request)) + .await + .unwrap(); + assert_eq!(http_stream.body_writer.body_mode, BodyMode::HTTP1_0(0)); + } + #[tokio::test] async fn read_switching_protocol() { init_log(); @@ -1061,6 +1105,65 @@ mod tests_stream { .keepalive_timeout, KeepaliveStatus::Off ); + + async fn build_resp_with_keepalive_values(keep_alive: &str) -> HttpSession { + let input = format!("HTTP/1.1 200 OK\r\nKeep-Alive: {keep_alive}\r\n\r\n"); + let mock_io = Builder::new().read(input.as_bytes()).build(); + let mut http_stream = HttpSession::new(Box::new(mock_io)); + let res = http_stream.read_response().await; + assert_eq!(input.len(), res.unwrap()); + http_stream.respect_keepalive(); + http_stream + } + + assert_eq!( + build_resp_with_keepalive_values("timeout=5, max=1000") + .await + .get_keepalive_values(), + (Some(5), Some(1000)) + ); + + assert_eq!( + build_resp_with_keepalive_values("max=1000, timeout=5") + .await + .get_keepalive_values(), + (Some(5), Some(1000)) + ); + + assert_eq!( + build_resp_with_keepalive_values(" timeout = 5, max = 1000 ") + .await + .get_keepalive_values(), + (Some(5), Some(1000)) + ); + + assert_eq!( + build_resp_with_keepalive_values("timeout=5") + .await + .get_keepalive_values(), + (Some(5), None) + ); + + assert_eq!( + build_resp_with_keepalive_values("max=1000") + .await + .get_keepalive_values(), + (None, Some(1000)) + ); + + assert_eq!( + build_resp_with_keepalive_values("a=b") + .await + .get_keepalive_values(), + (None, None) + ); + + assert_eq!( + build_resp_with_keepalive_values("") + .await + .get_keepalive_values(), + (None, None) + ); } /* Note: body tests are covered in server.rs */ diff --git a/pingora-core/src/server/mod.rs index fa2cea21..9c29aafe 100644 --- a/pingora-core/src/server/mod.rs +++ b/pingora-core/src/server/mod.rs @@ -175,7 +175,8 @@ impl Server { /// /// Command line options can either be passed by parsing the command line arguments via /// `Opt::from_args()`, or be generated by other means. - pub fn new(opt: Option<Opt>) -> Result<Server> { + pub fn new(opt: impl Into<Option<Opt>>) -> Result<Server> { + let opt = opt.into(); let (tx, rx) = watch::channel(false); let conf = if let Some(opt) = opt.as_ref() {
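The `impl Into<Option<Opt>>` signature above is an ergonomics change: the standard library provides a blanket `impl<T> From<T> for Option<T>`, so callers may pass either a bare `Opt` or `None` without wrapping in `Some(...)`. A toy sketch of the pattern (this `Opt` is a stand-in struct, not pingora's):

    #[derive(Default)]
    struct Opt {
        test: bool,
    }

    fn new(opt: impl Into<Option<Opt>>) -> bool {
        // normalize to Option<Opt> once, then use it as before
        let opt: Option<Opt> = opt.into();
        opt.map(|o| o.test).unwrap_or(false)
    }

    fn main() {
        assert!(!new(None)); // still works for callers without options
        assert!(new(Opt { test: true })); // no Some(...) wrapping needed
    }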
diff --git a/pingora-http/src/lib.rs index 24b648f7..681bcfbc 100644 --- a/pingora-http/src/lib.rs +++ b/pingora-http/src/lib.rs @@ -169,7 +169,7 @@ impl RequestHeader { /// Insert the header name and value to `self`. /// /// Different from [Self::append_header()], this method will replace all other existing headers - /// under the same name (case insensitive). + /// under the same name (case-insensitive). pub fn insert_header( &mut self, name: impl IntoCaseHeaderName, @@ -476,7 +476,7 @@ fn clone_resp_parts(me: &RespParts) -> RespParts { // This function returns an upper bound on the size of the header map used inside the http crate. // As of version 0.2, there is a limit of 1 << 15 (32,768) items inside the map. There is an -// assertion against this size inside the crate so we want to avoid panicking by not exceeding this +// assertion against this size inside the crate, so we want to avoid panicking by not exceeding this // upper bound. fn http_header_map_upper_bound(size_hint: Option<usize>) -> usize { // Even though the crate has 1 << 15 as the max size, calls to `with_capacity` invoke a @@ -484,13 +484,13 @@ // // See https://github.com/hyperium/http/blob/34a9d6bdab027948d6dea3b36d994f9cbaf96f75/src/header/map.rs#L3220 // - // Therefore we set our max size to be even lower so we guarantee ourselves we won't hit that + // Therefore we set our max size to be even lower, so we guarantee ourselves we won't hit that // upper bound in the crate. Any way you cut it, 4,096 headers is insane. const PINGORA_MAX_HEADER_COUNT: usize = 4096; const INIT_HEADER_SIZE: usize = 8; - // We select the size hint or the max size here such that we pick a value substantially lower - // 1 << 15 with room to grow the header map. + // We select the size hint or the max size here, ensuring that we pick a value substantially lower + // than 1 << 15 with room to grow the header map. std::cmp::min( size_hint.unwrap_or(INIT_HEADER_SIZE), PINGORA_MAX_HEADER_COUNT, @@ -509,7 +509,7 @@ fn append_header_value( .as_slice() .try_into() .or_err(InvalidHTTPHeader, "invalid header name")?; - // storage the original case in the map + // store the original case in the map if let Some(name_map) = name_map { name_map.append(header_name.clone(), case_header_name); } @@ -530,7 +530,7 @@ fn insert_header_value( .try_into() .or_err(InvalidHTTPHeader, "invalid header name")?; if let Some(name_map) = name_map { - // storage the original case in the map + // store the original case in the map name_map.insert(header_name.clone(), case_header_name); } value_map.insert(header_name, value); @@ -562,7 +562,7 @@ fn header_to_h1_wire(key_map: Option<&CaseMap>, value_map: &HMap, buf: &mut impl let iter = key_map.iter().zip(value_map.iter()); for ((header, case_header), (header2, val)) in iter { if header != header2 { - // in case the header iter order changes in further version of HMap + // in case the header iteration order changes in future versions of HMap panic!("header iter mismatch {}, {}", header, header2) } buf.put_slice(case_header.as_slice()); diff --git a/pingora-memory-cache/src/lib.rs index f5c037c3..2b02d28f 100644 --- a/pingora-memory-cache/src/lib.rs +++ b/pingora-memory-cache/src/lib.rs @@ -81,7 +81,7 @@ pub struct MemoryCache<K, T> { pub(crate) hasher: RandomState, } -impl<K: Hash, T: Clone + Send + Sync> MemoryCache<K, T> { +impl<K: Hash, T: Clone + Send + Sync + 'static> MemoryCache<K, T> { /// Create a new [MemoryCache] with the given size.
pub fn new(size: usize) -> Self { MemoryCache { diff --git a/pingora-memory-cache/src/read_through.rs index 05a8d892..a10a7c01 100644 --- a/pingora-memory-cache/src/read_through.rs +++ b/pingora-memory-cache/src/read_through.rs @@ -123,7 +123,7 @@ where impl<K, T, CB, S> RTCache<K, T, CB, S> where K: Hash + Send, - T: Clone + Send + Sync, + T: Clone + Send + Sync + 'static, { /// Create a new [RTCache] of given size. `lock_age` defines how long a lock is valid for. /// `lock_timeout` is used to stop a lookup from holding on to the key for too long. @@ -142,7 +142,7 @@ where impl<K, T, CB, S> RTCache<K, T, CB, S> where K: Hash + Send, - T: Clone + Send + Sync, + T: Clone + Send + Sync + 'static, CB: Lookup<K, T, S>, { /// Query the cache for a given value. If it exists and no TTL is configured initially, it will @@ -288,7 +288,7 @@ where impl<K, T, CB, S> RTCache<K, T, CB, S> where K: Hash + Send, - T: Clone + Send + Sync, + T: Clone + Send + Sync + 'static, CB: MultiLookup<K, T, S>, { /// Same behavior as [RTCache::get] but for an arbitrary amount of keys. diff --git a/tinyufo/Cargo.toml b/tinyufo/Cargo.toml index a726715e..4b3b2c91 100644 --- a/tinyufo/Cargo.toml +++ b/tinyufo/Cargo.toml @@ -20,6 +20,7 @@ ahash = { workspace = true } flurry = "<0.5.0" # Try not to require Rust 1.71 parking_lot = "0" crossbeam-queue = "0" +crossbeam-skiplist = "0" [dev-dependencies] rand = "0" @@ -27,6 +28,7 @@ lru = "0" zipf = "7" moka = { version = "0", features = ["sync"] } dhat = "0" +quick_cache = "0.4" [[bench]] name = "bench_perf" diff --git a/tinyufo/README.md b/tinyufo/README.md index 50e2dd3f..bf24ec10 100644 --- a/tinyufo/README.md +++ b/tinyufo/README.md @@ -38,12 +38,12 @@ Because of TinyUFO's lock-free design, it greatly outperforms the others. ### Memory overhead -The table below show the memory allocation (in bytes) of the compared cache library under certain workloads to store zero-sized assets. +TinyUFO provides a compact mode that trades raw read speed for better memory efficiency. Whether the saving is worth the trade-off depends on the actual asset sizes and the workload. For small in-memory assets, the memory saved means more things can be cached. -| cache size | TinyUFO | LRU | moka | -| -------- | ------- | ------- | ------ | -| 100 | 39,409 | 9,408 | 354,376 -| 1000 | 236,053 | 128,512 | 535,888 -| 10000 | 2,290,635 | 1,075,648 | 2,489,088 +The table below shows the memory allocation (in bytes) of each compared cache library under certain workloads storing zero-sized assets. -Whether these overheads matter depends on the actual sizes and volume of the assets. The more advanced algorithms are likely to be less memory efficient than the simple LRU.
\ No newline at end of file +| cache size | TinyUFO | TinyUFO compact | LRU | moka | +| -------- | ------- | ------- | ------- | ------ | +| 100 | 39,409 | 19,000 | 9,408 | 354,376 +| 1000 | 236,053 | 86,352 | 128,512 | 535,888 +| 10000 | 2,290,635 | 766,024 | 1,075,648 | 2,489,088 \ No newline at end of file diff --git a/tinyufo/benches/bench_hit_ratio.rs b/tinyufo/benches/bench_hit_ratio.rs index 72dacd5a..23d0e58b 100644 --- a/tinyufo/benches/bench_hit_ratio.rs +++ b/tinyufo/benches/bench_hit_ratio.rs @@ -23,6 +23,7 @@ fn bench_one(zip_exp: f64, cache_size_percent: f32) { let cache_size = (cache_size_percent * ITEMS as f32).round() as usize; let mut lru = lru::LruCache::<u64, ()>::new(NonZeroUsize::new(cache_size).unwrap()); let moka = moka::sync::Cache::new(cache_size as u64); + let quick_cache = quick_cache::sync::Cache::new(cache_size); let tinyufo = tinyufo::TinyUfo::new(cache_size, cache_size); let mut rng = thread_rng(); @@ -30,6 +31,7 @@ let mut lru_hit = 0; let mut moka_hit = 0; + let mut quick_cache_hit = 0; let mut tinyufo_hit = 0; for _ in 0..ITERATIONS { @@ -47,6 +49,12 @@ fn bench_one(zip_exp: f64, cache_size_percent: f32) { moka.insert(key, ()); } + if quick_cache.get(&key).is_some() { + quick_cache_hit += 1; + } else { + quick_cache.insert(key, ()); + } + if tinyufo.get(&key).is_some() { tinyufo_hit += 1; } else { @@ -56,42 +64,46 @@ print!("{:.2}%\t\t", lru_hit as f32 / ITERATIONS as f32 * 100.0); print!("{:.2}%\t\t", moka_hit as f32 / ITERATIONS as f32 * 100.0); + print!( + "{:.2}%\t\t", + quick_cache_hit as f32 / ITERATIONS as f32 * 100.0 + ); println!("{:.2}%", tinyufo_hit as f32 / ITERATIONS as f32 * 100.0); } /* cargo bench --bench bench_hit_ratio -zipf & cache size lru moka TinyUFO -0.90, 0.005 19.23% 33.46% 33.35% -0.90, 0.01 26.21% 37.88% 40.10% -0.90, 0.05 45.59% 55.34% 57.81% -0.90, 0.1 55.73% 64.22% 66.34% -0.90, 0.25 71.18% 77.15% 78.53% -1.00, 0.005 31.09% 45.65% 45.13% -1.00, 0.01 39.17% 50.69% 52.23% -1.00, 0.05 58.73% 66.95% 68.81% -1.00, 0.1 67.57% 74.35% 75.93% -1.00, 0.25 79.91% 84.34% 85.27% -1.05, 0.005 37.68% 51.77% 51.26% -1.05, 0.01 46.11% 57.07% 58.41% -1.05, 0.05 65.04% 72.33% 73.91% -1.05, 0.1 73.11% 78.96% 80.22% -1.05, 0.25 83.77% 87.45% 88.16% -1.10, 0.005 44.48% 57.86% 57.25% -1.10, 0.01 52.97% 63.18% 64.23% -1.10, 0.05 70.94% 77.27% 78.57% -1.10, 0.1 78.11% 83.05% 84.06% -1.10, 0.25 87.08% 90.06% 90.62% -1.50, 0.005 85.25% 89.89% 89.68% -1.50, 0.01 89.88% 92.79% 92.94% -1.50, 0.05 96.04% 97.09% 97.25% -1.50, 0.1 97.52% 98.17% 98.26% -1.50, 0.25 98.81% 99.09% 99.10% +zipf & cache size lru moka QuickC TinyUFO +0.90, 0.005 19.24% 33.43% 32.33% 33.35% +0.90, 0.01 26.23% 37.86% 38.80% 40.06% +0.90, 0.05 45.58% 55.13% 55.71% 57.80% +0.90, 0.1 55.72% 64.15% 64.01% 66.36% +0.90, 0.25 71.16% 77.12% 75.92% 78.53% +1.00, 0.005 31.08% 45.68% 44.07% 45.15% +1.00, 0.01 39.17% 50.80% 50.90% 52.30% +1.00, 0.05 58.71% 66.92% 67.09% 68.79% +1.00, 0.1 67.59% 74.28% 74.00% 75.92% +1.00, 0.25 79.94% 84.35% 83.45% 85.28% +1.05, 0.005 37.66% 51.78% 50.13% 51.12% +1.05, 0.01 46.07% 57.13% 57.07% 58.41% +1.05, 0.05 65.06% 72.37% 72.41% 73.93% +1.05, 0.1 73.13% 78.97% 78.60% 80.24% +1.05, 0.25 83.74% 87.41% 86.68% 88.14% +1.10, 0.005 44.49% 57.84% 56.16% 57.28% +1.10, 0.01 52.97% 63.19% 62.99% 64.24% +1.10, 0.05 70.95% 77.24% 77.26% 78.55% +1.10, 0.1 78.05% 82.86% 82.66% 84.01% +1.10, 0.25 87.12% 90.10% 89.51% 90.66% +1.50, 0.005 85.27% 89.92% 89.08% 89.69% +1.50, 0.01
89.86% 92.77% 92.44% 92.94% +1.50, 0.05 96.01% 97.08% 96.99% 97.23% +1.50, 0.1 97.51% 98.15% 98.08% 98.24% +1.50, 0.25 98.81% 99.09% 99.03% 99.09% */ fn main() { - println!("zipf & cache size\t\tlru\t\tmoka\t\tTinyUFO",); + println!("zipf & cache size\t\tlru\t\tmoka\t\tQuickC\t\tTinyUFO",); for zif_exp in [0.9, 1.0, 1.05, 1.1, 1.5] { for cache_capacity in [0.005, 0.01, 0.05, 0.1, 0.25] { bench_one(zif_exp, cache_capacity); diff --git a/tinyufo/benches/bench_memory.rs b/tinyufo/benches/bench_memory.rs index e55a5612..271fe122 100644 --- a/tinyufo/benches/bench_memory.rs +++ b/tinyufo/benches/bench_memory.rs @@ -52,6 +52,22 @@ fn bench_moka(zip_exp: f64, items: usize, cache_size_percent: f32) { } } +fn bench_quick_cache(zip_exp: f64, items: usize, cache_size_percent: f32) { + let cache_size = (cache_size_percent * items as f32).round() as usize; + let quick_cache = quick_cache::sync::Cache::new(cache_size); + + let mut rng = thread_rng(); + let zipf = zipf::ZipfDistribution::new(items, zip_exp).unwrap(); + + for _ in 0..ITERATIONS { + let key = zipf.sample(&mut rng) as u64; + + if quick_cache.get(&key).is_none() { + quick_cache.insert(key, ()); + } + } +} + fn bench_tinyufo(zip_exp: f64, items: usize, cache_size_percent: f32) { let cache_size = (cache_size_percent * items as f32).round() as usize; let tinyufo = tinyufo::TinyUfo::new(cache_size, (cache_size as f32 * 1.0) as usize); @@ -68,6 +84,22 @@ } } +fn bench_tinyufo_compact(zip_exp: f64, items: usize, cache_size_percent: f32) { + let cache_size = (cache_size_percent * items as f32).round() as usize; + let tinyufo = tinyufo::TinyUfo::new_compact(cache_size, (cache_size as f32 * 1.0) as usize); + + let mut rng = thread_rng(); + let zipf = zipf::ZipfDistribution::new(items, zip_exp).unwrap(); + + for _ in 0..ITERATIONS { + let key = zipf.sample(&mut rng) as u64; + + if tinyufo.get(&key).is_none() { + tinyufo.put(key, (), 1); + } + } +} + /* cargo bench --bench bench_memory @@ -76,24 +108,36 @@ lru dhat: At t-gmax: 9,408 bytes in 106 blocks moka dhat: At t-gmax: 354,232 bytes in 1,581 blocks +QuickCache +dhat: At t-gmax: 11,840 bytes in 8 blocks TinyUFO dhat: At t-gmax: 37,337 bytes in 351 blocks +TinyUFO Compact +dhat: At t-gmax: 19,000 bytes in 60 blocks total items 10000, cache size 10% lru dhat: At t-gmax: 128,512 bytes in 1,004 blocks moka dhat: At t-gmax: 535,320 bytes in 7,278 blocks +QuickCache +dhat: At t-gmax: 93,000 bytes in 66 blocks TinyUFO dhat: At t-gmax: 236,053 bytes in 2,182 blocks +TinyUFO Compact +dhat: At t-gmax: 86,352 bytes in 1,128 blocks total items 100000, cache size 10% lru dhat: At t-gmax: 1,075,648 bytes in 10,004 blocks moka dhat: At t-gmax: 2,489,088 bytes in 62,374 blocks +QuickCache +dhat: At t-gmax: 863,752 bytes in 66 blocks TinyUFO dhat: At t-gmax: 2,290,635 bytes in 20,467 blocks +TinyUFO Compact +dhat: At t-gmax: 766,024 bytes in 10,421 blocks */ fn main() { @@ -111,10 +155,22 @@ println!("\nmoka"); } + { + let _profiler = dhat::Profiler::new_heap(); + bench_quick_cache(1.05, items, 0.1); + println!("\nQuickCache"); + } + { let _profiler = dhat::Profiler::new_heap(); bench_tinyufo(1.05, items, 0.1); println!("\nTinyUFO"); } + + { + let _profiler = dhat::Profiler::new_heap(); + bench_tinyufo_compact(1.05, items, 0.1); + println!("\nTinyUFO Compact"); + } } }
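A note on methodology: dhat prints its heap statistics (including the "At t-gmax" peak figures quoted above) when the `Profiler` value is dropped, which is why bench_memory.rs wraps each workload in its own block. A minimal sketch of that pattern, assuming dhat as a dependency and a placeholder workload:

    // dhat's heap profiler requires replacing the global allocator
    #[global_allocator]
    static ALLOC: dhat::Alloc = dhat::Alloc;

    fn workload() {
        let v: Vec<u64> = (0..1000).collect();
        std::hint::black_box(v);
    }

    fn main() {
        {
            let _profiler = dhat::Profiler::new_heap();
            workload();
            // stats (t-gmax etc.) are printed here, when _profiler drops
        }
        // a fresh Profiler per block keeps each measurement independent
    }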
diff --git a/tinyufo/benches/bench_perf.rs b/tinyufo/benches/bench_perf.rs index 1295fb21..9b740009 100644 --- a/tinyufo/benches/bench_perf.rs +++ b/tinyufo/benches/bench_perf.rs @@ -14,7 +14,7 @@ use rand::prelude::*; use std::num::NonZeroUsize; -use std::sync::Mutex; +use std::sync::{Barrier, Mutex}; use std::thread; use std::time::Instant; @@ -31,7 +31,9 @@ Below is from Linux + Ryzen 5 7600 CPU lru read total 150.423567ms, 30ns avg per operation, 33239472 ops per second moka read total 462.133322ms, 92ns avg per operation, 10819389 ops per second +quick_cache read total 125.618216ms, 25ns avg per operation, 39803144 ops per second tinyufo read total 199.007359ms, 39ns avg per operation, 25124698 ops per second +tinyufo compact read total 331.145859ms, 66ns avg per operation, 15099087 ops per second lru read total 5.402631847s, 1.08µs avg per operation, 925474 ops per second ... @@ -41,10 +43,18 @@ moka read total 2.742258211s, 548ns avg per operation, 1823314 ops per second ... total 14072430 ops per second +quick_cache read total 1.186566627s, 237ns avg per operation, 4213838 ops per second +... +total 33694776 ops per second + tinyufo read total 208.346855ms, 41ns avg per operation, 23998444 ops per second ... total 148691408 ops per second +tinyufo compact read total 539.403037ms, 107ns avg per operation, 9269507 ops per second +... +total 74130632 ops per second + lru mixed read/write 5.500309876s, 1.1µs avg per operation, 909039 ops per second, 407431 misses ... total 6846743 ops per second @@ -53,22 +63,35 @@ moka mixed read/write 2.368500882s, 473ns avg per operation, 2111040 ops per sec ... total 16557962 ops per second +quick_cache mixed read/write 838.072588ms, 167ns avg per operation, 5966070 ops per second, 315051 misses +... +total 47698472 ops per second + tinyufo mixed read/write 456.134531ms, 91ns avg per operation, 10961678 ops per second, 294977 misses ... total 80865792 ops per second + +tinyufo compact mixed read/write 638.770053ms, 127ns avg per operation, 7827543 ops per second, 294641 misses +...
+total 62600844 ops per second */ fn main() { + println!("Note: these performance numbers vary a lot across different CPUs and OSes."); // we don't bench eviction here so make the caches large enough to hold all let lru = Mutex::new(lru::LruCache::<u64, ()>::unbounded()); let moka = moka::sync::Cache::new(ITEMS as u64 + 10); + let quick_cache = quick_cache::sync::Cache::new(ITEMS + 10); let tinyufo = tinyufo::TinyUfo::new(ITEMS + 10, 10); + let tinyufo_compact = tinyufo::TinyUfo::new_compact(ITEMS + 10, 10); // populate first, then we bench access/promotion for i in 0..ITEMS { lru.lock().unwrap().put(i as u64, ()); moka.insert(i as u64, ()); + quick_cache.insert(i as u64, ()); tinyufo.put(i as u64, (), 1); + tinyufo_compact.put(i as u64, (), 1); } // single thread @@ -97,6 +120,17 @@ fn main() { (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 ); + let before = Instant::now(); + for _ in 0..ITERATIONS { + quick_cache.get(&(zipf.sample(&mut rng) as u64)); + } + let elapsed = before.elapsed(); + println!( + "quick_cache read total {elapsed:?}, {:?} avg per operation, {} ops per second", + elapsed / ITERATIONS as u32, + (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 + ); + let before = Instant::now(); for _ in 0..ITERATIONS { tinyufo.get(&(zipf.sample(&mut rng) as u64)); } @@ -108,14 +142,26 @@ fn main() { (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 ); - // concurrent + let before = Instant::now(); + for _ in 0..ITERATIONS { + tinyufo_compact.get(&(zipf.sample(&mut rng) as u64)); + } + let elapsed = before.elapsed(); + println!( + "tinyufo compact read total {elapsed:?}, {:?} avg per operation, {} ops per second", + elapsed / ITERATIONS as u32, + (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 + ); + // concurrent + let wg = Barrier::new(THREADS); let before = Instant::now(); thread::scope(|s| { for _ in 0..THREADS { s.spawn(|| { let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(ITEMS, 1.03).unwrap(); + 
wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { lru.lock().unwrap().get(&(zipf.sample(&mut rng) as u64)); @@ -135,12 +181,14 @@ fn main() { (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 ); + let wg = Barrier::new(THREADS); let before = Instant::now(); thread::scope(|s| { for _ in 0..THREADS { s.spawn(|| { let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(ITEMS, 1.03).unwrap(); + wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { moka.get(&(zipf.sample(&mut rng) as u64)); @@ -160,12 +208,41 @@ fn main() { (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 ); + let wg = Barrier::new(THREADS); let before = Instant::now(); thread::scope(|s| { for _ in 0..THREADS { s.spawn(|| { let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(ITEMS, 1.03).unwrap(); + wg.wait(); + let before = Instant::now(); + for _ in 0..ITERATIONS { + quick_cache.get(&(zipf.sample(&mut rng) as u64)); + } + let elapsed = before.elapsed(); + println!( + "quick_cache read total {elapsed:?}, {:?} avg per operation, {} ops per second", + elapsed / ITERATIONS as u32, + (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 + ); + }); + } + }); + let elapsed = before.elapsed(); + println!( + "total {} ops per second", + (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 + ); + + let wg = Barrier::new(THREADS); + let before = Instant::now(); + thread::scope(|s| { + for _ in 0..THREADS { + s.spawn(|| { + let mut rng = thread_rng(); + let zipf = zipf::ZipfDistribution::new(ITEMS, 1.03).unwrap(); + wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { tinyufo.get(&(zipf.sample(&mut rng) as u64)); @@ -185,6 +262,33 @@ fn main() { (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 ); + let wg = Barrier::new(THREADS); + let before = Instant::now(); + thread::scope(|s| { + for _ in 0..THREADS { + s.spawn(|| { + let mut rng = thread_rng(); + let zipf = zipf::ZipfDistribution::new(ITEMS, 1.03).unwrap(); + wg.wait(); + let before = Instant::now(); + for _ in 0..ITERATIONS { + tinyufo_compact.get(&(zipf.sample(&mut rng) as u64)); + } + let elapsed = before.elapsed(); + println!( + "tinyufo compact read total {elapsed:?}, {:?} avg per operation, {} ops per second", + elapsed / ITERATIONS as u32, + (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 + ); + }); + } + }); + let elapsed = before.elapsed(); + println!( + "total {} ops per second", + (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 + ); + ///// bench mixed read and write ///// const CACHE_SIZE: usize = 1000; let items: usize = 10000; @@ -193,6 +297,7 @@ fn main() { let lru = Mutex::new(lru::LruCache::<u64, ()>::new( NonZeroUsize::new(CACHE_SIZE).unwrap(), )); + let wg = Barrier::new(THREADS); let before = Instant::now(); thread::scope(|s| { for _ in 0..THREADS { @@ -200,6 +305,7 @@ fn main() { let mut miss_count = 0; let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(items, ZIPF_EXP).unwrap(); + wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { let key = zipf.sample(&mut rng) as u64; @@ -225,7 +331,7 @@ fn main() { ); let moka = moka::sync::Cache::new(CACHE_SIZE as u64); - + let wg = Barrier::new(THREADS); let before = Instant::now(); thread::scope(|s| { for _ in 0..THREADS { @@ -233,6 +339,7 @@ fn main() { let mut miss_count = 0; let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(items, ZIPF_EXP).unwrap(); + wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { let key = zipf.sample(&mut rng) as u64; @@ -256,7 +363,41 @@ fn main() { (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 ); + let quick_cache = quick_cache::sync::Cache::new(CACHE_SIZE); + let wg = Barrier::new(THREADS); + let before = Instant::now(); + thread::scope(|s| { + for _ in 0..THREADS { + s.spawn(|| { + let mut miss_count = 0; + let mut rng = thread_rng(); + let zipf = zipf::ZipfDistribution::new(items, ZIPF_EXP).unwrap(); + wg.wait(); + let before = Instant::now(); + for _ in 0..ITERATIONS { + let key = zipf.sample(&mut rng) as u64; + if quick_cache.get(&key).is_none() { + quick_cache.insert(key, ()); + miss_count += 1; + } + } + let elapsed = before.elapsed(); + println!( + "quick_cache mixed read/write {elapsed:?}, {:?} avg per operation, {} ops per second, {miss_count} misses", + elapsed / ITERATIONS as u32, + (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 + ); + }); + } + }); + let elapsed = before.elapsed(); + println!( + "total {} ops per second", + (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 + ); + let tinyufo = tinyufo::TinyUfo::new(CACHE_SIZE, CACHE_SIZE); + let wg = Barrier::new(THREADS); let before = Instant::now(); thread::scope(|s| { for _ in 0..THREADS { @@ -264,6 +405,7 @@ fn main() { let mut miss_count = 0; let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(items, ZIPF_EXP).unwrap(); + wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { let key = zipf.sample(&mut rng) as u64; @@ -287,4 +429,38 @@ fn main() { "total {} ops per second", (ITERATIONS as f32 * THREADS as f32 / 
elapsed.as_secs_f32()) as u32 ); + + let tinyufo_compact = tinyufo::TinyUfo::new_compact(CACHE_SIZE, CACHE_SIZE); + let wg = Barrier::new(THREADS); + let before = Instant::now(); + thread::scope(|s| { + for _ in 0..THREADS { + s.spawn(|| { + let mut miss_count = 0; + let mut rng = thread_rng(); + let zipf = zipf::ZipfDistribution::new(items, ZIPF_EXP).unwrap(); + wg.wait(); + let before = Instant::now(); + for _ in 0..ITERATIONS { + let key = zipf.sample(&mut rng) as u64; + if tinyufo_compact.get(&key).is_none() { + tinyufo_compact.put(key, (), 1); + miss_count += 1; + } + } + let elapsed = before.elapsed(); + println!( + "tinyufo compact mixed read/write {elapsed:?}, {:?} avg per operation, {} ops per second, {miss_count} misses", + elapsed / ITERATIONS as u32, + (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32, + ); + }); + } + }); + + let elapsed = before.elapsed(); + println!( + "total {} ops per second", + (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 + ); }
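The `Barrier` threaded through all the concurrent benchmarks above exists so that every worker begins its timed loop at the same moment; otherwise threads spawned early would run part of their loop uncontended and skew per-thread numbers. A minimal sketch of the pattern with a placeholder busy loop:

    use std::sync::Barrier;
    use std::thread;
    use std::time::Instant;

    const THREADS: usize = 4;

    fn main() {
        let wg = Barrier::new(THREADS);
        thread::scope(|s| {
            for _ in 0..THREADS {
                s.spawn(|| {
                    wg.wait(); // all threads start timing together
                    let before = Instant::now();
                    let mut x = 0u64;
                    for i in 0..1_000_000u64 {
                        x = x.wrapping_add(i);
                    }
                    std::hint::black_box(x);
                    println!("thread took {:?}", before.elapsed());
                });
            }
        });
    }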
diff --git a/tinyufo/src/buckets.rs new file mode 100644 index 00000000..4aa627de --- /dev/null +++ b/tinyufo/src/buckets.rs @@ -0,0 +1,174 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Concurrent storage backend + +use super::{Bucket, Key}; +use ahash::RandomState; +use crossbeam_skiplist::{map::Entry, SkipMap}; +use flurry::HashMap; + +/// N-shard skip list. Memory-efficient, with logarithmic average lookup time within a shard, +/// a bit slower than the hash map +pub struct Compact<T>(Box<[SkipMap<Key, Bucket<T>>]>); + +impl<T: Send + 'static> Compact<T> { + /// Create a new [Compact] + pub fn new(total_items: usize, items_per_shard: usize) -> Self { + assert!(items_per_shard > 0); + + let shards = std::cmp::max(total_items / items_per_shard, 1); + let mut shard_array = vec![]; + for _ in 0..shards { + shard_array.push(SkipMap::new()); + } + Self(shard_array.into_boxed_slice()) + } + + pub fn get(&self, key: &Key) -> Option<Entry<Key, Bucket<T>>> { + let shard = *key as usize % self.0.len(); + self.0[shard].get(key) + } + + pub fn get_map<V, F: FnOnce(Entry<Key, Bucket<T>>) -> V>(&self, key: &Key, f: F) -> Option<V> { + let v = self.get(key); + v.map(f) + } + + fn insert(&self, key: Key, value: Bucket<T>) -> Option<()> { + let shard = key as usize % self.0.len(); + let removed = self.0[shard].remove(&key); + self.0[shard].insert(key, value); + removed.map(|_| ()) + } + + fn remove(&self, key: &Key) { + let shard = *key as usize % self.0.len(); + (&self.0)[shard].remove(key); + } +} + +// Concurrent hash map: fast, but uses more memory +pub struct Fast<T>(HashMap<Key, Bucket<T>, RandomState>); + +impl<T: Send + Sync + 'static> Fast<T> { + pub fn new(total_items: usize) -> Self { + Self(HashMap::with_capacity_and_hasher( + total_items, + RandomState::new(), + )) + } + + pub fn get_map<V, F: FnOnce(&Bucket<T>) -> V>(&self, key: &Key, f: F) -> Option<V> { + let pinned = self.0.pin(); + let v = pinned.get(key); + v.map(f) + } + + fn insert(&self, key: Key, value: Bucket<T>) -> Option<()> { + let pinned = self.0.pin(); + pinned.insert(key, value).map(|_| ()) + } + + fn remove(&self, key: &Key) { + let pinned = self.0.pin(); + pinned.remove(key); + } +} + +pub enum Buckets<T> { + Fast(Box<Fast<T>>), + Compact(Compact<T>), +} + +impl<T: Send + Sync + 'static> Buckets<T> { + pub fn new_fast(items: usize) -> Self { + Self::Fast(Box::new(Fast::new(items))) + } + + pub fn new_compact(items: usize, items_per_shard: usize) -> Self { + Self::Compact(Compact::new(items, items_per_shard)) + } + + pub fn insert(&self, key: Key, value: Bucket<T>) -> Option<()> { + match self { + Self::Compact(c) => c.insert(key, value), + Self::Fast(f) => f.insert(key, value), + } + } + + pub fn remove(&self, key: &Key) { + match self { + Self::Compact(c) => c.remove(key), + Self::Fast(f) => f.remove(key), + } + } + + pub fn get_map<V, F: FnOnce(&Bucket<T>) -> V>(&self, key: &Key, f: F) -> Option<V> { + match self { + Self::Compact(c) => c.get_map(key, |v| f(v.value())), + Self::Fast(c) => c.get_map(key, f), + } + } + + #[cfg(test)] + pub fn get_queue(&self, key: &Key) -> Option<bool> { + self.get_map(key, |v| v.queue.is_main()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_fast() { + let fast = Buckets::new_fast(10); + + assert!(fast.get_map(&1, |_| ()).is_none()); + + let bucket = Bucket { + queue: crate::Location::new_small(), + weight: 1, + uses: Default::default(), + data: 1, + }; + fast.insert(1, bucket); + + assert_eq!(fast.get_map(&1, |v| v.data), Some(1)); + + fast.remove(&1); + assert!(fast.get_map(&1, |_| ()).is_none()); + } + + #[test] + fn test_compact() { + let compact = Buckets::new_compact(10, 2); + + assert!(compact.get_map(&1, |_| ()).is_none()); + + let bucket = Bucket { + queue: crate::Location::new_small(), + weight: 1, + uses: Default::default(), + data: 1, + }; + compact.insert(1, bucket); + + assert_eq!(compact.get_map(&1, |v| v.data), Some(1)); + + compact.remove(&1); + assert!(compact.get_map(&1, |_| ()).is_none()); + } +}
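The `Compact` backend above picks a shard by taking the key modulo the shard count; each shard is then an independent ordered map. A self-contained sketch of that sharding idea, with a locked `BTreeMap` standing in for the lock-free `SkipMap` (these names are illustrative, not tinyufo's):

    use std::collections::BTreeMap;
    use std::sync::Mutex;

    struct Sharded {
        shards: Box<[Mutex<BTreeMap<u64, u64>>]>,
    }

    impl Sharded {
        fn new(total_items: usize, items_per_shard: usize) -> Self {
            // same shard-count rule as Compact::new above
            let n = std::cmp::max(total_items / items_per_shard, 1);
            let shards = (0..n).map(|_| Mutex::new(BTreeMap::new())).collect();
            Self { shards }
        }

        fn shard(&self, key: u64) -> usize {
            key as usize % self.shards.len()
        }

        fn insert(&self, key: u64, value: u64) -> Option<u64> {
            self.shards[self.shard(key)].lock().unwrap().insert(key, value)
        }

        fn get(&self, key: u64) -> Option<u64> {
            self.shards[self.shard(key)].lock().unwrap().get(&key).copied()
        }
    }

    fn main() {
        let s = Sharded::new(10, 2); // 5 shards
        s.insert(42, 1);
        assert_eq!(s.get(42), Some(1));
    }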
diff --git a/tinyufo/src/estimation.rs index 19d84d4b..18c2d4f6 100644 --- a/tinyufo/src/estimation.rs +++ b/tinyufo/src/estimation.rs @@ -39,6 +39,11 @@ impl Estimator { Self::new(hashes, slots) } + + fn compact(items: usize) -> Self { + let (slots, hashes) = Self::optimal_paras(items / 100); + Self::new(hashes, slots) + } + /// Create a new `Estimator` with the given amount of hashes and columns (slots). pub fn new(hashes: usize, slots: usize) -> Self { let mut estimator = Vec::with_capacity(hashes); @@ -147,6 +152,15 @@ impl TinyLfu { window_limit: cache_size * 8, } } + + pub fn new_compact(cache_size: usize) -> Self { + Self { + estimator: Estimator::compact(cache_size), + window_counter: Default::default(), + // 8x: just a heuristic to balance the memory usage and accuracy + window_limit: cache_size * 8, + } + } } #[cfg(test)] diff --git a/tinyufo/src/lib.rs index 001f4e3e..015afe0e 100644 --- a/tinyufo/src/lib.rs +++ b/tinyufo/src/lib.rs @@ -20,14 +20,16 @@ use ahash::RandomState; use crossbeam_queue::SegQueue; -use flurry::HashMap; use std::marker::PhantomData; use std::sync::atomic::AtomicUsize; use std::sync::atomic::{ AtomicBool, AtomicU8, Ordering::{Acquire, Relaxed, SeqCst}, }; +mod buckets; mod estimation; + +use buckets::Buckets; use estimation::TinyLfu; use std::hash::Hash; @@ -64,20 +66,20 @@ const USES_CAP: u8 = 3; struct Uses(AtomicU8); impl Uses { - pub fn inc_uses(&self) { + pub fn inc_uses(&self) -> u8 { loop { let uses = self.uses(); if uses >= USES_CAP { - return; + return uses; } if let Err(new) = self.0.compare_exchange(uses, uses + 1, Acquire, Relaxed) { // someone else beat us to it if new >= USES_CAP { // already above cap - return; + return new; } // else, try again } else { - return; + return uses + 1; } } } @@ -126,17 +128,6 @@ struct Bucket<T> { data: T, } -impl<T> Bucket<T> { - fn update_bucket(&self, main_queue: bool, data: T, weight: Weight) -> Self { - Self { - uses: Uses(self.uses.uses().into()), - queue: Location(main_queue.into()), - weight, - data, - } - } -} - const SMALL_QUEUE_PERCENTAGE: f32 = 0.1; struct FiFoQueues<T> { @@ -154,9 +145,7 @@ struct FiFoQueues<T> { _t: PhantomData<T>, } -type Buckets<T> = HashMap<Key, Bucket<T>, RandomState>; - -impl<T: Clone + Send + Sync> FiFoQueues<T> { +impl<T: Clone + Send + Sync + 'static> FiFoQueues<T> { fn admit( &self, key: Key, @@ -174,9 +163,29 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> { assert!(weight > 0); let new_bucket = { - let pinned_buckets = buckets.pin(); - let bucket = pinned_buckets.get(&key); - let Some(bucket) = bucket else { + let Some((uses, queue, weight)) = buckets.get_map(&key, |bucket| { + // the item exists, in case weight changes + let old_weight = bucket.weight; + let uses = bucket.uses.inc_uses(); + + fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) { + if old == new { + return; + } + if old > new { + weight.fetch_sub((old - new) as usize, SeqCst); + } else { + weight.fetch_add((new - old) as usize, SeqCst); + } + } + let queue = bucket.queue.is_main(); + if queue == MAIN { + update_atomic(&self.main_weight, old_weight, weight); + } else { + update_atomic(&self.small_weight, old_weight, weight); + } + (uses, queue, weight) + }) else { let mut evicted = self.evict_to_limit(weight, buckets); // TODO: figure out the right way to compare frequencies of different weights across // many evicted assets. For now TinyLFU is only used when only evicting 1 item.
@@ -204,7 +213,7 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> { uses: Default::default(), // 0 data, }; - let old = pinned_buckets.insert(key, bucket); + let old = buckets.insert(key, bucket); if old.is_none() { // Always push key first before updating weight // If doing the other order, another concurrent thread might not @@ -215,32 +224,16 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> { // TODO: compare old.weight and update accordingly return evicted; }; - - // the item exists, in case weight changes - let old_weight = bucket.weight; - bucket.uses.inc_uses(); - - fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) { - if old == new { - return; - } - if old > new { - weight.fetch_sub((old - new) as usize, SeqCst); - } else { - weight.fetch_add((new - old) as usize, SeqCst); - } - } - if bucket.queue.is_main() { - update_atomic(&self.main_weight, old_weight, weight); - bucket.update_bucket(MAIN, data, weight) - } else { - update_atomic(&self.small_weight, old_weight, weight); - bucket.update_bucket(SMALL, data, weight) + Bucket { + queue: Location(queue.into()), + weight, + uses: Uses(uses.into()), + data, } }; // replace the existing one - buckets.pin().insert(key, new_bucket); + buckets.insert(key, new_bucket); // NOTE: there is a chance that the item itself is evicted if it happens to be the one selected // by the algorithm. We could avoid this by checking if the item is in the returned evicted items, @@ -295,61 +288,67 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> { // empty queue, this is caught between another pop() and fetch_sub() return None; }; - let pinned_buckets = buckets.pin(); - let maybe_bucket = pinned_buckets.get(&to_evict); - - let Some(bucket) = maybe_bucket.as_ref() else { - //key in queue but not bucket, shouldn't happen, but ignore - continue; - }; - - let weight = bucket.weight; - self.small_weight.fetch_sub(weight as usize, SeqCst); - if bucket.uses.uses() > 1 { - // move to main - bucket.queue.move_to_main(); - self.main.push(to_evict); - self.main_weight.fetch_add(weight as usize, SeqCst); - // continue until find one to evict - continue; + let v = buckets + .get_map(&to_evict, |bucket| { + let weight = bucket.weight; + self.small_weight.fetch_sub(weight as usize, SeqCst); + + if bucket.uses.uses() > 1 { + // move to main + bucket.queue.move_to_main(); + self.main.push(to_evict); + self.main_weight.fetch_add(weight as usize, SeqCst); + // continue until find one to evict + None + } else { + let data = bucket.data.clone(); + let weight = bucket.weight; + buckets.remove(&to_evict); + Some(KV { + key: to_evict, + data, + weight, + }) + } + }) + .flatten(); + if v.is_some() { + // found the one to evict, break + return v; } - // move to ghost - - let data = bucket.data.clone(); - let weight = bucket.weight; - pinned_buckets.remove(&to_evict); - return Some(KV { - key: to_evict, - data, - weight, - }); } } fn evict_one_from_main(&self, buckets: &Buckets<T>) -> Option<KV<T>> { loop { let to_evict = self.main.pop()?; - let buckets = buckets.pin(); - let maybe_bucket = buckets.get(&to_evict); - if let Some(bucket) = maybe_bucket.as_ref() { - if bucket.uses.decr_uses() > 0 { - // put it back - self.main.push(to_evict); - // continue the loop - } else { - // evict - let weight = bucket.weight; - self.main_weight.fetch_sub(weight as usize, SeqCst); - let data = bucket.data.clone(); - buckets.remove(&to_evict); - return Some(KV { - key: to_evict, - data, - weight, - }); - } - } // else: key in queue but not bucket, shouldn't happen + + if let Some(v) = buckets + .get_map(&to_evict, |bucket| { + if bucket.uses.decr_uses() > 0 { + // put it back
self.main.push(to_evict); + // continue the loop + None + } else { + // evict + let weight = bucket.weight; + self.main_weight.fetch_sub(weight as usize, SeqCst); + let data = bucket.data.clone(); + buckets.remove(&to_evict); + Some(KV { + key: to_evict, + data, + weight, + }) + } + }) + .flatten() + { + // found the one to evict, break + return Some(v); + } } } } @@ -357,12 +356,11 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> { /// [TinyUfo] cache pub struct TinyUfo<K, T> { queues: FiFoQueues<T>, - buckets: HashMap<Key, Bucket<T>, RandomState>, + buckets: Buckets<T>, random_status: RandomState, _k: PhantomData<K>, } - -impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> { +impl<K: Hash, T: Clone + Send + Sync + 'static> TinyUfo<K, T> { /// Create a new TinyUfo cache with the given weight limit and the given /// size limit of the ghost queue. pub fn new(total_weight_limit: usize, estimated_size: usize) -> Self { @@ -377,7 +375,29 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> { }; TinyUfo { queues, - buckets: HashMap::with_capacity_and_hasher(estimated_size, RandomState::new()), + buckets: Buckets::new_fast(estimated_size), random_status: RandomState::new(), _k: PhantomData, } } + + /// Create a new TinyUfo cache but with more memory-efficient data structures. + /// The trade-off is that get() is slower by a constant factor. + /// The cache hit ratio could be higher, as this type of TinyUFO allows storing + /// more assets with the same memory. + pub fn new_compact(total_weight_limit: usize, estimated_size: usize) -> Self { + let queues = FiFoQueues { + small: SegQueue::new(), + small_weight: 0.into(), + main: SegQueue::new(), + main_weight: 0.into(), + total_weight_limit, + estimator: TinyLfu::new_compact(estimated_size), + _t: PhantomData, + }; + TinyUfo { + queues, + buckets: Buckets::new_compact(estimated_size, 32), + random_status: RandomState::new(), + _k: PhantomData, + } } @@ -390,8 +410,7 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> { /// Return Some(T) if the key exists pub fn get(&self, key: &K) -> Option<T> { let key = self.random_status.hash_one(key); - let buckets = self.buckets.pin(); - buckets.get(&key).map(|p| { + self.buckets.get_map(&key, |p| { p.uses.inc_uses(); p.data.clone() }) @@ -427,7 +446,7 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> { #[cfg(test)] fn peek_queue(&self, key: K) -> Option<bool> { let key = self.random_status.hash_one(&key); - self.buckets.pin().get(&key).map(|p| p.queue.value()) + self.buckets.get_queue(&key) } } @@ -627,4 +646,54 @@ mod tests { assert_eq!(cache.peek_queue(3), Some(MAIN)); assert_eq!(cache.peek_queue(4), None); } + + #[test] + fn test_evict_from_small_compact() { + let cache = TinyUfo::new_compact(5, 5); + + cache.put(1, 1, 1); + cache.put(2, 2, 2); + cache.put(3, 3, 2); + // cache full now + + assert_eq!(cache.peek_queue(1), Some(SMALL)); + assert_eq!(cache.peek_queue(2), Some(SMALL)); + assert_eq!(cache.peek_queue(3), Some(SMALL)); + + let evicted = cache.put(4, 4, 3); + assert_eq!(evicted.len(), 2); + assert_eq!(evicted[0].data, 1); + assert_eq!(evicted[1].data, 2); + + assert_eq!(cache.peek_queue(1), None); + assert_eq!(cache.peek_queue(2), None); + assert_eq!(cache.peek_queue(3), Some(SMALL)); + } + + #[test] + fn test_evict_from_small_to_main_compact() { + let cache = TinyUfo::new_compact(5, 5); + + cache.put(1, 1, 1); + cache.put(2, 2, 2); + cache.put(3, 3, 2); + // cache full now + + cache.get(&1); + cache.get(&1); // 1 will be moved to main during next eviction + + assert_eq!(cache.peek_queue(1), Some(SMALL)); + assert_eq!(cache.peek_queue(2), Some(SMALL)); + assert_eq!(cache.peek_queue(3), Some(SMALL)); + + let evicted = cache.put(4, 4, 1); + assert_eq!(evicted.len(), 1); + assert_eq!(evicted[0].data, 2); + + assert_eq!(cache.peek_queue(1), Some(MAIN)); + // 2 is
evicted because 1 is in main + assert_eq!(cache.peek_queue(2), None); + assert_eq!(cache.peek_queue(3), Some(SMALL)); + assert_eq!(cache.peek_queue(4), Some(SMALL)); + } }
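Putting the new public API together, a minimal usage sketch of the two flavors this change ships (tinyufo assumed as a dependency; the keys, values, and sizes here are arbitrary):

    fn main() {
        // flurry hash-map buckets: fastest reads
        let fast = tinyufo::TinyUfo::new(1000, 1000);
        // sharded skip-list buckets: less memory, slightly slower get()
        let compact = tinyufo::TinyUfo::new_compact(1000, 1000);

        fast.put("a", 1u32, 1);
        compact.put("a", 1u32, 1);
        assert_eq!(fast.get(&"a"), Some(1));
        assert_eq!(compact.get(&"a"), Some(1));
    }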