From 934720d01982ce75ecf0538ed739563a3c103119 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Wed, 18 Dec 2024 17:21:17 +0800 Subject: [PATCH] Cache the recent announced file for random sync with priority --- node/sync/src/auto_sync/manager.rs | 4 +- node/sync/src/auto_sync/sync_store.rs | 41 ++++++------ node/sync/src/auto_sync/tx_store.rs | 91 +++++++++++++++++++++++++++ node/sync/src/lib.rs | 2 + 4 files changed, 117 insertions(+), 21 deletions(-) diff --git a/node/sync/src/auto_sync/manager.rs b/node/sync/src/auto_sync/manager.rs index fc19a69d..7b2b9d2c 100644 --- a/node/sync/src/auto_sync/manager.rs +++ b/node/sync/src/auto_sync/manager.rs @@ -46,11 +46,12 @@ impl AutoSyncManager { // use v2 db to avoid reading v1 files that announced from the whole network instead of neighbors Arc::new(SyncStore::new_with_name( store.clone(), + config.ready_txs_cache_cap, "pendingv2", "readyv2", )) } else { - Arc::new(SyncStore::new(store.clone())) + Arc::new(SyncStore::new(store.clone(), 0)) }; let catched_up = Arc::new(AtomicBool::new(false)); @@ -98,6 +99,7 @@ impl AutoSyncManager { if config.neighbors_only { let historical_sync_store = Arc::new(SyncStore::new_with_name( store.clone(), + 0, "pendingv2_historical", "readyv2_historical", )); diff --git a/node/sync/src/auto_sync/sync_store.rs b/node/sync/src/auto_sync/sync_store.rs index 0dcb5d56..7e3e9a16 100644 --- a/node/sync/src/auto_sync/sync_store.rs +++ b/node/sync/src/auto_sync/sync_store.rs @@ -1,4 +1,4 @@ -use super::tx_store::TxStore; +use super::tx_store::{CachedTxStore, TxStore}; use anyhow::Result; use std::sync::Arc; use storage::log_store::{ @@ -33,23 +33,24 @@ pub struct SyncStore { /// Ready transactions to sync with high priority since announcement /// already received from other peers. 
- ready_txs: TxStore, + ready_txs: CachedTxStore, } impl SyncStore { - pub fn new(store: Store) -> Self { - Self { - store: Arc::new(RwLock::new(store)), - pending_txs: TxStore::new("pending"), - ready_txs: TxStore::new("ready"), - } + pub fn new(store: Store, ready_txs_cache_cap: usize) -> Self { + Self::new_with_name(store, ready_txs_cache_cap, "pending", "ready") } - pub fn new_with_name(store: Store, pending: &'static str, ready: &'static str) -> Self { + pub fn new_with_name( + store: Store, + ready_txs_cache_cap: usize, + pending: &'static str, + ready: &'static str, + ) -> Self { Self { store: Arc::new(RwLock::new(store)), pending_txs: TxStore::new(pending), - ready_txs: TxStore::new(ready), + ready_txs: CachedTxStore::new(ready, ready_txs_cache_cap), } } @@ -112,7 +113,7 @@ impl SyncStore { match queue { Queue::Ready => { - if !self.ready_txs.add(store, Some(&mut tx), tx_seq)? { + if !self.ready_txs.add(store, Some(&mut tx), tx_seq).await? { return Ok(InsertResult::AlreadyExists); } @@ -130,7 +131,7 @@ impl SyncStore { return Ok(InsertResult::AlreadyExists); } - let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?; + let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq).await?; store.exec_configs(tx, DATA_DB_KEY)?; if removed { @@ -152,7 +153,7 @@ impl SyncStore { return Ok(false); } - let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?; + let added = self.ready_txs.add(store, Some(&mut tx), tx_seq).await?; store.exec_configs(tx, DATA_DB_KEY)?; @@ -164,7 +165,7 @@ impl SyncStore { let store = async_store.get_store(); // try to find a tx in ready queue with high priority - if let Some(val) = self.ready_txs.random(store)? { + if let Some(val) = self.ready_txs.random(store).await? { return Ok(Some(val)); } @@ -177,7 +178,7 @@ impl SyncStore { let store = async_store.get_store(); // removed in ready queue - if self.ready_txs.remove(store, None, tx_seq)? { + if self.ready_txs.remove(store, None, tx_seq).await? 
{ return Ok(Some(Queue::Ready)); } @@ -199,7 +200,7 @@ mod tests { #[tokio::test] async fn test_tx_seq_range() { let runtime = TestStoreRuntime::default(); - let store = SyncStore::new(runtime.store.clone()); + let store = SyncStore::new(runtime.store.clone(), 0); // check values by default assert_eq!(store.get_tx_seq_range().await.unwrap(), (None, None)); @@ -215,7 +216,7 @@ mod tests { #[tokio::test] async fn test_insert() { let runtime = TestStoreRuntime::default(); - let store = SyncStore::new(runtime.store.clone()); + let store = SyncStore::new(runtime.store.clone(), 0); assert_eq!(store.contains(1).await.unwrap(), None); assert_eq!(store.insert(1, Pending).await.unwrap(), NewAdded); @@ -234,7 +235,7 @@ mod tests { #[tokio::test] async fn test_upgrade() { let runtime = TestStoreRuntime::default(); - let store = SyncStore::new(runtime.store.clone()); + let store = SyncStore::new(runtime.store.clone(), 0); // cannot upgrade by default assert!(!store.upgrade(3).await.unwrap()); @@ -253,7 +254,7 @@ mod tests { #[tokio::test] async fn test_random() { let runtime = TestStoreRuntime::default(); - let store = SyncStore::new(runtime.store.clone()); + let store = SyncStore::new(runtime.store.clone(), 0); // no tx by default assert_eq!(store.random().await.unwrap(), None); @@ -273,7 +274,7 @@ mod tests { #[tokio::test] async fn test_remove() { let runtime = TestStoreRuntime::default(); - let store = SyncStore::new(runtime.store.clone()); + let store = SyncStore::new(runtime.store.clone(), 0); // cannot remove by default assert_eq!(store.remove(1).await.unwrap(), None); diff --git a/node/sync/src/auto_sync/tx_store.rs b/node/sync/src/auto_sync/tx_store.rs index 9c2ac279..fd2e3b1b 100644 --- a/node/sync/src/auto_sync/tx_store.rs +++ b/node/sync/src/auto_sync/tx_store.rs @@ -1,8 +1,12 @@ +use std::collections::HashSet; + use anyhow::Result; +use rand::seq::IteratorRandom; use rand::Rng; use storage::log_store::config::{ConfigTx, ConfigurableExt}; use 
storage::log_store::log_manager::DATA_DB_KEY; use storage::log_store::Store; +use tokio::sync::RwLock; /// TxStore is used to store pending transactions that to be synchronized in advance. /// @@ -138,6 +142,93 @@ impl TxStore { } } +/// Cache the recent inserted tx in memory for random pick with priority. +pub struct CachedTxStore { + tx_store: TxStore, + cache_cap: usize, + cache: RwLock<HashSet<u64>>, +} + +impl CachedTxStore { + pub fn new(name: &'static str, cache_cap: usize) -> Self { + Self { + tx_store: TxStore::new(name), + cache_cap, + cache: Default::default(), + } + } + + pub fn has(&self, store: &dyn Store, tx_seq: u64) -> Result<bool> { + self.tx_store.has(store, tx_seq) + } + + pub fn count(&self, store: &dyn Store) -> Result<usize> { + self.tx_store.count(store) + } + + pub async fn add( + &self, + store: &dyn Store, + db_tx: Option<&mut ConfigTx>, + tx_seq: u64, + ) -> Result<bool> { + if self.cache_cap == 0 { + return self.tx_store.add(store, db_tx, tx_seq); + } + + let mut cache = self.cache.write().await; + + let added = self.tx_store.add(store, db_tx, tx_seq)?; + + if added { + cache.insert(tx_seq); + + if cache.len() > self.cache_cap { + if let Some(popped) = cache.iter().choose(&mut rand::thread_rng()).cloned() { + cache.remove(&popped); + } + } + } + + Ok(added) + } + + pub async fn random(&self, store: &dyn Store) -> Result<Option<u64>> { + if self.cache_cap == 0 { + return self.tx_store.random(store); + } + + let cache = self.cache.read().await; + + if let Some(v) = cache.iter().choose(&mut rand::thread_rng()).cloned() { + return Ok(Some(v)); + } + + self.tx_store.random(store) + } + + pub async fn remove( + &self, + store: &dyn Store, + db_tx: Option<&mut ConfigTx>, + tx_seq: u64, + ) -> Result<bool> { + if self.cache_cap == 0 { + return self.tx_store.remove(store, db_tx, tx_seq); + } + + let mut cache: tokio::sync::RwLockWriteGuard<'_, HashSet<u64>> = self.cache.write().await; + + let removed = self.tx_store.remove(store, db_tx, tx_seq)?; + + if removed { + cache.remove(&tx_seq); + } + + 
Ok(removed) + } +} + #[cfg(test)] mod tests { use crate::test_util::tests::TestStoreRuntime; diff --git a/node/sync/src/lib.rs b/node/sync/src/lib.rs index 6166b9eb..35f9ebe7 100644 --- a/node/sync/src/lib.rs +++ b/node/sync/src/lib.rs @@ -62,6 +62,7 @@ pub struct Config { pub sequential_find_peer_timeout: Duration, #[serde(deserialize_with = "deserialize_duration")] pub random_find_peer_timeout: Duration, + pub ready_txs_cache_cap: usize, } impl Default for Config { @@ -94,6 +95,7 @@ impl Default for Config { max_random_workers: 8, sequential_find_peer_timeout: Duration::from_secs(5), random_find_peer_timeout: Duration::from_secs(5), + ready_txs_cache_cap: 1_000_000, } } }