From 3f1afaedbb83ee1a8f87125a7c60f2b45e24f163 Mon Sep 17 00:00:00 2001 From: Yuchen Wu Date: Fri, 8 Mar 2024 02:02:23 -0800 Subject: [PATCH 1/8] TinyUFO: add the option to use sharded skip list for storage This option makes it more memory efficient but a bit slower. --- .bleep | 2 +- pingora-memory-cache/src/lib.rs | 2 +- pingora-memory-cache/src/read_through.rs | 6 +- tinyufo/Cargo.toml | 1 + tinyufo/README.md | 14 +- tinyufo/benches/bench_memory.rs | 28 +++ tinyufo/benches/bench_perf.rs | 79 +++++++ tinyufo/src/buckets.rs | 174 +++++++++++++++ tinyufo/src/estimation.rs | 14 ++ tinyufo/src/lib.rs | 267 ++++++++++++++--------- 10 files changed, 476 insertions(+), 111 deletions(-) create mode 100644 tinyufo/src/buckets.rs diff --git a/.bleep b/.bleep index becb0a183..a37da60bd 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -deb3c5409e938ec9c7d0da9b7a2d331eabbb2cd5 \ No newline at end of file +b1c09703606d32b02f24d2e77d82936ba95e8064 \ No newline at end of file diff --git a/pingora-memory-cache/src/lib.rs b/pingora-memory-cache/src/lib.rs index f5c037c3d..2b02d28f2 100644 --- a/pingora-memory-cache/src/lib.rs +++ b/pingora-memory-cache/src/lib.rs @@ -81,7 +81,7 @@ pub struct MemoryCache { pub(crate) hasher: RandomState, } -impl MemoryCache { +impl MemoryCache { /// Create a new [MemoryCache] with the given size. pub fn new(size: usize) -> Self { MemoryCache { diff --git a/pingora-memory-cache/src/read_through.rs b/pingora-memory-cache/src/read_through.rs index 05a8d8924..a10a7c01e 100644 --- a/pingora-memory-cache/src/read_through.rs +++ b/pingora-memory-cache/src/read_through.rs @@ -123,7 +123,7 @@ where impl RTCache where K: Hash + Send, - T: Clone + Send + Sync, + T: Clone + Send + Sync + 'static, { /// Create a new [RTCache] of given size. `lock_age` defines how long a lock is valid for. /// `lock_timeout` is used to stop a lookup from holding on to the key for too long. 
@@ -142,7 +142,7 @@ where impl RTCache where K: Hash + Send, - T: Clone + Send + Sync, + T: Clone + Send + Sync + 'static, CB: Lookup, { /// Query the cache for a given value. If it exists and no TTL is configured initially, it will @@ -288,7 +288,7 @@ where impl RTCache where K: Hash + Send, - T: Clone + Send + Sync, + T: Clone + Send + Sync + 'static, CB: MultiLookup, { /// Same behavior as [RTCache::get] but for an arbitrary amount of keys. diff --git a/tinyufo/Cargo.toml b/tinyufo/Cargo.toml index a726715ec..f21b201d7 100644 --- a/tinyufo/Cargo.toml +++ b/tinyufo/Cargo.toml @@ -20,6 +20,7 @@ ahash = { workspace = true } flurry = "<0.5.0" # Try not to require Rust 1.71 parking_lot = "0" crossbeam-queue = "0" +crossbeam-skiplist = "0" [dev-dependencies] rand = "0" diff --git a/tinyufo/README.md b/tinyufo/README.md index 50e2dd3fa..bf24ec10f 100644 --- a/tinyufo/README.md +++ b/tinyufo/README.md @@ -38,12 +38,12 @@ Because of TinyUFO's lock-free design, it greatly outperforms the others. ### Memory overhead -The table below show the memory allocation (in bytes) of the compared cache library under certain workloads to store zero-sized assets. +TinyUFO provides a compact mode to trade raw read speed for more memory efficiency. Whether the saving worthy the trade off depends on the actual size and the work load. For small in-memory assets, the saved memory means more things can be cached. -| cache size | TinyUFO | LRU | moka | -| -------- | ------- | ------- | ------ | -| 100 | 39,409 | 9,408 | 354,376 -| 1000 | 236,053 | 128,512 | 535,888 -| 10000 | 2,290,635 | 1,075,648 | 2,489,088 +The table below show the memory allocation (in bytes) of the compared cache library under certain workloads to store zero-sized assets. -Whether these overheads matter depends on the actual sizes and volume of the assets. The more advanced algorithms are likely to be less memory efficient than the simple LRU. 
\ No newline at end of file +| cache size | TinyUFO | TinyUFO compact | LRU | moka | +| -------- | ------- | ------- | ------- | ------ | +| 100 | 39,409 | 19,000 | 9,408 | 354,376 +| 1000 | 236,053 | 86,352 | 128,512 | 535,888 +| 10000 | 2,290,635 | 766,024| 1,075,648 | 2,489,088 \ No newline at end of file diff --git a/tinyufo/benches/bench_memory.rs b/tinyufo/benches/bench_memory.rs index e55a5612f..9d49210f2 100644 --- a/tinyufo/benches/bench_memory.rs +++ b/tinyufo/benches/bench_memory.rs @@ -68,6 +68,22 @@ fn bench_tinyufo(zip_exp: f64, items: usize, cache_size_percent: f32) { } } +fn bench_tinyufo_compact(zip_exp: f64, items: usize, cache_size_percent: f32) { + let cache_size = (cache_size_percent * items as f32).round() as usize; + let tinyufo = tinyufo::TinyUfo::new_compact(cache_size, (cache_size as f32 * 1.0) as usize); + + let mut rng = thread_rng(); + let zipf = zipf::ZipfDistribution::new(items, zip_exp).unwrap(); + + for _ in 0..ITERATIONS { + let key = zipf.sample(&mut rng) as u64; + + if tinyufo.get(&key).is_none() { + tinyufo.put(key, (), 1); + } + } +} + /* cargo bench --bench bench_memory @@ -78,6 +94,8 @@ moka dhat: At t-gmax: 354,232 bytes in 1,581 blocks TinyUFO dhat: At t-gmax: 37,337 bytes in 351 blocks +TinyUFO compat +dhat: At t-gmax: 19,000 bytes in 60 blocks total items 10000, cache size 10% lru @@ -86,6 +104,8 @@ moka dhat: At t-gmax: 535,320 bytes in 7,278 blocks TinyUFO dhat: At t-gmax: 236,053 bytes in 2,182 blocks +TinyUFO Compact +dhat: At t-gmax: 86,352 bytes in 1,128 blocks total items 100000, cache size 10% lru @@ -94,6 +114,8 @@ moka dhat: At t-gmax: 2,489,088 bytes in 62,374 blocks TinyUFO dhat: At t-gmax: 2,290,635 bytes in 20,467 blocks +TinyUFO +dhat: At t-gmax: 766,024 bytes in 10,421 blocks */ fn main() { @@ -116,5 +138,11 @@ fn main() { bench_tinyufo(1.05, items, 0.1); println!("\nTinyUFO"); } + + { + let _profiler = dhat::Profiler::new_heap(); + bench_tinyufo_compact(1.05, items, 0.1); + println!("\nTinyUFO Compact"); 
+ } } } diff --git a/tinyufo/benches/bench_perf.rs b/tinyufo/benches/bench_perf.rs index 1295fb21b..bee8a110c 100644 --- a/tinyufo/benches/bench_perf.rs +++ b/tinyufo/benches/bench_perf.rs @@ -32,6 +32,7 @@ Below is from Linux + Ryzen 5 7600 CPU lru read total 150.423567ms, 30ns avg per operation, 33239472 ops per second moka read total 462.133322ms, 92ns avg per operation, 10819389 ops per second tinyufo read total 199.007359ms, 39ns avg per operation, 25124698 ops per second +tinyufo compact read total 331.145859ms, 66ns avg per operation, 15099087 ops per second lru read total 5.402631847s, 1.08µs avg per operation, 925474 ops per second ... @@ -45,6 +46,10 @@ tinyufo read total 208.346855ms, 41ns avg per operation, 23998444 ops per second ... total 148691408 ops per second +tinyufo compact read total 539.403037ms, 107ns avg per operation, 9269507 ops per second +... +total 74130632 ops per second + lru mixed read/write 5.500309876s, 1.1µs avg per operation, 909039 ops per second, 407431 misses ... total 6846743 ops per second @@ -56,6 +61,10 @@ total 16557962 ops per second tinyufo mixed read/write 456.134531ms, 91ns avg per operation, 10961678 ops per second, 294977 misses ... total 80865792 ops per second + +tinyufo compact mixed read/write 638.770053ms, 127ns avg per operation, 7827543 ops per second, 294641 misses +... 
+total 62600844 ops per second */ fn main() { @@ -63,12 +72,14 @@ fn main() { let lru = Mutex::new(lru::LruCache::::unbounded()); let moka = moka::sync::Cache::new(ITEMS as u64 + 10); let tinyufo = tinyufo::TinyUfo::new(ITEMS + 10, 10); + let tinyufo_compact = tinyufo::TinyUfo::new_compact(ITEMS + 10, 10); // populate first, then we bench access/promotion for i in 0..ITEMS { lru.lock().unwrap().put(i as u64, ()); moka.insert(i as u64, ()); tinyufo.put(i as u64, (), 1); + tinyufo_compact.put(i as u64, (), 1); } // single thread @@ -108,6 +119,17 @@ fn main() { (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 ); + let before = Instant::now(); + for _ in 0..ITERATIONS { + tinyufo_compact.get(&(zipf.sample(&mut rng) as u64)); + } + let elapsed = before.elapsed(); + println!( + "tinyufo compact read total {elapsed:?}, {:?} avg per operation, {} ops per second", + elapsed / ITERATIONS as u32, + (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 + ); + // concurrent let before = Instant::now(); @@ -185,6 +207,31 @@ fn main() { (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 ); + let before = Instant::now(); + thread::scope(|s| { + for _ in 0..THREADS { + s.spawn(|| { + let mut rng = thread_rng(); + let zipf = zipf::ZipfDistribution::new(ITEMS, 1.03).unwrap(); + let before = Instant::now(); + for _ in 0..ITERATIONS { + tinyufo_compact.get(&(zipf.sample(&mut rng) as u64)); + } + let elapsed = before.elapsed(); + println!( + "tinyufo compact read total {elapsed:?}, {:?} avg per operation, {} ops per second", + elapsed / ITERATIONS as u32, + (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 + ); + }); + } + }); + let elapsed = before.elapsed(); + println!( + "total {} ops per second", + (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 + ); + ///// bench mixed read and write ///// const CACHE_SIZE: usize = 1000; let items: usize = 10000; @@ -287,4 +334,36 @@ fn main() { "total {} ops per second", (ITERATIONS as f32 * THREADS as f32 / 
elapsed.as_secs_f32()) as u32 ); + + let tinyufo_compact = tinyufo::TinyUfo::new(CACHE_SIZE, CACHE_SIZE); + let before = Instant::now(); + thread::scope(|s| { + for _ in 0..THREADS { + s.spawn(|| { + let mut miss_count = 0; + let mut rng = thread_rng(); + let zipf = zipf::ZipfDistribution::new(items, ZIPF_EXP).unwrap(); + let before = Instant::now(); + for _ in 0..ITERATIONS { + let key = zipf.sample(&mut rng) as u64; + if tinyufo_compact.get(&key).is_none() { + tinyufo_compact.put(key, (), 1); + miss_count +=1; + } + } + let elapsed = before.elapsed(); + println!( + "tinyufo compact mixed read/write {elapsed:?}, {:?} avg per operation, {} ops per second, {miss_count} misses", + elapsed / ITERATIONS as u32, + (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32, + ); + }); + } + }); + + let elapsed = before.elapsed(); + println!( + "total {} ops per second", + (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 + ); } diff --git a/tinyufo/src/buckets.rs b/tinyufo/src/buckets.rs new file mode 100644 index 000000000..4aa627de1 --- /dev/null +++ b/tinyufo/src/buckets.rs @@ -0,0 +1,174 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Concurrent storage backend + +use super::{Bucket, Key}; +use ahash::RandomState; +use crossbeam_skiplist::{map::Entry, SkipMap}; +use flurry::HashMap; + +/// N-shard skip list. 
Memory efficient, constant time lookup on average, but a bit slower +/// than hash map +pub struct Compact(Box<[SkipMap>]>); + +impl Compact { + /// Create a new [Compact] + pub fn new(total_items: usize, items_per_shard: usize) -> Self { + assert!(items_per_shard > 0); + + let shards = std::cmp::max(total_items / items_per_shard, 1); + let mut shard_array = vec![]; + for _ in 0..shards { + shard_array.push(SkipMap::new()); + } + Self(shard_array.into_boxed_slice()) + } + + pub fn get(&self, key: &Key) -> Option>> { + let shard = *key as usize % self.0.len(); + self.0[shard].get(key) + } + + pub fn get_map>) -> V>(&self, key: &Key, f: F) -> Option { + let v = self.get(key); + v.map(f) + } + + fn insert(&self, key: Key, value: Bucket) -> Option<()> { + let shard = key as usize % self.0.len(); + let removed = self.0[shard].remove(&key); + self.0[shard].insert(key, value); + removed.map(|_| ()) + } + + fn remove(&self, key: &Key) { + let shard = *key as usize % self.0.len(); + (&self.0)[shard].remove(key); + } +} + +// Concurrent hash map, fast but use more memory +pub struct Fast(HashMap, RandomState>); + +impl Fast { + pub fn new(total_items: usize) -> Self { + Self(HashMap::with_capacity_and_hasher( + total_items, + RandomState::new(), + )) + } + + pub fn get_map) -> V>(&self, key: &Key, f: F) -> Option { + let pinned = self.0.pin(); + let v = pinned.get(key); + v.map(f) + } + + fn insert(&self, key: Key, value: Bucket) -> Option<()> { + let pinned = self.0.pin(); + pinned.insert(key, value).map(|_| ()) + } + + fn remove(&self, key: &Key) { + let pinned = self.0.pin(); + pinned.remove(key); + } +} + +pub enum Buckets { + Fast(Box>), + Compact(Compact), +} + +impl Buckets { + pub fn new_fast(items: usize) -> Self { + Self::Fast(Box::new(Fast::new(items))) + } + + pub fn new_compact(items: usize, items_per_shard: usize) -> Self { + Self::Compact(Compact::new(items, items_per_shard)) + } + + pub fn insert(&self, key: Key, value: Bucket) -> Option<()> { + match self { 
+ Self::Compact(c) => c.insert(key, value), + Self::Fast(f) => f.insert(key, value), + } + } + + pub fn remove(&self, key: &Key) { + match self { + Self::Compact(c) => c.remove(key), + Self::Fast(f) => f.remove(key), + } + } + + pub fn get_map) -> V>(&self, key: &Key, f: F) -> Option { + match self { + Self::Compact(c) => c.get_map(key, |v| f(v.value())), + Self::Fast(c) => c.get_map(key, f), + } + } + + #[cfg(test)] + pub fn get_queue(&self, key: &Key) -> Option { + self.get_map(key, |v| v.queue.is_main()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_fast() { + let fast = Buckets::new_fast(10); + + assert!(fast.get_map(&1, |_| ()).is_none()); + + let bucket = Bucket { + queue: crate::Location::new_small(), + weight: 1, + uses: Default::default(), + data: 1, + }; + fast.insert(1, bucket); + + assert_eq!(fast.get_map(&1, |v| v.data), Some(1)); + + fast.remove(&1); + assert!(fast.get_map(&1, |_| ()).is_none()); + } + + #[test] + fn test_compact() { + let compact = Buckets::new_compact(10, 2); + + assert!(compact.get_map(&1, |_| ()).is_none()); + + let bucket = Bucket { + queue: crate::Location::new_small(), + weight: 1, + uses: Default::default(), + data: 1, + }; + compact.insert(1, bucket); + + assert_eq!(compact.get_map(&1, |v| v.data), Some(1)); + + compact.remove(&1); + assert!(compact.get_map(&1, |_| ()).is_none()); + } +} diff --git a/tinyufo/src/estimation.rs b/tinyufo/src/estimation.rs index 19d84d4b0..18c2d4f67 100644 --- a/tinyufo/src/estimation.rs +++ b/tinyufo/src/estimation.rs @@ -39,6 +39,11 @@ impl Estimator { Self::new(hashes, slots) } + fn compact(items: usize) -> Self { + let (slots, hashes) = Self::optimal_paras(items / 100); + Self::new(hashes, slots) + } + /// Create a new `Estimator` with the given amount of hashes and columns (slots). 
pub fn new(hashes: usize, slots: usize) -> Self { let mut estimator = Vec::with_capacity(hashes); @@ -147,6 +152,15 @@ impl TinyLfu { window_limit: cache_size * 8, } } + + pub fn new_compact(cache_size: usize) -> Self { + Self { + estimator: Estimator::compact(cache_size), + window_counter: Default::default(), + // 8x: just a heuristic to balance the memory usage and accuracy + window_limit: cache_size * 8, + } + } } #[cfg(test)] diff --git a/tinyufo/src/lib.rs b/tinyufo/src/lib.rs index 001f4e3e8..015afe0ef 100644 --- a/tinyufo/src/lib.rs +++ b/tinyufo/src/lib.rs @@ -20,14 +20,16 @@ use ahash::RandomState; use crossbeam_queue::SegQueue; -use flurry::HashMap; use std::marker::PhantomData; use std::sync::atomic::AtomicUsize; use std::sync::atomic::{ AtomicBool, AtomicU8, Ordering::{Acquire, Relaxed, SeqCst}, }; +mod buckets; mod estimation; + +use buckets::Buckets; use estimation::TinyLfu; use std::hash::Hash; @@ -64,20 +66,20 @@ const USES_CAP: u8 = 3; struct Uses(AtomicU8); impl Uses { - pub fn inc_uses(&self) { + pub fn inc_uses(&self) -> u8 { loop { let uses = self.uses(); if uses >= USES_CAP { - return; + return uses; } if let Err(new) = self.0.compare_exchange(uses, uses + 1, Acquire, Relaxed) { // someone else beat us to it if new >= USES_CAP { // already above cap - return; + return new; } // else, try again } else { - return; + return uses + 1; } } } @@ -126,17 +128,6 @@ struct Bucket { data: T, } -impl Bucket { - fn update_bucket(&self, main_queue: bool, data: T, weight: Weight) -> Self { - Self { - uses: Uses(self.uses.uses().into()), - queue: Location(main_queue.into()), - weight, - data, - } - } -} - const SMALL_QUEUE_PERCENTAGE: f32 = 0.1; struct FiFoQueues { @@ -154,9 +145,7 @@ struct FiFoQueues { _t: PhantomData, } -type Buckets = HashMap, RandomState>; - -impl FiFoQueues { +impl FiFoQueues { fn admit( &self, key: Key, @@ -174,9 +163,29 @@ impl FiFoQueues { assert!(weight > 0); let new_bucket = { - let pinned_buckets = buckets.pin(); - let bucket = 
pinned_buckets.get(&key); - let Some(bucket) = bucket else { + let Some((uses, queue, weight)) = buckets.get_map(&key, |bucket| { + // the item exists, in case weight changes + let old_weight = bucket.weight; + let uses = bucket.uses.inc_uses(); + + fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) { + if old == new { + return; + } + if old > new { + weight.fetch_sub((old - new) as usize, SeqCst); + } else { + weight.fetch_add((new - old) as usize, SeqCst); + } + } + let queue = bucket.queue.is_main(); + if queue == MAIN { + update_atomic(&self.main_weight, old_weight, weight); + } else { + update_atomic(&self.small_weight, old_weight, weight); + } + (uses, queue, weight) + }) else { let mut evicted = self.evict_to_limit(weight, buckets); // TODO: figure out the right way to compare frequencies of different weights across // many evicted assets. For now TinyLFU is only used when only evicting 1 item. @@ -204,7 +213,7 @@ impl FiFoQueues { uses: Default::default(), // 0 data, }; - let old = pinned_buckets.insert(key, bucket); + let old = buckets.insert(key, bucket); if old.is_none() { // Always push key first before updating weight // If doing the other order, another concurrent thread might not @@ -215,32 +224,16 @@ impl FiFoQueues { // TODO: compare old.weight and update accordingly return evicted; }; - - // the item exists, in case weight changes - let old_weight = bucket.weight; - bucket.uses.inc_uses(); - - fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) { - if old == new { - return; - } - if old > new { - weight.fetch_sub((old - new) as usize, SeqCst); - } else { - weight.fetch_add((new - old) as usize, SeqCst); - } - } - if bucket.queue.is_main() { - update_atomic(&self.main_weight, old_weight, weight); - bucket.update_bucket(MAIN, data, weight) - } else { - update_atomic(&self.small_weight, old_weight, weight); - bucket.update_bucket(SMALL, data, weight) + Bucket { + queue: Location(queue.into()), + weight, + uses: Uses(uses.into()), + 
data, } }; // replace the existing one - buckets.pin().insert(key, new_bucket); + buckets.insert(key, new_bucket); // NOTE: there is a chance that the item itself is evicted if it happens to be the one selected // by the algorithm. We could avoid this by checking if the item is in the returned evicted items, @@ -295,61 +288,67 @@ impl FiFoQueues { // empty queue, this is caught between another pop() and fetch_sub() return None; }; - let pinned_buckets = buckets.pin(); - let maybe_bucket = pinned_buckets.get(&to_evict); - - let Some(bucket) = maybe_bucket.as_ref() else { - //key in queue but not bucket, shouldn't happen, but ignore - continue; - }; - - let weight = bucket.weight; - self.small_weight.fetch_sub(weight as usize, SeqCst); - if bucket.uses.uses() > 1 { - // move to main - bucket.queue.move_to_main(); - self.main.push(to_evict); - self.main_weight.fetch_add(weight as usize, SeqCst); - // continue until find one to evict - continue; + let v = buckets + .get_map(&to_evict, |bucket| { + let weight = bucket.weight; + self.small_weight.fetch_sub(weight as usize, SeqCst); + + if bucket.uses.uses() > 1 { + // move to main + bucket.queue.move_to_main(); + self.main.push(to_evict); + self.main_weight.fetch_add(weight as usize, SeqCst); + // continue until find one to evict + None + } else { + let data = bucket.data.clone(); + let weight = bucket.weight; + buckets.remove(&to_evict); + Some(KV { + key: to_evict, + data, + weight, + }) + } + }) + .flatten(); + if v.is_some() { + // found the one to evict, break + return v; } - // move to ghost - - let data = bucket.data.clone(); - let weight = bucket.weight; - pinned_buckets.remove(&to_evict); - return Some(KV { - key: to_evict, - data, - weight, - }); } } fn evict_one_from_main(&self, buckets: &Buckets) -> Option> { loop { let to_evict = self.main.pop()?; - let buckets = buckets.pin(); - let maybe_bucket = buckets.get(&to_evict); - if let Some(bucket) = maybe_bucket.as_ref() { - if bucket.uses.decr_uses() > 0 { - // 
put it back - self.main.push(to_evict); - // continue the loop - } else { - // evict - let weight = bucket.weight; - self.main_weight.fetch_sub(weight as usize, SeqCst); - let data = bucket.data.clone(); - buckets.remove(&to_evict); - return Some(KV { - key: to_evict, - data, - weight, - }); - } - } // else: key in queue but not bucket, shouldn't happen + + if let Some(v) = buckets + .get_map(&to_evict, |bucket| { + if bucket.uses.decr_uses() > 0 { + // put it back + self.main.push(to_evict); + // continue the loop + None + } else { + // evict + let weight = bucket.weight; + self.main_weight.fetch_sub(weight as usize, SeqCst); + let data = bucket.data.clone(); + buckets.remove(&to_evict); + Some(KV { + key: to_evict, + data, + weight, + }) + } + }) + .flatten() + { + // found the one to evict, break + return Some(v); + } } } } @@ -357,12 +356,11 @@ impl FiFoQueues { /// [TinyUfo] cache pub struct TinyUfo { queues: FiFoQueues, - buckets: HashMap, RandomState>, + buckets: Buckets, random_status: RandomState, _k: PhantomData, } - -impl TinyUfo { +impl TinyUfo { /// Create a new TinyUfo cache with the given weight limit and the given /// size limit of the ghost queue. pub fn new(total_weight_limit: usize, estimated_size: usize) -> Self { @@ -377,7 +375,29 @@ impl TinyUfo { }; TinyUfo { queues, - buckets: HashMap::with_capacity_and_hasher(estimated_size, RandomState::new()), + buckets: Buckets::new_fast(estimated_size), + random_status: RandomState::new(), + _k: PhantomData, + } + } + + /// Create a new TinyUfo cache but with more memory efficient data structures. + /// The trade-off is that the the get() is slower by a constant factor. + /// The cache hit ratio could be higher as this type of TinyUFO allows to store + /// more assets with the same memory. 
+ pub fn new_compact(total_weight_limit: usize, estimated_size: usize) -> Self { + let queues = FiFoQueues { + small: SegQueue::new(), + small_weight: 0.into(), + main: SegQueue::new(), + main_weight: 0.into(), + total_weight_limit, + estimator: TinyLfu::new_compact(estimated_size), + _t: PhantomData, + }; + TinyUfo { + queues, + buckets: Buckets::new_compact(estimated_size, 32), random_status: RandomState::new(), _k: PhantomData, } @@ -390,8 +410,7 @@ impl TinyUfo { /// Return Some(T) if the key exists pub fn get(&self, key: &K) -> Option { let key = self.random_status.hash_one(key); - let buckets = self.buckets.pin(); - buckets.get(&key).map(|p| { + self.buckets.get_map(&key, |p| { p.uses.inc_uses(); p.data.clone() }) @@ -427,7 +446,7 @@ impl TinyUfo { #[cfg(test)] fn peek_queue(&self, key: K) -> Option { let key = self.random_status.hash_one(&key); - self.buckets.pin().get(&key).map(|p| p.queue.value()) + self.buckets.get_queue(&key) } } @@ -627,4 +646,54 @@ mod tests { assert_eq!(cache.peek_queue(3), Some(MAIN)); assert_eq!(cache.peek_queue(4), None); } + + #[test] + fn test_evict_from_small_compact() { + let cache = TinyUfo::new(5, 5); + + cache.put(1, 1, 1); + cache.put(2, 2, 2); + cache.put(3, 3, 2); + // cache full now + + assert_eq!(cache.peek_queue(1), Some(SMALL)); + assert_eq!(cache.peek_queue(2), Some(SMALL)); + assert_eq!(cache.peek_queue(3), Some(SMALL)); + + let evicted = cache.put(4, 4, 3); + assert_eq!(evicted.len(), 2); + assert_eq!(evicted[0].data, 1); + assert_eq!(evicted[1].data, 2); + + assert_eq!(cache.peek_queue(1), None); + assert_eq!(cache.peek_queue(2), None); + assert_eq!(cache.peek_queue(3), Some(SMALL)); + } + + #[test] + fn test_evict_from_small_to_main_compact() { + let cache = TinyUfo::new(5, 5); + + cache.put(1, 1, 1); + cache.put(2, 2, 2); + cache.put(3, 3, 2); + // cache full now + + cache.get(&1); + cache.get(&1); // 1 will be moved to main during next eviction + + assert_eq!(cache.peek_queue(1), Some(SMALL)); + 
assert_eq!(cache.peek_queue(2), Some(SMALL)); + assert_eq!(cache.peek_queue(3), Some(SMALL)); + + let evicted = cache.put(4, 4, 1); + assert_eq!(evicted.len(), 1); + assert_eq!(evicted[0].data, 2); + + assert_eq!(cache.peek_queue(1), Some(MAIN)); + // 2 is evicted because 1 is in main + assert_eq!(cache.peek_queue(2), None); + assert_eq!(cache.peek_queue(3), Some(SMALL)); + assert_eq!(cache.peek_queue(4), Some(SMALL)); + } } From d7f65620c05ad1f98dee21e529b0343c7e24f05e Mon Sep 17 00:00:00 2001 From: gengteng Date: Sun, 3 Mar 2024 04:26:22 +0000 Subject: [PATCH 2/8] feat: Enhance Server::new to accept impl Into> for ergonomics Replicated-from: https://github.com/cloudflare/pingora/pull/100 Includes-commit: 15cffed202c1a1cdd35ebf9b6649268fb54acaba --- .bleep | 2 +- pingora-core/src/server/mod.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.bleep b/.bleep index a37da60bd..4508557a6 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -b1c09703606d32b02f24d2e77d82936ba95e8064 \ No newline at end of file +ff2f3fb1c3c2b52d92fd5ff3b76b6f4d25488fb2 \ No newline at end of file diff --git a/pingora-core/src/server/mod.rs b/pingora-core/src/server/mod.rs index fa2cea21f..9c29aafe3 100644 --- a/pingora-core/src/server/mod.rs +++ b/pingora-core/src/server/mod.rs @@ -175,7 +175,8 @@ impl Server { /// /// Command line options can either be passed by parsing the command line arguments via /// `Opt::from_args()`, or be generated by other means. - pub fn new(opt: Option) -> Result { + pub fn new(opt: impl Into>) -> Result { + let opt = opt.into(); let (tx, rx) = watch::channel(false); let conf = if let Some(opt) = opt.as_ref() { From 8a226a0ad613f11e3719496be0d311861bd8709e Mon Sep 17 00:00:00 2001 From: Yuchen Wu Date: Tue, 12 Mar 2024 17:09:18 -0700 Subject: [PATCH 3/8] TinyUFO: add quick_cache for benchmarks Add more cache crates to compare with. 
Co-authored-by: Arthur Silva --- .bleep | 2 +- tinyufo/Cargo.toml | 1 + tinyufo/benches/bench_hit_ratio.rs | 66 ++++++++++-------- tinyufo/benches/bench_memory.rs | 28 ++++++++ tinyufo/benches/bench_perf.rs | 103 ++++++++++++++++++++++++++++- 5 files changed, 169 insertions(+), 31 deletions(-) diff --git a/.bleep b/.bleep index 4508557a6..a5b2b10f8 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -ff2f3fb1c3c2b52d92fd5ff3b76b6f4d25488fb2 \ No newline at end of file +541a53e91e59e55dc42ea0ec37880b11edd6bfe9 \ No newline at end of file diff --git a/tinyufo/Cargo.toml b/tinyufo/Cargo.toml index f21b201d7..4b3b2c913 100644 --- a/tinyufo/Cargo.toml +++ b/tinyufo/Cargo.toml @@ -28,6 +28,7 @@ lru = "0" zipf = "7" moka = { version = "0", features = ["sync"] } dhat = "0" +quick_cache = "0.4" [[bench]] name = "bench_perf" diff --git a/tinyufo/benches/bench_hit_ratio.rs b/tinyufo/benches/bench_hit_ratio.rs index 72dacd5af..23d0e58b3 100644 --- a/tinyufo/benches/bench_hit_ratio.rs +++ b/tinyufo/benches/bench_hit_ratio.rs @@ -23,6 +23,7 @@ fn bench_one(zip_exp: f64, cache_size_percent: f32) { let cache_size = (cache_size_percent * ITEMS as f32).round() as usize; let mut lru = lru::LruCache::::new(NonZeroUsize::new(cache_size).unwrap()); let moka = moka::sync::Cache::new(cache_size as u64); + let quick_cache = quick_cache::sync::Cache::new(cache_size); let tinyufo = tinyufo::TinyUfo::new(cache_size, cache_size); let mut rng = thread_rng(); @@ -30,6 +31,7 @@ fn bench_one(zip_exp: f64, cache_size_percent: f32) { let mut lru_hit = 0; let mut moka_hit = 0; + let mut quick_cache_hit = 0; let mut tinyufo_hit = 0; for _ in 0..ITERATIONS { @@ -47,6 +49,12 @@ fn bench_one(zip_exp: f64, cache_size_percent: f32) { moka.insert(key, ()); } + if quick_cache.get(&key).is_some() { + quick_cache_hit += 1; + } else { + quick_cache.insert(key, ()); + } + if tinyufo.get(&key).is_some() { tinyufo_hit += 1; } else { @@ -56,42 +64,46 @@ fn bench_one(zip_exp: f64, cache_size_percent: f32) { 
print!("{:.2}%\t\t", lru_hit as f32 / ITERATIONS as f32 * 100.0); print!("{:.2}%\t\t", moka_hit as f32 / ITERATIONS as f32 * 100.0); + print!( + "{:.2}%\t\t", + quick_cache_hit as f32 / ITERATIONS as f32 * 100.0 + ); println!("{:.2}%", tinyufo_hit as f32 / ITERATIONS as f32 * 100.0); } /* cargo bench --bench bench_hit_ratio -zipf & cache size lru moka TinyUFO -0.90, 0.005 19.23% 33.46% 33.35% -0.90, 0.01 26.21% 37.88% 40.10% -0.90, 0.05 45.59% 55.34% 57.81% -0.90, 0.1 55.73% 64.22% 66.34% -0.90, 0.25 71.18% 77.15% 78.53% -1.00, 0.005 31.09% 45.65% 45.13% -1.00, 0.01 39.17% 50.69% 52.23% -1.00, 0.05 58.73% 66.95% 68.81% -1.00, 0.1 67.57% 74.35% 75.93% -1.00, 0.25 79.91% 84.34% 85.27% -1.05, 0.005 37.68% 51.77% 51.26% -1.05, 0.01 46.11% 57.07% 58.41% -1.05, 0.05 65.04% 72.33% 73.91% -1.05, 0.1 73.11% 78.96% 80.22% -1.05, 0.25 83.77% 87.45% 88.16% -1.10, 0.005 44.48% 57.86% 57.25% -1.10, 0.01 52.97% 63.18% 64.23% -1.10, 0.05 70.94% 77.27% 78.57% -1.10, 0.1 78.11% 83.05% 84.06% -1.10, 0.25 87.08% 90.06% 90.62% -1.50, 0.005 85.25% 89.89% 89.68% -1.50, 0.01 89.88% 92.79% 92.94% -1.50, 0.05 96.04% 97.09% 97.25% -1.50, 0.1 97.52% 98.17% 98.26% -1.50, 0.25 98.81% 99.09% 99.10% +zipf & cache size lru moka QuickC TinyUFO +0.90, 0.005 19.24% 33.43% 32.33% 33.35% +0.90, 0.01 26.23% 37.86% 38.80% 40.06% +0.90, 0.05 45.58% 55.13% 55.71% 57.80% +0.90, 0.1 55.72% 64.15% 64.01% 66.36% +0.90, 0.25 71.16% 77.12% 75.92% 78.53% +1.00, 0.005 31.08% 45.68% 44.07% 45.15% +1.00, 0.01 39.17% 50.80% 50.90% 52.30% +1.00, 0.05 58.71% 66.92% 67.09% 68.79% +1.00, 0.1 67.59% 74.28% 74.00% 75.92% +1.00, 0.25 79.94% 84.35% 83.45% 85.28% +1.05, 0.005 37.66% 51.78% 50.13% 51.12% +1.05, 0.01 46.07% 57.13% 57.07% 58.41% +1.05, 0.05 65.06% 72.37% 72.41% 73.93% +1.05, 0.1 73.13% 78.97% 78.60% 80.24% +1.05, 0.25 83.74% 87.41% 86.68% 88.14% +1.10, 0.005 44.49% 57.84% 56.16% 57.28% +1.10, 0.01 52.97% 63.19% 62.99% 64.24% +1.10, 0.05 70.95% 77.24% 77.26% 78.55% +1.10, 0.1 78.05% 82.86% 82.66% 84.01% +1.10, 
0.25 87.12% 90.10% 89.51% 90.66% +1.50, 0.005 85.27% 89.92% 89.08% 89.69% +1.50, 0.01 89.86% 92.77% 92.44% 92.94% +1.50, 0.05 96.01% 97.08% 96.99% 97.23% +1.50, 0.1 97.51% 98.15% 98.08% 98.24% +1.50, 0.25 98.81% 99.09% 99.03% 99.09% */ fn main() { - println!("zipf & cache size\t\tlru\t\tmoka\t\tTinyUFO",); + println!("zipf & cache size\t\tlru\t\tmoka\t\tQuickC\t\tTinyUFO",); for zif_exp in [0.9, 1.0, 1.05, 1.1, 1.5] { for cache_capacity in [0.005, 0.01, 0.05, 0.1, 0.25] { bench_one(zif_exp, cache_capacity); diff --git a/tinyufo/benches/bench_memory.rs b/tinyufo/benches/bench_memory.rs index 9d49210f2..271fe1225 100644 --- a/tinyufo/benches/bench_memory.rs +++ b/tinyufo/benches/bench_memory.rs @@ -52,6 +52,22 @@ fn bench_moka(zip_exp: f64, items: usize, cache_size_percent: f32) { } } +fn bench_quick_cache(zip_exp: f64, items: usize, cache_size_percent: f32) { + let cache_size = (cache_size_percent * items as f32).round() as usize; + let quick_cache = quick_cache::sync::Cache::new(cache_size); + + let mut rng = thread_rng(); + let zipf = zipf::ZipfDistribution::new(items, zip_exp).unwrap(); + + for _ in 0..ITERATIONS { + let key = zipf.sample(&mut rng) as u64; + + if quick_cache.get(&key).is_none() { + quick_cache.insert(key, ()); + } + } +} + fn bench_tinyufo(zip_exp: f64, items: usize, cache_size_percent: f32) { let cache_size = (cache_size_percent * items as f32).round() as usize; let tinyufo = tinyufo::TinyUfo::new(cache_size, (cache_size as f32 * 1.0) as usize); @@ -92,6 +108,8 @@ lru dhat: At t-gmax: 9,408 bytes in 106 blocks moka dhat: At t-gmax: 354,232 bytes in 1,581 blocks +QuickCache +dhat: At t-gmax: 11,840 bytes in 8 blocks TinyUFO dhat: At t-gmax: 37,337 bytes in 351 blocks TinyUFO compat @@ -102,6 +120,8 @@ lru dhat: At t-gmax: 128,512 bytes in 1,004 blocks moka dhat: At t-gmax: 535,320 bytes in 7,278 blocks +QuickCache +dhat: At t-gmax: 93,000 bytes in 66 blocks TinyUFO dhat: At t-gmax: 236,053 bytes in 2,182 blocks TinyUFO Compact @@ -112,6 +132,8 @@ 
lru dhat: At t-gmax: 1,075,648 bytes in 10,004 blocks moka dhat: At t-gmax: 2,489,088 bytes in 62,374 blocks +QuickCache +dhat: At t-gmax: 863,752 bytes in 66 blocks TinyUFO dhat: At t-gmax: 2,290,635 bytes in 20,467 blocks TinyUFO @@ -133,6 +155,12 @@ fn main() { println!("\nmoka"); } + { + let _profiler = dhat::Profiler::new_heap(); + bench_quick_cache(1.05, items, 0.1); + println!("\nQuickCache"); + } + { let _profiler = dhat::Profiler::new_heap(); bench_tinyufo(1.05, items, 0.1); diff --git a/tinyufo/benches/bench_perf.rs b/tinyufo/benches/bench_perf.rs index bee8a110c..9b740009e 100644 --- a/tinyufo/benches/bench_perf.rs +++ b/tinyufo/benches/bench_perf.rs @@ -14,7 +14,7 @@ use rand::prelude::*; use std::num::NonZeroUsize; -use std::sync::Mutex; +use std::sync::{Barrier, Mutex}; use std::thread; use std::time::Instant; @@ -31,6 +31,7 @@ Below is from Linux + Ryzen 5 7600 CPU lru read total 150.423567ms, 30ns avg per operation, 33239472 ops per second moka read total 462.133322ms, 92ns avg per operation, 10819389 ops per second +quick_cache read total 125.618216ms, 25ns avg per operation, 39803144 ops per second tinyufo read total 199.007359ms, 39ns avg per operation, 25124698 ops per second tinyufo compact read total 331.145859ms, 66ns avg per operation, 15099087 ops per second @@ -42,6 +43,10 @@ moka read total 2.742258211s, 548ns avg per operation, 1823314 ops per second ... total 14072430 ops per second +quick_cache read total 1.186566627s, 237ns avg per operation, 4213838 ops per second +... +total 33694776 ops per second + tinyufo read total 208.346855ms, 41ns avg per operation, 23998444 ops per second ... total 148691408 ops per second @@ -58,6 +63,10 @@ moka mixed read/write 2.368500882s, 473ns avg per operation, 2111040 ops per sec ... total 16557962 ops per second +quick_cache mixed read/write 838.072588ms, 167ns avg per operation, 5966070 ops per second 315051 misses +... 
+total 47698472 ops per second + tinyufo mixed read/write 456.134531ms, 91ns avg per operation, 10961678 ops per second, 294977 misses ... total 80865792 ops per second @@ -68,9 +77,11 @@ total 62600844 ops per second */ fn main() { + println!("Note: these performance numbers vary a lot across different CPUs and OSes."); // we don't bench eviction here so make the caches large enough to hold all let lru = Mutex::new(lru::LruCache::::unbounded()); let moka = moka::sync::Cache::new(ITEMS as u64 + 10); + let quick_cache = quick_cache::sync::Cache::new(ITEMS + 10); let tinyufo = tinyufo::TinyUfo::new(ITEMS + 10, 10); let tinyufo_compact = tinyufo::TinyUfo::new_compact(ITEMS + 10, 10); @@ -78,6 +89,7 @@ fn main() { for i in 0..ITEMS { lru.lock().unwrap().put(i as u64, ()); moka.insert(i as u64, ()); + quick_cache.insert(i as u64, ()); tinyufo.put(i as u64, (), 1); tinyufo_compact.put(i as u64, (), 1); } @@ -108,6 +120,17 @@ fn main() { (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 ); + let before = Instant::now(); + for _ in 0..ITERATIONS { + quick_cache.get(&(zipf.sample(&mut rng) as u64)); + } + let elapsed = before.elapsed(); + println!( + "quick_cache read total {elapsed:?}, {:?} avg per operation, {} ops per second", + elapsed / ITERATIONS as u32, + (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 + ); + let before = Instant::now(); for _ in 0..ITERATIONS { tinyufo.get(&(zipf.sample(&mut rng) as u64)); @@ -131,13 +154,14 @@ fn main() { ); // concurrent - + let wg = Barrier::new(THREADS); let before = Instant::now(); thread::scope(|s| { for _ in 0..THREADS { s.spawn(|| { let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(ITEMS, 1.03).unwrap(); + wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { lru.lock().unwrap().get(&(zipf.sample(&mut rng) as u64)); @@ -157,12 +181,14 @@ fn main() { (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 ); + let wg = Barrier::new(THREADS); let before = Instant::now(); 
thread::scope(|s| { for _ in 0..THREADS { s.spawn(|| { let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(ITEMS, 1.03).unwrap(); + wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { moka.get(&(zipf.sample(&mut rng) as u64)); @@ -182,12 +208,41 @@ fn main() { (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 ); + let wg = Barrier::new(THREADS); let before = Instant::now(); thread::scope(|s| { for _ in 0..THREADS { s.spawn(|| { let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(ITEMS, 1.03).unwrap(); + wg.wait(); + let before = Instant::now(); + for _ in 0..ITERATIONS { + quick_cache.get(&(zipf.sample(&mut rng) as u64)); + } + let elapsed = before.elapsed(); + println!( + "quick_cache read total {elapsed:?}, {:?} avg per operation, {} ops per second", + elapsed / ITERATIONS as u32, + (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 + ); + }); + } + }); + let elapsed = before.elapsed(); + println!( + "total {} ops per second", + (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 + ); + + let wg = Barrier::new(THREADS); + let before = Instant::now(); + thread::scope(|s| { + for _ in 0..THREADS { + s.spawn(|| { + let mut rng = thread_rng(); + let zipf = zipf::ZipfDistribution::new(ITEMS, 1.03).unwrap(); + wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { tinyufo.get(&(zipf.sample(&mut rng) as u64)); @@ -207,12 +262,14 @@ fn main() { (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 ); + let wg = Barrier::new(THREADS); let before = Instant::now(); thread::scope(|s| { for _ in 0..THREADS { s.spawn(|| { let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(ITEMS, 1.03).unwrap(); + wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { tinyufo_compact.get(&(zipf.sample(&mut rng) as u64)); @@ -240,6 +297,7 @@ fn main() { let lru = Mutex::new(lru::LruCache::::new( NonZeroUsize::new(CACHE_SIZE).unwrap(), )); + let wg = 
Barrier::new(THREADS); let before = Instant::now(); thread::scope(|s| { for _ in 0..THREADS { @@ -247,6 +305,7 @@ fn main() { let mut miss_count = 0; let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(items, ZIPF_EXP).unwrap(); + wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { let key = zipf.sample(&mut rng) as u64; @@ -272,7 +331,7 @@ fn main() { ); let moka = moka::sync::Cache::new(CACHE_SIZE as u64); - + let wg = Barrier::new(THREADS); let before = Instant::now(); thread::scope(|s| { for _ in 0..THREADS { @@ -280,6 +339,7 @@ fn main() { let mut miss_count = 0; let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(items, ZIPF_EXP).unwrap(); + wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { let key = zipf.sample(&mut rng) as u64; @@ -303,7 +363,41 @@ fn main() { (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 ); + let quick_cache = quick_cache::sync::Cache::new(CACHE_SIZE); + let wg = Barrier::new(THREADS); + let before = Instant::now(); + thread::scope(|s| { + for _ in 0..THREADS { + s.spawn(|| { + let mut miss_count = 0; + let mut rng = thread_rng(); + let zipf = zipf::ZipfDistribution::new(items, ZIPF_EXP).unwrap(); + wg.wait(); + let before = Instant::now(); + for _ in 0..ITERATIONS { + let key = zipf.sample(&mut rng) as u64; + if quick_cache.get(&key).is_none() { + quick_cache.insert(key, ()); + miss_count += 1; + } + } + let elapsed = before.elapsed(); + println!( + "quick_cache mixed read/write {elapsed:?}, {:?} avg per operation, {} ops per second {miss_count} misses", + elapsed / ITERATIONS as u32, + (ITERATIONS as f32 / elapsed.as_secs_f32()) as u32 + ); + }); + } + }); + let elapsed = before.elapsed(); + println!( + "total {} ops per second", + (ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32 + ); + let tinyufo = tinyufo::TinyUfo::new(CACHE_SIZE, CACHE_SIZE); + let wg = Barrier::new(THREADS); let before = Instant::now(); thread::scope(|s| { for _ in 
0..THREADS { @@ -311,6 +405,7 @@ fn main() { let mut miss_count = 0; let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(items, ZIPF_EXP).unwrap(); + wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { let key = zipf.sample(&mut rng) as u64; @@ -336,6 +431,7 @@ fn main() { ); let tinyufo_compact = tinyufo::TinyUfo::new(CACHE_SIZE, CACHE_SIZE); + let wg = Barrier::new(THREADS); let before = Instant::now(); thread::scope(|s| { for _ in 0..THREADS { @@ -343,6 +439,7 @@ fn main() { let mut miss_count = 0; let mut rng = thread_rng(); let zipf = zipf::ZipfDistribution::new(items, ZIPF_EXP).unwrap(); + wg.wait(); let before = Instant::now(); for _ in 0..ITERATIONS { let key = zipf.sample(&mut rng) as u64; From d23d3a345fefc92bca15bc6e65bee9527d8d18c0 Mon Sep 17 00:00:00 2001 From: Yuchen Wu Date: Fri, 15 Mar 2024 11:05:43 -0700 Subject: [PATCH 4/8] Correctly init body writer for upgraded requests Before this change, the code tried to get the header from the stored header which is only set after this function is called. 
--- .bleep | 2 +- pingora-core/src/protocols/http/v1/client.rs | 22 +++++++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/.bleep b/.bleep index a5b2b10f8..39a4637f9 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -541a53e91e59e55dc42ea0ec37880b11edd6bfe9 \ No newline at end of file +75bf189b8e2ce2ab5acead15f8db45485085e577 \ No newline at end of file diff --git a/pingora-core/src/protocols/http/v1/client.rs b/pingora-core/src/protocols/http/v1/client.rs index 7881e5978..c8e030087 100644 --- a/pingora-core/src/protocols/http/v1/client.rs +++ b/pingora-core/src/protocols/http/v1/client.rs @@ -532,7 +532,7 @@ impl HttpSession { } fn init_req_body_writer(&mut self, header: &RequestHeader) { - if self.is_upgrade_req() { + if is_upgrade_req(header) { self.body_writer.init_http10(); } else { self.init_body_writer_comm(&header.headers) @@ -892,6 +892,26 @@ mod tests_stream { } } + #[tokio::test] + async fn init_body_for_upgraded_req() { + use crate::protocols::http::v1::body::BodyMode; + + let wire = + b"GET / HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: WS\r\nContent-Length: 0\r\n\r\n"; + let mock_io = Builder::new().write(wire).build(); + let mut http_stream = HttpSession::new(Box::new(mock_io)); + let mut new_request = RequestHeader::build("GET", b"/", None).unwrap(); + new_request.insert_header("Connection", "Upgrade").unwrap(); + new_request.insert_header("Upgrade", "WS").unwrap(); + // Content-Length is ignored when an Upgrade header is present + new_request.insert_header("Content-Length", "0").unwrap(); + let _ = http_stream + .write_request_header(Box::new(new_request)) + .await + .unwrap(); + assert_eq!(http_stream.body_writer.body_mode, BodyMode::HTTP1_0(0)); + } + #[tokio::test] async fn read_switching_protocol() { init_log(); From 86e33226682c4fd20adb3cdf5414739f42c25427 Mon Sep 17 00:00:00 2001 From: ewang Date: Fri, 15 Mar 2024 11:58:34 -0700 Subject: [PATCH 5/8] Require nix ~0.24.3 and chrono ~0.4.31 in Cargo.toml These minor versions are required for the
APIs used in pingora-core. --- .bleep | 2 +- pingora-core/Cargo.toml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.bleep b/.bleep index 39a4637f9..25ef42f75 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -75bf189b8e2ce2ab5acead15f8db45485085e577 \ No newline at end of file +62aab35f3ee85a4978de9075877b831e9f185ed2 \ No newline at end of file diff --git a/pingora-core/Cargo.toml b/pingora-core/Cargo.toml index 76d239cb2..a885e6bf2 100644 --- a/pingora-core/Cargo.toml +++ b/pingora-core/Cargo.toml @@ -35,13 +35,13 @@ http = { workspace = true } log = { workspace = true } h2 = { workspace = true } lru = { workspace = true } -nix = "0.24" +nix = "~0.24.3" structopt = "0.3" once_cell = { workspace = true } serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.8" libc = "0.2.70" -chrono = { version = "0.4", features = ["alloc"], default-features = false } +chrono = { version = "~0.4.31", features = ["alloc"], default-features = false } thread_local = "1.0" prometheus = "0.13" daemonize = "0.5.0" From 0cb582f20f9d43ad67b0f858695e00bcb29e25af Mon Sep 17 00:00:00 2001 From: Alexandre Antonio Juca Date: Wed, 13 Mar 2024 17:24:53 +0000 Subject: [PATCH 6/8] Fix typos and grammar issues Update comment Co-authored-by: Kevin Guthrie Includes-commit: 61a55ae3fa18e11858a33b74f5442ff230aefc4d Includes-commit: 7da3e13d842085cd7079be990cf41f86ffa780ed Replicated-from: https://github.com/cloudflare/pingora/pull/128 --- .bleep | 2 +- pingora-http/src/lib.rs | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.bleep b/.bleep index 25ef42f75..20d34d826 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -62aab35f3ee85a4978de9075877b831e9f185ed2 \ No newline at end of file +cfeea5bcf669335b2ef1fa90b4572d1c52bc2c82 \ No newline at end of file diff --git a/pingora-http/src/lib.rs b/pingora-http/src/lib.rs index 24b648f7c..681bcfbc3 100644 --- a/pingora-http/src/lib.rs +++ b/pingora-http/src/lib.rs @@ -169,7 +169,7 @@ impl 
RequestHeader { /// Insert the header name and value to `self`. /// /// Different from [Self::append_header()], this method will replace all other existing headers - /// under the same name (case insensitive). + /// under the same name (case-insensitive). pub fn insert_header( &mut self, name: impl IntoCaseHeaderName, @@ -476,7 +476,7 @@ fn clone_resp_parts(me: &RespParts) -> RespParts { // This function returns an upper bound on the size of the header map used inside the http crate. // As of version 0.2, there is a limit of 1 << 15 (32,768) items inside the map. There is an -// assertion against this size inside the crate so we want to avoid panicking by not exceeding this +// assertion against this size inside the crate, so we want to avoid panicking by not exceeding this // upper bound. fn http_header_map_upper_bound(size_hint: Option) -> usize { // Even though the crate has 1 << 15 as the max size, calls to `with_capacity` invoke a @@ -484,13 +484,13 @@ fn http_header_map_upper_bound(size_hint: Option) -> usize { // // See https://github.com/hyperium/http/blob/34a9d6bdab027948d6dea3b36d994f9cbaf96f75/src/header/map.rs#L3220 // - // Therefore we set our max size to be even lower so we guarantee ourselves we won't hit that + // Therefore we set our max size to be even lower, so we guarantee ourselves we won't hit that // upper bound in the crate. Any way you cut it, 4,096 headers is insane. const PINGORA_MAX_HEADER_COUNT: usize = 4096; const INIT_HEADER_SIZE: usize = 8; - // We select the size hint or the max size here such that we pick a value substantially lower - // 1 << 15 with room to grow the header map. + // We select the size hint or the max size here, ensuring that we pick a value substantially lower + // than 1 << 15 with room to grow the header map. 
std::cmp::min( size_hint.unwrap_or(INIT_HEADER_SIZE), PINGORA_MAX_HEADER_COUNT, @@ -509,7 +509,7 @@ fn append_header_value( .as_slice() .try_into() .or_err(InvalidHTTPHeader, "invalid header name")?; - // storage the original case in the map + // store the original case in the map if let Some(name_map) = name_map { name_map.append(header_name.clone(), case_header_name); } @@ -530,7 +530,7 @@ fn insert_header_value( .try_into() .or_err(InvalidHTTPHeader, "invalid header name")?; if let Some(name_map) = name_map { - // storage the original case in the map + // store the original case in the map name_map.insert(header_name.clone(), case_header_name); } value_map.insert(header_name, value); @@ -562,7 +562,7 @@ fn header_to_h1_wire(key_map: Option<&CaseMap>, value_map: &HMap, buf: &mut impl let iter = key_map.iter().zip(value_map.iter()); for ((header, case_header), (header2, val)) in iter { if header != header2 { - // in case the header iter order changes in further version of HMap + // in case the header iteration order changes in future versions of HMap panic!("header iter mismatch {}, {}", header, header2) } buf.put_slice(case_header.as_slice()); From 4f98089a9e87238f483e975618ea58dd59830f1d Mon Sep 17 00:00:00 2001 From: gengteng Date: Thu, 29 Feb 2024 03:47:23 +0000 Subject: [PATCH 7/8] feat: implement Keep-Alive header parsing in Session Replicated-from: https://github.com/cloudflare/pingora/pull/24 Includes-commit: 357c1b95d9bea55327eec22c7aa1f02ebca59075 --- .bleep | 2 +- pingora-core/src/protocols/http/v1/client.rs | 86 +++++++++++++++++++- 2 files changed, 84 insertions(+), 4 deletions(-) diff --git a/.bleep b/.bleep index 20d34d826..67186fef0 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -cfeea5bcf669335b2ef1fa90b4572d1c52bc2c82 \ No newline at end of file +51069b16b63684ec579bbe0ef3ab1a0ee07cf51d \ No newline at end of file diff --git a/pingora-core/src/protocols/http/v1/client.rs b/pingora-core/src/protocols/http/v1/client.rs index c8e030087..5d1a55a4d 
100644 --- a/pingora-core/src/protocols/http/v1/client.rs +++ b/pingora-core/src/protocols/http/v1/client.rs @@ -293,7 +293,7 @@ impl HttpSession { InvalidHTTPHeader, format!("buf: {:?}", String::from_utf8_lossy(&buf)), e, - ) + ); } } } @@ -430,8 +430,29 @@ impl HttpSession { // `Keep-Alive: timeout=5, max=1000` => 5, 1000 fn get_keepalive_values(&self) -> (Option, Option) { - // TODO: implement this parsing - (None, None) + if let Some(keep_alive_header) = self.get_header("Keep-Alive") { + let Ok(header_value) = str::from_utf8(keep_alive_header.as_bytes()) else { + return (None, None); + }; + + let mut timeout = None; + let mut max = None; + + for param in header_value.split(',') { + let parts: Vec<&str> = param.split('=').map(|s| s.trim()).collect(); + match &parts.as_slice() { + ["timeout", timeout_value] => { + timeout = timeout_value.trim().parse::().ok() + } + ["max", max_value] => max = max_value.trim().parse::().ok(), + _ => {} + } + } + + (timeout, max) + } else { + (None, None) + } } /// Close the connection abruptly. 
This allows to signal the server that the connection is closed @@ -1081,6 +1102,65 @@ mod tests_stream { .keepalive_timeout, KeepaliveStatus::Off ); + + async fn build_resp_with_keepalive_values(keep_alive: &str) -> HttpSession { + let input = format!("HTTP/1.1 200 OK\r\nKeep-Alive: {keep_alive}\r\n\r\n"); + let mock_io = Builder::new().read(input.as_bytes()).build(); + let mut http_stream = HttpSession::new(Box::new(mock_io)); + let res = http_stream.read_response().await; + assert_eq!(input.len(), res.unwrap()); + http_stream.respect_keepalive(); + http_stream + } + + assert_eq!( + build_resp_with_keepalive_values("timeout=5, max=1000") + .await + .get_keepalive_values(), + (Some(5), Some(1000)) + ); + + assert_eq!( + build_resp_with_keepalive_values("max=1000, timeout=5") + .await + .get_keepalive_values(), + (Some(5), Some(1000)) + ); + + assert_eq!( + build_resp_with_keepalive_values(" timeout = 5, max = 1000 ") + .await + .get_keepalive_values(), + (Some(5), Some(1000)) + ); + + assert_eq!( + build_resp_with_keepalive_values("timeout=5") + .await + .get_keepalive_values(), + (Some(5), None) + ); + + assert_eq!( + build_resp_with_keepalive_values("max=1000") + .await + .get_keepalive_values(), + (None, Some(1000)) + ); + + assert_eq!( + build_resp_with_keepalive_values("a=b") + .await + .get_keepalive_values(), + (None, None) + ); + + assert_eq!( + build_resp_with_keepalive_values("") + .await + .get_keepalive_values(), + (None, None) + ); } /* Note: body tests are covered in server.rs */ From 1813de78cf64631b23ad238be6692026989de905 Mon Sep 17 00:00:00 2001 From: Kevin Guthrie Date: Fri, 15 Mar 2024 16:05:01 -0400 Subject: [PATCH 8/8] Refactoring external change to address code-review comments --- .bleep | 2 +- pingora-core/src/protocols/http/v1/client.rs | 41 +++++++++++--------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/.bleep b/.bleep index 67186fef0..a7047957f 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ 
-51069b16b63684ec579bbe0ef3ab1a0ee07cf51d \ No newline at end of file +7d3baa7e49e9b5c7d76775971c9f57f604209f38 \ No newline at end of file diff --git a/pingora-core/src/protocols/http/v1/client.rs b/pingora-core/src/protocols/http/v1/client.rs index 5d1a55a4d..46045447a 100644 --- a/pingora-core/src/protocols/http/v1/client.rs +++ b/pingora-core/src/protocols/http/v1/client.rs @@ -428,31 +428,34 @@ impl HttpSession { is_buf_keepalive(self.get_header(header::CONNECTION).map(|v| v.as_bytes())) } - // `Keep-Alive: timeout=5, max=1000` => 5, 1000 + /// `Keep-Alive: timeout=5, max=1000` => 5, 1000 + /// This is defined in the spec below. It is not part of any RFC, so + /// its behavior differs between platforms. + /// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Keep-Alive fn get_keepalive_values(&self) -> (Option, Option) { - if let Some(keep_alive_header) = self.get_header("Keep-Alive") { - let Ok(header_value) = str::from_utf8(keep_alive_header.as_bytes()) else { - return (None, None); - }; + let Some(keep_alive_header) = self.get_header("Keep-Alive") else { + return (None, None); + }; - let mut timeout = None; - let mut max = None; + let Ok(header_value) = str::from_utf8(keep_alive_header.as_bytes()) else { + return (None, None); + }; - for param in header_value.split(',') { - let parts: Vec<&str> = param.split('=').map(|s| s.trim()).collect(); - match &parts.as_slice() { - ["timeout", timeout_value] => { - timeout = timeout_value.trim().parse::().ok() - } - ["max", max_value] => max = max_value.trim().parse::().ok(), - _ => {} + let mut timeout = None; + let mut max = None; + + for param in header_value.split(',') { + let mut parts = param.splitn(2, '=').map(|s| s.trim()); + match (parts.next(), parts.next()) { + (Some("timeout"), Some(timeout_value)) => { + timeout = timeout_value.trim().parse().ok() } + (Some("max"), Some(max_value)) => max = max_value.trim().parse().ok(), + _ => {} } - - (timeout, max) - } else { - (None, None) } + +
(timeout, max) } /// Close the connection abruptly. This allows to signal the server that the connection is closed