From fdc78738c8f721d14cf745a35af8155dc9ed4db7 Mon Sep 17 00:00:00 2001
From: Brice Dobry
Date: Thu, 19 Dec 2024 15:52:54 -0500
Subject: [PATCH] feat: redesign nonce cache

This redesign uses a proper LRU cache and is more careful to flush the
cache to the database efficiently.
---
 stacks-common/src/types/sqlite.rs             |   9 +-
 stacks-common/src/util/lru_cache.rs           | 256 ++++++++++++++++++
 stacks-common/src/util/mod.rs                 |   1 +
 .../stacks/tests/block_construction.rs        |   2 +-
 stackslib/src/config/mod.rs                   |   8 +-
 stackslib/src/core/mempool.rs                 | 202 ++------------
 stackslib/src/core/mod.rs                     |   1 +
 stackslib/src/core/nonce_cache.rs             | 253 +++++++++++++++++
 .../stacks-node/src/nakamoto_node/miner.rs    |   7 +
 9 files changed, 548 insertions(+), 191 deletions(-)
 create mode 100644 stacks-common/src/util/lru_cache.rs
 create mode 100644 stackslib/src/core/nonce_cache.rs

diff --git a/stacks-common/src/types/sqlite.rs b/stacks-common/src/types/sqlite.rs
index 183ec61fbc..57010ea118 100644
--- a/stacks-common/src/types/sqlite.rs
+++ b/stacks-common/src/types/sqlite.rs
@@ -16,7 +16,7 @@
 use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput, ValueRef};
 
-use super::chainstate::VRFSeed;
+use super::chainstate::{StacksAddress, VRFSeed};
 use crate::deps_common::bitcoin::util::hash::Sha256dHash;
 use crate::types::chainstate::{
     BlockHeaderHash, BurnchainHeaderHash, ConsensusHash, SortitionId, StacksBlockId, TrieHash,
@@ -42,6 +42,13 @@ impl ToSql for Sha256dHash {
     }
 }
 
+impl rusqlite::types::ToSql for StacksAddress {
+    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput> {
+        let addr_str = self.to_string();
+        Ok(addr_str.into())
+    }
+}
+
 // Implement rusqlite traits for a bunch of structs that used to be defined
 // in the chainstate code
 impl_byte_array_rusqlite_only!(ConsensusHash);
diff --git a/stacks-common/src/util/lru_cache.rs b/stacks-common/src/util/lru_cache.rs
new file mode 100644
index 0000000000..97b55e69bc
--- /dev/null
+++ b/stacks-common/src/util/lru_cache.rs
@@ -0,0 +1,256 @@
+// Copyright (C) 2024 Stacks Open Internet Foundation
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::collections::HashMap;
+
+/// Node in the doubly linked list
+struct Node<K, V> {
+    key: K,
+    value: V,
+    dirty: bool,
+    next: usize,
+    prev: usize,
+}
+
+/// LRU cache for account nonces
+pub struct LruCache<K, V> {
+    capacity: usize,
+    /// Map from address to an offset in the linked list
+    cache: HashMap<K, usize>,
+    /// Doubly linked list of values in order of most recently used
+    order: Vec<Node<K, V>>,
+    /// Index of the head of the linked list -- the most recently used element
+    head: usize,
+    /// Index of the tail of the linked list -- the least recently used element
+    tail: usize,
+}
+
+impl<K: Eq + std::hash::Hash + Clone, V: Copy> LruCache<K, V> {
+    /// Create a new LRU cache with the given capacity
+    pub fn new(capacity: usize) -> Self {
+        LruCache {
+            capacity,
+            cache: HashMap::new(),
+            order: Vec::with_capacity(capacity),
+            head: capacity,
+            tail: capacity,
+        }
+    }
+
+    /// Get the value for the given key
+    pub fn get(&mut self, key: &K) -> Option<V> {
+        if let Some(node) = self.cache.get(key) {
+            // Move the node to the head of the LRU list
+            let node = *node;
+
+            if node != self.head {
+                let prev = self.order[node].prev;
+                let next = self.order[node].next;
+
+                if node == self.tail {
+                    // If this is the tail, update the tail
+                    self.tail = prev;
+                } else {
+                    // Else, update the next node's prev pointer
+                    self.order[next].prev = prev;
+                }
+
+                self.order[prev].next = next;
+                self.order[node].prev = self.capacity;
+                self.order[node].next = self.head;
+                self.order[self.head].prev = node;
+                self.head = node;
+            }
+
+            Some(self.order[node].value)
+        } else {
+            None
+        }
+    }
+
+    /// Insert a key-value pair into the cache, marking it as dirty.
+    /// Returns `Some((K, V))` if a dirty value was evicted.
+    pub fn insert(&mut self, key: K, value: V) -> Option<(K, V)> {
+        self.insert_with_dirty(key, value, true)
+    }
+
+    /// Insert a key-value pair into the cache, marking it as clean.
+    /// Returns `Some((K, V))` if a dirty value was evicted.
+    pub fn insert_clean(&mut self, key: K, value: V) -> Option<(K, V)> {
+        self.insert_with_dirty(key, value, false)
+    }
+
+    /// Insert a key-value pair into the cache
+    /// Returns `Some((K, V))` if a dirty value was evicted.
+    pub fn insert_with_dirty(&mut self, key: K, value: V, dirty: bool) -> Option<(K, V)> {
+        let mut evicted = None;
+        if let Some(node) = self.cache.get(&key) {
+            // Update the value for the key
+            let node = *node;
+            self.order[node].value = value;
+            self.order[node].dirty = dirty;
+
+            // Just call get to handle updating the LRU list
+            self.get(&key);
+        } else {
+            let index = if self.cache.len() == self.capacity {
+                // Take the place of the least recently used element.
+                // First, remove it from the tail of the LRU list
+                let index = self.tail;
+                let prev = self.order[index].prev;
+                self.order[prev].next = self.capacity;
+                self.tail = prev;
+
+                // Remove it from the cache
+                self.cache.remove(&self.order[index].key);
+
+                // Replace the old key with the new one, saving the old
+                // key-value pair to return if it was dirty
+                let old_key = std::mem::replace(&mut self.order[index].key, key.clone());
+                if self.order[index].dirty {
+                    evicted = Some((old_key, self.order[index].value));
+                }
+
+                // Insert this new value into the cache
+                self.cache.insert(key, index);
+
+                // Update the node with the new value, inserting it at
+                // the head of the LRU list
+                self.order[index].value = value;
+                self.order[index].dirty = dirty;
+                self.order[index].next = self.head;
+                self.order[index].prev = self.capacity;
+
+                index
+            } else {
+                // Insert a new key-value pair
+                let node = Node {
+                    key: key.clone(),
+                    value,
+                    dirty,
+                    next: self.head,
+                    prev: self.capacity,
+                };
+
+                let index = self.order.len();
+                self.order.push(node);
+                self.cache.insert(key, index);
+
+                index
+            };
+
+            // Put it at the head of the LRU list
+            if self.head != self.capacity {
+                self.order[self.head].prev = index;
+            } else {
+                self.tail = index;
+            }
+
+            self.head = index;
+        }
+        evicted
+    }
+
+    /// Call `f` on every dirty (key, value) pair, in most-recently-used
+    /// order, and mark each flushed entry clean.
+    pub fn flush<E>(&mut self, mut f: impl FnMut(&K, V) -> Result<(), E>) -> Result<(), E> {
+        let mut index = self.head;
+        while index != self.capacity {
+            let next = self.order[index].next;
+            if self.order[index].dirty {
+                let value = self.order[index].value;
+                f(&self.order[index].key, value)?;
+                self.order[index].dirty = false;
+            }
+            index = next;
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_lru_cache() {
+        let mut cache = LruCache::new(2);
+
+        cache.insert(1, 1);
+        cache.insert(2, 2);
+        assert_eq!(cache.get(&1), Some(1));
+        cache.insert(3, 3);
+        assert_eq!(cache.get(&2), None);
+        cache.insert(4, 4);
+        assert_eq!(cache.get(&1), None);
+        assert_eq!(cache.get(&3), Some(3));
+        assert_eq!(cache.get(&4), Some(4));
+    }
+
+    #[test]
+    fn test_lru_cache_update() {
+        let mut cache = LruCache::new(2);
+
+        cache.insert(1, 1);
+        cache.insert(2, 2);
+        cache.insert(1, 10);
+        assert_eq!(cache.get(&1), Some(10));
+        cache.insert(3, 3);
+        assert_eq!(cache.get(&2), None);
+        cache.insert(2, 4);
+        assert_eq!(cache.get(&2), Some(4));
+        assert_eq!(cache.get(&3), Some(3));
+    }
+
+    #[test]
+    fn test_lru_cache_evicted() {
+        let mut cache = LruCache::new(2);
+
+        assert!(cache.insert(1, 1).is_none());
+        assert!(cache.insert(2, 2).is_none());
+        let evicted = cache.insert(3, 3).expect("expected an eviction");
+        assert_eq!(evicted, (1, 1));
+    }
+
+    #[test]
+    fn test_lru_cache_flush() {
+        let mut cache = LruCache::new(2);
+
+        cache.insert(1, 1);
+
+        let mut flushed = Vec::new();
+        cache
+            .flush(|k, v| {
+                flushed.push((*k, v));
+                Ok::<(), ()>(())
+            })
+            .unwrap();
+
+        assert_eq!(flushed, vec![(1, 1)]);
+
+        cache.insert(1, 3);
+        cache.insert(2, 2);
+
+        let mut flushed = Vec::new();
+        cache
+            .flush(|k, v| {
+                flushed.push((*k, v));
+                Ok::<(), ()>(())
+            })
+            .unwrap();
+
+        assert_eq!(flushed, vec![(2, 2), (1, 3)]);
+    }
+}
diff --git a/stacks-common/src/util/mod.rs b/stacks-common/src/util/mod.rs
index 5f733eddad..f3fc7bf2bc 100644
--- a/stacks-common/src/util/mod.rs
+++ b/stacks-common/src/util/mod.rs
@@ -21,6 +21,7 @@ pub mod macros;
 pub mod chunked_encoding;
 pub mod db;
 pub mod hash;
+pub mod lru_cache;
 pub mod pair;
 pub mod pipe;
 pub mod retry;
diff --git a/stackslib/src/chainstate/stacks/tests/block_construction.rs b/stackslib/src/chainstate/stacks/tests/block_construction.rs
index 90fc7f1705..45cad38a9a 100644
--- a/stackslib/src/chainstate/stacks/tests/block_construction.rs
+++ b/stackslib/src/chainstate/stacks/tests/block_construction.rs
@@ -4912,7 +4912,7 @@ fn mempool_walk_test_users_10_rounds_3_cache_size_2000_null_prob_100() {
 fn paramaterized_mempool_walk_test(
     num_users: usize,
     num_rounds: usize,
-    nonce_and_candidate_cache_size: u64,
+    nonce_and_candidate_cache_size: usize,
     consider_no_estimate_tx_prob: u8,
     timeout_ms: u128,
 ) {
diff --git a/stackslib/src/config/mod.rs b/stackslib/src/config/mod.rs
index 513c3ba3b6..57cd261811 100644
--- a/stackslib/src/config/mod.rs
+++ b/stackslib/src/config/mod.rs
@@ -2109,8 +2109,8 @@ pub struct MinerConfig {
     /// Wait for a downloader pass before mining.
     /// This can only be disabled in testing; it can't be changed in the config file.
     pub wait_for_block_download: bool,
-    pub nonce_cache_size: u64,
-    pub candidate_retry_cache_size: u64,
+    pub nonce_cache_size: usize,
+    pub candidate_retry_cache_size: usize,
     pub unprocessed_block_deadline_secs: u64,
     pub mining_key: Option<Secp256k1PrivateKey>,
     /// Amount of time while mining in nakamoto to wait in between mining interim blocks
@@ -2563,8 +2563,8 @@ pub struct MinerConfigFile {
     pub probability_pick_no_estimate_tx: Option<u8>,
     pub block_reward_recipient: Option<String>,
     pub segwit: Option<bool>,
-    pub nonce_cache_size: Option<u64>,
-    pub candidate_retry_cache_size: Option<u64>,
+    pub nonce_cache_size: Option<usize>,
+    pub candidate_retry_cache_size: Option<usize>,
     pub unprocessed_block_deadline_secs: Option<u64>,
     pub mining_key: Option<String>,
     pub wait_on_interim_blocks_ms: Option<u64>,
diff --git a/stackslib/src/core/mempool.rs b/stackslib/src/core/mempool.rs
index 46ff54924b..ad2eafc74f 100644
--- a/stackslib/src/core/mempool.rs
+++ b/stackslib/src/core/mempool.rs
@@ -15,7 +15,7 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use std::cmp::{self, Ordering};
-use std::collections::{HashMap, HashSet, VecDeque};
+use std::collections::{HashMap, HashSet, LinkedList, VecDeque};
 use std::hash::Hasher;
 use std::io::{Read, Write};
 use std::ops::{Deref, DerefMut};
@@ -55,6 +55,7 @@ use crate::chainstate::stacks::{
     Error as ChainstateError, StacksBlock, StacksMicroblock, StacksTransaction, TransactionPayload,
 };
 use crate::clarity_vm::clarity::ClarityConnection;
+use crate::core::nonce_cache::NonceCache;
 use crate::core::{
     ExecutionCost, StacksEpochId, FIRST_BURNCHAIN_CONSENSUS_HASH, FIRST_STACKS_BLOCK_HASH,
 };
@@ -529,10 +530,10 @@ pub struct MemPoolWalkSettings {
     /// either failed to get a cost estimate or has not been estimated yet.
     pub consider_no_estimate_tx_prob: u8,
     /// Size of the nonce cache. This avoids MARF look-ups.
-    pub nonce_cache_size: u64,
+    pub nonce_cache_size: usize,
     /// Size of the candidate cache. These are the candidates that will be retried after each
     /// transaction is mined.
-    pub candidate_retry_cache_size: u64,
+    pub candidate_retry_cache_size: usize,
     /// Types of transactions we'll consider
     pub txs_to_consider: HashSet<MemPoolWalkTxTypes>,
     /// Origins for transactions that we'll consider
     pub filter_origins: HashSet<StacksAddress>,
@@ -1014,128 +1015,6 @@ impl<'a> MemPoolTx<'a> {
     }
 }
 
-/// Used to locally cache nonces to avoid repeatedly looking them up in the nonce.
-struct NonceCache {
-    cache: HashMap<StacksAddress, u64>,
-    /// The maximum size that this cache can be.
-    max_cache_size: usize,
-}
-
-impl NonceCache {
-    fn new(nonce_cache_size: u64) -> Self {
-        let max_size: usize = nonce_cache_size
-            .try_into()
-            .expect("Could not cast `nonce_cache_size` as `usize`.");
-        Self {
-            cache: HashMap::new(),
-            max_cache_size: max_size,
-        }
-    }
-
-    /// Get a nonce from the cache.
-    /// First, the RAM cache will be checked for this address.
-    /// If absent, then the `nonces` table will be queried for this address.
-    /// If absent, then the MARF will be queried for this address.
-    ///
-    /// If not in RAM, the nonce will be opportunistically stored to the `nonces` table. If that
-    /// fails due to lock contention, then the method will return `true` for its second tuple argument.
-    ///
-    /// Returns (nonce, should-try-store-again?)
-    fn get<C>(
-        &mut self,
-        address: &StacksAddress,
-        clarity_tx: &mut C,
-        mempool_db: &DBConn,
-    ) -> (u64, bool)
-    where
-        C: ClarityConnection,
-    {
-        #[cfg(test)]
-        assert!(self.cache.len() <= self.max_cache_size);
-
-        // Check in-memory cache
-        match self.cache.get(address) {
-            Some(nonce) => (*nonce, false),
-            None => {
-                // Check sqlite cache
-                let opt_nonce = match db_get_nonce(mempool_db, address) {
-                    Ok(opt_nonce) => opt_nonce,
-                    Err(e) => {
-                        warn!("error retrieving nonce from mempool db: {}", e);
-                        None
-                    }
-                };
-                match opt_nonce {
-                    Some(nonce) => {
-                        // Copy this into the in-memory cache if there is space
-                        if self.cache.len() < self.max_cache_size {
-                            self.cache.insert(address.clone(), nonce);
-                        }
-                        (nonce, false)
-                    }
-                    None => {
-                        let nonce =
-                            StacksChainState::get_nonce(clarity_tx, &address.clone().into());
-
-                        let should_store_again = match db_set_nonce(mempool_db, address, nonce) {
-                            Ok(_) => false,
-                            Err(e) => {
-                                debug!("error caching nonce to sqlite: {}", e);
-                                true
-                            }
-                        };
-
-                        if self.cache.len() < self.max_cache_size {
-                            self.cache.insert(address.clone(), nonce);
-                        }
-                        (nonce, should_store_again)
-                    }
-                }
-            }
-        }
-    }
-
-    /// Store the (address, nonce) pair to the `nonces` table.
-    /// If storage fails, return false.
-    /// Otherwise return true.
-    fn update(&mut self, address: StacksAddress, value: u64, mempool_db: &DBConn) -> bool {
-        // Sqlite cache
-        let success = match db_set_nonce(mempool_db, &address, value) {
-            Ok(_) => true,
-            Err(e) => {
-                warn!("error caching nonce to sqlite: {}", e);
-                false
-            }
-        };
-
-        // In-memory cache
-        match self.cache.get_mut(&address) {
-            Some(nonce) => {
-                *nonce = value;
-            }
-            None => (),
-        }
-
-        success
-    }
-}
-
-fn db_set_nonce(conn: &DBConn, address: &StacksAddress, nonce: u64) -> Result<(), db_error> {
-    let addr_str = address.to_string();
-    let nonce_i64 = u64_to_sql(nonce)?;
-
-    let sql = "INSERT OR REPLACE INTO nonces (address, nonce) VALUES (?1, ?2)";
-    conn.execute(sql, params![addr_str, nonce_i64])?;
-    Ok(())
-}
-
-fn db_get_nonce(conn: &DBConn, address: &StacksAddress) -> Result<Option<u64>, db_error> {
-    let addr_str = address.to_string();
-
-    let sql = "SELECT nonce FROM nonces WHERE address = ?";
-    query_row(conn, sql, params![addr_str])
-}
-
 #[cfg(test)]
 pub fn db_get_all_nonces(conn: &DBConn) -> Result<Vec<(StacksAddress, u64)>, db_error> {
     let sql = "SELECT * FROM nonces";
@@ -1165,14 +1044,11 @@ struct CandidateCache {
 }
 
 impl CandidateCache {
-    fn new(candidate_retry_cache_size: u64) -> Self {
-        let max_size: usize = candidate_retry_cache_size
-            .try_into()
-            .expect("Could not cast `candidate_retry_cache_size` as usize.");
+    fn new(candidate_retry_cache_size: usize) -> Self {
         Self {
             cache: VecDeque::new(),
             next: VecDeque::new(),
-            max_cache_size: max_size,
+            max_cache_size: candidate_retry_cache_size,
         }
     }
 
@@ -1650,10 +1526,6 @@ impl MemPoolDB {
         let mut candidate_cache = CandidateCache::new(settings.candidate_retry_cache_size);
         let mut nonce_cache = NonceCache::new(settings.nonce_cache_size);
 
-        // set of (address, nonce) to store after the inner loop completes. This will be done in a
-        // single transaction. This cannot grow to more than `settings.nonce_cache_size` entries.
-        let mut retry_store = HashMap::new();
-
         let sql = "
         SELECT txid, origin_nonce, origin_address, sponsor_nonce, sponsor_address, fee_rate
         FROM mempool
@@ -1734,29 +1606,11 @@ impl MemPoolDB {
             };
 
             // Check the nonces.
-            let (expected_origin_nonce, retry_store_origin_nonce) =
-                nonce_cache.get(&candidate.origin_address, clarity_tx, self.conn());
-            let (expected_sponsor_nonce, retry_store_sponsor_nonce) =
-                nonce_cache.get(&candidate.sponsor_address, clarity_tx, self.conn());
-
-            // Try storing these nonces later if we failed to do so here, e.g. due to some other
-            // thread holding the write-lock on the mempool DB.
-            if retry_store_origin_nonce {
-                Self::save_nonce_for_retry(
-                    &mut retry_store,
-                    settings.nonce_cache_size,
-                    candidate.origin_address.clone(),
-                    expected_origin_nonce,
-                );
-            }
-            if retry_store_sponsor_nonce {
-                Self::save_nonce_for_retry(
-                    &mut retry_store,
-                    settings.nonce_cache_size,
-                    candidate.sponsor_address.clone(),
-                    expected_sponsor_nonce,
-                );
-            }
+            let mut nonce_conn = self.reopen(false)?;
+            let expected_origin_nonce =
+                nonce_cache.get(&candidate.origin_address, clarity_tx, &mut nonce_conn);
+            let expected_sponsor_nonce =
+                nonce_cache.get(&candidate.sponsor_address, clarity_tx, &mut nonce_conn);
 
             match order_nonces(
                 candidate.origin_nonce,
@@ -1872,34 +1726,17 @@ impl MemPoolDB {
                 match tx_event {
                     TransactionEvent::Success(_) => {
                         // Bump nonces in the cache for the executed transaction
-                        let stored = nonce_cache.update(
+                        nonce_cache.set(
                             consider.tx.metadata.origin_address,
                             expected_origin_nonce + 1,
-                            self.conn(),
+                            &mut nonce_conn,
                         );
-                        if !stored {
-                            Self::save_nonce_for_retry(
-                                &mut retry_store,
-                                settings.nonce_cache_size,
-                                consider.tx.metadata.origin_address,
-                                expected_origin_nonce + 1,
-                            );
-                        }
-
                         if consider.tx.tx.auth.is_sponsored() {
-                            let stored = nonce_cache.update(
+                            nonce_cache.set(
                                 consider.tx.metadata.sponsor_address,
                                 expected_sponsor_nonce + 1,
-                                self.conn(),
+                                &mut nonce_conn,
                             );
-                            if !stored {
-                                Self::save_nonce_for_retry(
-                                    &mut retry_store,
-                                    settings.nonce_cache_size,
-                                    consider.tx.metadata.sponsor_address,
-                                    expected_sponsor_nonce + 1,
-                                );
-                            }
                         }
                         output_events.push(tx_event);
                     }
@@ -1933,13 +1770,8 @@ impl MemPoolDB {
         drop(query_stmt_null);
         drop(query_stmt_fee);
 
-        if retry_store.len() > 0 {
-            let tx = self.tx_begin()?;
-            for (address, nonce) in retry_store.into_iter() {
-                nonce_cache.update(address, nonce, &tx);
-            }
-            tx.commit()?;
-        }
+        // Write through the nonce cache to the database
+        nonce_cache.flush(&mut self.db);
 
         debug!(
             "Mempool iteration finished";
diff --git a/stackslib/src/core/mod.rs b/stackslib/src/core/mod.rs
index ba4dbf14d2..7b7fd4ee15 100644
--- a/stackslib/src/core/mod.rs
+++ b/stackslib/src/core/mod.rs
@@ -30,6 +30,7 @@ use crate::burnchains::bitcoin::BitcoinNetworkType;
 use crate::burnchains::{Burnchain, Error as burnchain_error};
 use crate::chainstate::burn::ConsensusHash;
 
 pub mod mempool;
+pub mod nonce_cache;
 #[cfg(test)]
 pub mod tests;
diff --git a/stackslib/src/core/nonce_cache.rs b/stackslib/src/core/nonce_cache.rs
new file mode 100644
index 0000000000..54c9acb563
--- /dev/null
+++ b/stackslib/src/core/nonce_cache.rs
@@ -0,0 +1,253 @@
+// Copyright (C) 2024 Stacks Open Internet Foundation
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::collections::HashMap;
+use std::thread;
+use std::time::Duration;
+
+use clarity::types::chainstate::StacksAddress;
+use clarity::util::lru_cache::LruCache;
+use clarity::vm::clarity::ClarityConnection;
+use rand::Rng;
+use rusqlite::params;
+
+use super::mempool::MemPoolTx;
+use super::MemPoolDB;
+use crate::chainstate::stacks::db::StacksChainState;
+use crate::util_lib::db::{query_row, u64_to_sql, DBConn, Error as db_error};
+
+/// Used to cache nonces in memory and in the mempool database.
+/// 1. MARF - source of truth for nonces
+/// 2. Nonce DB - table in mempool sqlite database
+/// 3. LRU cache - in-memory cache for nonces
+/// The in-memory cache is restricted to a maximum size to avoid memory
+/// exhaustion. When the cache is full, the least recently used entry is
+/// evicted; if it is dirty, it is written back to the nonce DB at that
+/// point, and any remaining dirty entries are written back on `flush`.
+pub struct NonceCache {
+    /// In-memory LRU cache of nonces.
+    cache: LruCache<StacksAddress, u64>,
+}
+
+impl NonceCache {
+    pub fn new(max_size: usize) -> Self {
+        Self {
+            cache: LruCache::new(max_size),
+        }
+    }
+
+    /// Get a nonce.
+    /// First, the in-memory cache will be checked for this address.
+    /// If absent, then the `nonces` table will be queried for this address.
+    /// If absent, then the MARF will be queried for this address.
+    ///
+    /// The result is stored in the in-memory cache; if a dirty entry is
+    /// evicted to make room, it is written back to the `nonces` table.
+    pub fn get<C>(
+        &mut self,
+        address: &StacksAddress,
+        clarity_tx: &mut C,
+        mempool_db: &mut DBConn,
+    ) -> u64
+    where
+        C: ClarityConnection,
+    {
+        // Check in-memory cache
+        match self.cache.get(address) {
+            Some(nonce) => nonce,
+            None => {
+                // Check sqlite cache
+                let opt_nonce = match db_get_nonce(mempool_db, address) {
+                    Ok(opt_nonce) => opt_nonce,
+                    Err(e) => {
+                        warn!("error retrieving nonce from mempool db: {}", e);
+                        None
+                    }
+                };
+                match opt_nonce {
+                    Some(nonce) => {
+                        // Insert into in-memory cache, but it is not dirty,
+                        // since we just got it from the database.
+                        let evicted = self.cache.insert_clean(address.clone(), nonce);
+                        if evicted.is_some() {
+                            // If we evicted something, we need to flush the cache.
+                            self.flush_with_evicted(mempool_db, evicted);
+                        }
+                        nonce
+                    }
+                    None => {
+                        let nonce =
+                            StacksChainState::get_nonce(clarity_tx, &address.clone().into());
+
+                        self.set(address.clone(), nonce, mempool_db);
+                        nonce
+                    }
+                }
+            }
+        }
+    }
+
+    /// Set the nonce for `address` in the in-memory cache, marking it dirty.
+    /// If a dirty entry is evicted to make room, it is written back to the
+    /// `nonces` table.
+    pub fn set(&mut self, address: StacksAddress, value: u64, conn: &mut DBConn) {
+        let evicted = self.cache.insert(address.clone(), value);
+        if evicted.is_some() {
+            // If we evicted something, we need to flush the cache.
+            self.flush_with_evicted(conn, evicted);
+        }
+    }
+
+    pub fn flush_with_evicted(&mut self, conn: &mut DBConn, evicted: Option<(StacksAddress, u64)>) {
+        const MAX_BACKOFF: Duration = Duration::from_secs(30);
+        let mut backoff = Duration::from_millis(rand::thread_rng().gen_range(50..200));
+
+        loop {
+            let result = self.try_flush_with_evicted(conn, evicted);
+
+            match result {
+                Ok(_) => return, // Success: exit the loop
+                Err(e) => {
+                    // Calculate a backoff duration
+                    warn!("Nonce cache flush failed: {e}. Retrying in {backoff:?}");
+
+                    // Sleep for the backoff duration
+                    thread::sleep(backoff);
+
+                    if backoff < MAX_BACKOFF {
+                        // Exponential backoff
+                        backoff = backoff * 2
+                            + Duration::from_millis(rand::thread_rng().gen_range(50..200));
+                    }
+                }
+            }
+        }
+    }
+
+    pub fn try_flush_with_evicted(
+        &mut self,
+        conn: &mut DBConn,
+        evicted: Option<(StacksAddress, u64)>,
+    ) -> Result<(), db_error> {
+        // Flush the cache to the database
+        let sql = "INSERT OR REPLACE INTO nonces (address, nonce) VALUES (?1, ?2)";
+
+        let tx = conn.transaction()?;
+
+        if let Some((addr, nonce)) = evicted {
+            tx.execute(sql, params![addr, nonce])?;
+        }
+
+        self.cache.flush(|addr, nonce| {
+            tx.execute(sql, params![addr, nonce])?;
+            Ok::<(), db_error>(())
+        })?;
+
+        tx.commit()?;
+
+        Ok(())
+    }
+
+    pub fn flush(&mut self, conn: &mut DBConn) {
+        self.flush_with_evicted(conn, None)
+    }
+}
+
+fn db_set_nonce(conn: &DBConn, address: &StacksAddress, nonce: u64) -> Result<(), db_error> {
+    let addr_str = address.to_string();
+    let nonce_i64 = u64_to_sql(nonce)?;
+
+    let sql = "INSERT OR REPLACE INTO nonces (address, nonce) VALUES (?1, ?2)";
+    conn.execute(sql, params![addr_str, nonce_i64])?;
+    Ok(())
+}
+
+fn db_get_nonce(conn: &DBConn, address: &StacksAddress) -> Result<Option<u64>, db_error> {
+    let addr_str = address.to_string();
+
+    let sql = "SELECT nonce FROM nonces WHERE address = ?";
+    query_row(conn, sql, params![addr_str])
+}
+
+#[cfg(test)]
+mod tests {
+    use clarity::consts::CHAIN_ID_TESTNET;
+    use clarity::types::chainstate::StacksBlockId;
+    use clarity::types::Address;
+    use clarity::vm::tests::{TEST_BURN_STATE_DB, TEST_HEADER_DB};
+
+    use super::*;
+    use crate::chainstate::stacks::db::test::{chainstate_path, instantiate_chainstate};
+    use crate::chainstate::stacks::index::ClarityMarfTrieId;
+    use crate::clarity_vm::clarity::ClarityInstance;
+    use crate::clarity_vm::database::marf::MarfedKV;
+
+    #[test]
+    fn test_nonce_cache() {
+        let _chainstate = instantiate_chainstate(false, 0x80000000, function_name!());
+        let chainstate_path = chainstate_path(function_name!());
+        let mut mempool = MemPoolDB::open_test(false, CHAIN_ID_TESTNET, &chainstate_path).unwrap();
+        let mut cache = NonceCache::new(2);
+
+        let addr1 =
+            StacksAddress::from_string("ST1PQHQKV0RJXZFY1DGX8MNSNYVE3VGZJSRTPGZGM").unwrap();
+        let addr2 =
+            StacksAddress::from_string("ST1SJ3DTE5DN7X54YDH5D64R3BCB6A2AG2ZQ8YPD5").unwrap();
+        let addr3 =
+            StacksAddress::from_string("ST2CY5V39NHDPWSXMW9QDT3HC3GD6Q6XX4CFRK9AG").unwrap();
+
+        let conn = &mut mempool.db;
+        cache.set(addr1.clone(), 1, conn);
+        cache.set(addr2.clone(), 2, conn);
+
+        let marf = MarfedKV::temporary();
+        let mut clarity_instance = ClarityInstance::new(false, CHAIN_ID_TESTNET, marf);
+        clarity_instance
+            .begin_test_genesis_block(
+                &StacksBlockId::sentinel(),
+                &StacksBlockId([0 as u8; 32]),
+                &TEST_HEADER_DB,
+                &TEST_BURN_STATE_DB,
+            )
+            .commit_block();
+        let mut clarity_conn = clarity_instance.begin_block(
+            &StacksBlockId([0 as u8; 32]),
+            &StacksBlockId([1 as u8; 32]),
+            &TEST_HEADER_DB,
+            &TEST_BURN_STATE_DB,
+        );
+
+        clarity_conn.as_transaction(|clarity_tx| {
+            assert_eq!(cache.get(&addr1, clarity_tx, conn), 1);
+            assert_eq!(cache.get(&addr2, clarity_tx, conn), 2);
+            // addr3 is not in the cache, so it should be fetched from the
+            // clarity instance (and get 0)
+            assert_eq!(cache.get(&addr3, clarity_tx, conn), 0);
+        });
+    }
+
+    #[test]
+    fn test_db_set_nonce() {
+        let _chainstate = instantiate_chainstate(false, 0x80000000, function_name!());
+        let chainstate_path = chainstate_path(function_name!());
+        let mut mempool = MemPoolDB::open_test(false, CHAIN_ID_TESTNET, &chainstate_path).unwrap();
+        let conn = &mut mempool.db;
+        let addr = StacksAddress::from_string("ST2JHG361ZXG51QTKY2NQCVBPPRRE2KZB1HR05NNC").unwrap();
+        db_set_nonce(&conn, &addr, 123).unwrap();
+        assert_eq!(db_get_nonce(&conn, &addr).unwrap().unwrap(), 123);
+    }
+}
diff --git a/testnet/stacks-node/src/nakamoto_node/miner.rs b/testnet/stacks-node/src/nakamoto_node/miner.rs
index d9edf97e90..150fd82f56 100644
--- a/testnet/stacks-node/src/nakamoto_node/miner.rs
+++ b/testnet/stacks-node/src/nakamoto_node/miner.rs
@@ -329,6 +329,13 @@ impl BlockMinerThread {
             ))
         })?;
 
+        // Reset the nonce cache, since it is only updated while mining
+        let mut mem_pool = self
+            .config
+            .connect_mempool_db()
+            .expect("Database failure opening mempool");
+        mem_pool.reset_nonce_cache()?;
+
         // now, actually run this tenure
         loop {
             if let Err(e) = self.miner_main_loop(
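
The patch relies on the write-back behavior of the new `LruCache`: `insert()` marks an entry dirty, a dirty entry evicted on overflow is handed back to the caller so it can be persisted immediately, `insert_clean()` is for values just read from the database, and `flush()` writes back whatever is still dirty. Below is a minimal usage sketch of that flow; it is not part of the patch, it assumes the module is reachable as `stacks_common::util::lru_cache`, and `persist_nonce` is a hypothetical stand-in for the real write to the mempool `nonces` table.

    use stacks_common::util::lru_cache::LruCache;

    // Hypothetical stand-in for the real `nonces`-table write.
    fn persist_nonce(address: &u64, nonce: u64) -> Result<(), String> {
        println!("write-back: {address} -> {nonce}");
        Ok(())
    }

    fn main() -> Result<(), String> {
        // Two-entry cache, as in the unit tests above.
        let mut cache: LruCache<u64, u64> = LruCache::new(2);

        // insert() marks entries dirty: they still need to be written back.
        cache.insert(1, 10);
        cache.insert(2, 20);

        // Inserting a third key evicts the least recently used entry (key 1).
        // Because that entry was dirty, it is returned for immediate persistence.
        if let Some((addr, nonce)) = cache.insert(3, 30) {
            persist_nonce(&addr, nonce)?;
        }

        // insert_clean() is for values just read from the database; the new entry
        // is not dirty, but the eviction it causes (key 2, dirty) is still returned.
        if let Some((addr, nonce)) = cache.insert_clean(4, 40) {
            persist_nonce(&addr, nonce)?;
        }

        // flush() visits the remaining entries, writes back only the dirty ones
        // (key 3 here), and marks them clean.
        cache.flush(|addr, nonce| persist_nonce(addr, nonce))
    }

This mirrors how NonceCache::set, NonceCache::get, and NonceCache::flush in stackslib/src/core/nonce_cache.rs drive the cache and write evicted or dirty nonces through to the mempool database.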