Skip to content

Commit

Permalink
uses lazy LRU cache for ClusterNodesCache in Turbine (#2507)
Browse files Browse the repository at this point in the history
Current LRU cache implementation used in the code requires an exclusive
lock even on the read path due to &mut self receiver:
https://docs.rs/lru/latest/lru/struct.LruCache.html#method.get
Most reads do not update the cache, so this write-lock can unnecessarily
exacerbate lock contention.

The commit switches to lazy LRU cache which allows shared lock on the
read path and, additionally, it is generally faster.
  • Loading branch information
behzadnouri authored Aug 8, 2024
1 parent 39af6c0 commit 5aaa334
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions turbine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bytes = { workspace = true }
crossbeam-channel = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
lazy-lru = { workspace = true }
log = { workspace = true }
lru = { workspace = true }
quinn = { workspace = true }
Expand Down
28 changes: 16 additions & 12 deletions turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
itertools::Itertools,
lru::LruCache,
lazy_lru::LruCache,
rand::{seq::SliceRandom, Rng, SeedableRng},
rand_chacha::ChaChaRng,
solana_gossip::{
Expand Down Expand Up @@ -31,7 +31,7 @@ use {
iter::repeat_with,
marker::PhantomData,
net::{IpAddr, SocketAddr},
sync::{Arc, Mutex, RwLock},
sync::{Arc, RwLock},
time::{Duration, Instant},
},
thiserror::Error,
Expand Down Expand Up @@ -78,7 +78,7 @@ type CacheEntry<T> = Option<(/*as of:*/ Instant, Arc<ClusterNodes<T>>)>;
pub struct ClusterNodesCache<T> {
// Cache entries are wrapped in Arc<RwLock<...>>, so that, when needed, only
// one thread does the computations to update the entry for the epoch.
cache: Mutex<LruCache<Epoch, Arc<RwLock<CacheEntry<T>>>>>,
cache: RwLock<LruCache<Epoch, Arc<RwLock<CacheEntry<T>>>>>,
ttl: Duration, // Time to live.
}

Expand Down Expand Up @@ -434,23 +434,27 @@ impl<T> ClusterNodesCache<T> {
ttl: Duration,
) -> Self {
Self {
cache: Mutex::new(LruCache::new(cap)),
cache: RwLock::new(LruCache::new(cap)),
ttl,
}
}
}

impl<T: 'static> ClusterNodesCache<T> {
fn get_cache_entry(&self, epoch: Epoch) -> Arc<RwLock<CacheEntry<T>>> {
let mut cache = self.cache.lock().unwrap();
match cache.get(&epoch) {
Some(entry) => Arc::clone(entry),
None => {
let entry = Arc::default();
cache.put(epoch, Arc::clone(&entry));
entry
}
if let Some(entry) = self.cache.read().unwrap().get(&epoch) {
return Arc::clone(entry);
}
let mut cache = self.cache.write().unwrap();
// Have to recheck again here because the cache might have been updated
// by another thread in between the time this thread releases the read
// lock and obtains the write lock.
if let Some(entry) = cache.get(&epoch) {
return Arc::clone(entry);
}
let entry = Arc::default();
cache.put(epoch, Arc::clone(&entry));
entry
}

pub(crate) fn get(
Expand Down

0 comments on commit 5aaa334

Please sign in to comment.