Cache recently announced files for random sync with priority
boqiu committed Dec 18, 2024
Commit 934720d (parent: 6293006)
Showing 4 changed files with 117 additions and 21 deletions.
node/sync/src/auto_sync/manager.rs (3 additions, 1 deletion)
@@ -46,11 +46,12 @@ impl AutoSyncManager
// use v2 db to avoid reading v1 files announced from the whole network instead of from neighbors
Arc::new(SyncStore::new_with_name(
store.clone(),
config.ready_txs_cache_cap,
"pendingv2",
"readyv2",
))
} else {
Arc::new(SyncStore::new(store.clone()))
Arc::new(SyncStore::new(store.clone(), 0))
};
let catched_up = Arc::new(AtomicBool::new(false));

@@ -98,6 +99,7 @@ impl AutoSyncManager
if config.neighbors_only {
let historical_sync_store = Arc::new(SyncStore::new_with_name(
store.clone(),
0,
"pendingv2_historical",
"readyv2_historical",
));
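Note that the cache capacity is only wired up for the neighbors-only ready queue: both the legacy v1 store and the historical sync store are constructed with a capacity of 0, which, as the CachedTxStore implementation further below shows, disables the in-memory cache and falls back to the plain on-disk TxStore.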
node/sync/src/auto_sync/sync_store.rs (21 additions, 20 deletions)
@@ -1,4 +1,4 @@
use super::tx_store::TxStore;
use super::tx_store::{CachedTxStore, TxStore};
use anyhow::Result;
use std::sync::Arc;
use storage::log_store::{
@@ -33,23 +33,24 @@ pub struct SyncStore

/// Ready transactions to sync with high priority, since their announcements
/// were already received from other peers.
ready_txs: TxStore,
ready_txs: CachedTxStore,
}

impl SyncStore {
pub fn new(store: Store) -> Self {
Self {
store: Arc::new(RwLock::new(store)),
pending_txs: TxStore::new("pending"),
ready_txs: TxStore::new("ready"),
}
pub fn new(store: Store, ready_txs_cache_cap: usize) -> Self {
Self::new_with_name(store, ready_txs_cache_cap, "pending", "ready")
}

pub fn new_with_name(store: Store, pending: &'static str, ready: &'static str) -> Self {
pub fn new_with_name(
store: Store,
ready_txs_cache_cap: usize,
pending: &'static str,
ready: &'static str,
) -> Self {
Self {
store: Arc::new(RwLock::new(store)),
pending_txs: TxStore::new(pending),
ready_txs: TxStore::new(ready),
ready_txs: CachedTxStore::new(ready, ready_txs_cache_cap),
}
}

@@ -112,7 +113,7 @@ impl SyncStore

match queue {
Queue::Ready => {
if !self.ready_txs.add(store, Some(&mut tx), tx_seq)? {
if !self.ready_txs.add(store, Some(&mut tx), tx_seq).await? {
return Ok(InsertResult::AlreadyExists);
}

@@ -130,7 +131,7 @@
return Ok(InsertResult::AlreadyExists);
}

let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?;
let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq).await?;
store.exec_configs(tx, DATA_DB_KEY)?;

if removed {
@@ -152,7 +153,7 @@
return Ok(false);
}

let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?;
let added = self.ready_txs.add(store, Some(&mut tx), tx_seq).await?;

store.exec_configs(tx, DATA_DB_KEY)?;

@@ -164,7 +165,7 @@
let store = async_store.get_store();

// try to find a tx in ready queue with high priority
if let Some(val) = self.ready_txs.random(store)? {
if let Some(val) = self.ready_txs.random(store).await? {
return Ok(Some(val));
}

@@ -177,7 +178,7 @@
let store = async_store.get_store();

// removed in ready queue
if self.ready_txs.remove(store, None, tx_seq)? {
if self.ready_txs.remove(store, None, tx_seq).await? {
return Ok(Some(Queue::Ready));
}

@@ -199,7 +200,7 @@ mod tests {
#[tokio::test]
async fn test_tx_seq_range() {
let runtime = TestStoreRuntime::default();
let store = SyncStore::new(runtime.store.clone());
let store = SyncStore::new(runtime.store.clone(), 0);

// check values by default
assert_eq!(store.get_tx_seq_range().await.unwrap(), (None, None));
@@ -215,7 +216,7 @@
#[tokio::test]
async fn test_insert() {
let runtime = TestStoreRuntime::default();
let store = SyncStore::new(runtime.store.clone());
let store = SyncStore::new(runtime.store.clone(), 0);

assert_eq!(store.contains(1).await.unwrap(), None);
assert_eq!(store.insert(1, Pending).await.unwrap(), NewAdded);
@@ -234,7 +235,7 @@
#[tokio::test]
async fn test_upgrade() {
let runtime = TestStoreRuntime::default();
let store = SyncStore::new(runtime.store.clone());
let store = SyncStore::new(runtime.store.clone(), 0);

// cannot upgrade by default
assert!(!store.upgrade(3).await.unwrap());
@@ -253,7 +254,7 @@
#[tokio::test]
async fn test_random() {
let runtime = TestStoreRuntime::default();
let store = SyncStore::new(runtime.store.clone());
let store = SyncStore::new(runtime.store.clone(), 0);

// no tx by default
assert_eq!(store.random().await.unwrap(), None);
@@ -273,7 +274,7 @@
#[tokio::test]
async fn test_remove() {
let runtime = TestStoreRuntime::default();
let store = SyncStore::new(runtime.store.clone());
let store = SyncStore::new(runtime.store.clone(), 0);

// cannot remove by default
assert_eq!(store.remove(1).await.unwrap(), None);
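As a usage illustration (not part of the commit), here is a sketch of a test in the spirit of those above that exercises the cache-enabled path; the capacity of 4, the tx sequence number, and the test name are assumptions:

#[tokio::test]
async fn test_random_with_cache_enabled() {
    let runtime = TestStoreRuntime::default();
    // Any non-zero capacity enables the in-memory ready-queue cache.
    let store = SyncStore::new(runtime.store.clone(), 4);

    // Nothing announced yet, so nothing to pick.
    assert_eq!(store.random().await.unwrap(), None);

    // Inserting into the ready queue also populates the in-memory cache,
    // so random() can serve the pick without scanning the db.
    assert_eq!(store.insert(7, Ready).await.unwrap(), NewAdded);
    assert_eq!(store.random().await.unwrap(), Some(7));

    // Removing the tx clears both the db entry and the cached entry.
    assert_eq!(store.remove(7).await.unwrap(), Some(Ready));
    assert_eq!(store.random().await.unwrap(), None);
}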
node/sync/src/auto_sync/tx_store.rs (91 additions, 0 deletions)
@@ -1,8 +1,12 @@
use std::collections::HashSet;

use anyhow::Result;
use rand::seq::IteratorRandom;
use rand::Rng;
use storage::log_store::config::{ConfigTx, ConfigurableExt};
use storage::log_store::log_manager::DATA_DB_KEY;
use storage::log_store::Store;
use tokio::sync::RwLock;

/// TxStore is used to store pending transactions that are to be synchronized in advance.
///
@@ -138,6 +142,93 @@ impl TxStore
}
}

/// Caches recently inserted txs in memory for random picks with priority.
pub struct CachedTxStore {
tx_store: TxStore,
cache_cap: usize,
cache: RwLock<HashSet<u64>>,
}

impl CachedTxStore {
pub fn new(name: &'static str, cache_cap: usize) -> Self {
Self {
tx_store: TxStore::new(name),
cache_cap,
cache: Default::default(),
}
}

pub fn has(&self, store: &dyn Store, tx_seq: u64) -> Result<bool> {
self.tx_store.has(store, tx_seq)
}

pub fn count(&self, store: &dyn Store) -> Result<usize> {
self.tx_store.count(store)
}

pub async fn add(
&self,
store: &dyn Store,
db_tx: Option<&mut ConfigTx>,
tx_seq: u64,
) -> Result<bool> {
if self.cache_cap == 0 {
return self.tx_store.add(store, db_tx, tx_seq);
}

let mut cache = self.cache.write().await;

let added = self.tx_store.add(store, db_tx, tx_seq)?;

if added {
cache.insert(tx_seq);

if cache.len() > self.cache_cap {
if let Some(popped) = cache.iter().choose(&mut rand::thread_rng()).cloned() {
cache.remove(&popped);
}
}
}

Ok(added)
}

pub async fn random(&self, store: &dyn Store) -> Result<Option<u64>> {
if self.cache_cap == 0 {
return self.tx_store.random(store);
}

let cache = self.cache.read().await;

if let Some(v) = cache.iter().choose(&mut rand::thread_rng()).cloned() {
return Ok(Some(v));
}

self.tx_store.random(store)
}

pub async fn remove(
&self,
store: &dyn Store,
db_tx: Option<&mut ConfigTx>,
tx_seq: u64,
) -> Result<bool> {
if self.cache_cap == 0 {
return self.tx_store.remove(store, db_tx, tx_seq);
}

let mut cache = self.cache.write().await;

let removed = self.tx_store.remove(store, db_tx, tx_seq)?;

if removed {
cache.remove(&tx_seq);
}

Ok(removed)
}
}

#[cfg(test)]
mod tests {
use crate::test_util::tests::TestStoreRuntime;
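The eviction policy above is deliberately simple: the new sequence number is inserted first, and if the set then exceeds its capacity an arbitrary cached entry is dropped, not the oldest one. Below is a self-contained sketch of just that policy, using a plain HashSet and the rand crate, independent of the store types in this commit:

use std::collections::HashSet;

use rand::seq::IteratorRandom;

/// Minimal illustration of the capped-set behavior used by CachedTxStore:
/// insert the new element, then evict one random element if over capacity.
fn insert_with_random_eviction(cache: &mut HashSet<u64>, cap: usize, tx_seq: u64) -> bool {
    let added = cache.insert(tx_seq);

    if added && cache.len() > cap {
        // Pick an arbitrary victim; cloned() ends the immutable borrow before remove().
        if let Some(victim) = cache.iter().choose(&mut rand::thread_rng()).cloned() {
            cache.remove(&victim);
        }
    }

    added
}

fn main() {
    let mut cache = HashSet::new();
    for seq in 0..10u64 {
        insert_with_random_eviction(&mut cache, 4, seq);
    }
    // The cache never grows beyond its capacity.
    assert!(cache.len() <= 4);
    println!("cached: {:?}", cache);
}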
node/sync/src/lib.rs (2 additions, 0 deletions)
@@ -62,6 +62,7 @@ pub struct Config
pub sequential_find_peer_timeout: Duration,
#[serde(deserialize_with = "deserialize_duration")]
pub random_find_peer_timeout: Duration,
pub ready_txs_cache_cap: usize,
}

impl Default for Config {
@@ -94,6 +95,7 @@ impl Default for Config
max_random_workers: 8,
sequential_find_peer_timeout: Duration::from_secs(5),
random_find_peer_timeout: Duration::from_secs(5),
ready_txs_cache_cap: 1_000_000,
}
}
}
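Finally, the new knob defaults to one million cached sequence numbers. A minimal sketch of overriding it from Rust, assuming the crate is imported as sync (the crate path and helper name are illustrative):

use sync::Config;

// Hypothetical helper: shrink the ready-queue cache for memory-constrained nodes.
// Setting the field to 0 disables the in-memory cache, so random picks go to the db only.
fn tuned_sync_config() -> Config {
    Config {
        ready_txs_cache_cap: 100_000,
        ..Default::default()
    }
}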
