Skip to content

Commit

Permalink
Fix reorgs permanently for reorgs up to 6 blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
junderw committed Dec 14, 2024
1 parent 249848d commit 7b622e9
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/elements/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ where

// save updated stats to cache
if let Some(lastblock) = lastblock {
chain.store().cache_db().write(
chain.store().cache_db().write_nocache(
vec![asset_cache_row(asset_id, &newstats, &lastblock)],
DBFlush::Enable,
);
Expand Down
149 changes: 144 additions & 5 deletions src/new_index/db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use bounded_vec_deque::BoundedVecDeque;
use rocksdb;

use std::collections::HashSet;
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::RwLock;

use crate::config::Config;
use crate::util::{bincode_util, Bytes};
Expand Down Expand Up @@ -134,11 +138,23 @@ impl<'a> Iterator for ReverseScanGroupIterator<'a> {
}
}

type SingleBlockCache = HashSet<Vec<u8>>;
type TipsCache = BoundedVecDeque<SingleBlockCache>;

#[derive(Debug)]
pub struct DB {
db: rocksdb::DB,
// BoundedVecDeque of most recent blocks
// Outer Vec is a list of rocksdb keys to remove when reorged
// Inner Vec is the key (a key is Vec<u8>)
// It will automatically drop "blocks" that go over the bound.
rollback_cache: RwLock<TipsCache>,
rollback_active: AtomicBool,
}

// 6 blocks should be enough
const CACHE_CAPACITY: usize = 6;

#[derive(Copy, Clone, Debug)]
pub enum DBFlush {
Disable,
Expand All @@ -147,8 +163,13 @@ pub enum DBFlush {

impl DB {
pub fn open(path: &Path, config: &Config) -> DB {
let mut rollback_cache = BoundedVecDeque::with_capacity(CACHE_CAPACITY, CACHE_CAPACITY);
rollback_cache.push_back(HashSet::new()); // last HashSet is "current block"
let db = DB {
db: open_raw_db(path),
// TODO: Make the number of blocks configurable? 6 should be fine for mainnet
rollback_cache: RwLock::new(rollback_cache),
rollback_active: AtomicBool::new(false),
};
db.verify_compatibility(config);
db
Expand Down Expand Up @@ -220,17 +241,123 @@ impl DB {
ReverseScanGroupIterator::new(iters, value_offset)
}

pub fn write(&self, mut rows: Vec<DBRow>, flush: DBFlush) {
fn fill_cache(&self, key: &[u8]) {
// Single letter keys tend to be related to versioning and tips
// So do not cache as they don't need to be rolled back
if key.len() < 2 {
return;
}
self.with_cache(|cache| {
cache.insert(key.to_owned());
});
}

fn with_cache<F>(&self, func: F)
where
F: FnOnce(&mut SingleBlockCache),
{
func(
self.rollback_cache
.write()
.unwrap()
.back_mut()
.expect("Always one block"),
)
}

pub fn tick_next_block(&self) {
// Adding a new block's worth of cache
// This will automatically drop the oldest block (HashSet)
self.rollback_cache
.write()
.unwrap()
.push_back(HashSet::new());
}

/// Performs a rollback of `count` blocks, then ticks one block forward
pub fn rollback(&self, mut count: usize) -> usize {
if count == 0 {
return 0;
}
let mut cache = self.rollback_cache.write().unwrap();
while count > 0 {
if let Some(block) = cache.pop_back() {
debug!(
"Rolling back DB cached block with {} entries @ {:?}",
block.len(),
self.db.path()
);
for key in block {
// Ignore rocksdb errors, but log them
let _ = self.db.delete(key).inspect_err(|err| {
warn!("Error when deleting rocksdb rollback cache: {err}");
});
}
count -= 1;
} else {
break;
}
}
cache.push_back(HashSet::new());
count
}

pub fn rollbacks_enabled(&self) -> bool {
self.rollback_active
.load(std::sync::atomic::Ordering::Acquire)
}

pub fn disable_rollbacks(&self) {
self.rollback_active
.store(false, std::sync::atomic::Ordering::Release);
}

pub fn enable_rollbacks(&self) {
self.rollback_active
.store(true, std::sync::atomic::Ordering::Release);
}

pub fn write_nocache(&self, rows: Vec<DBRow>, flush: DBFlush) {
self.write_blocks_nocache(vec![rows], flush);
}

pub fn write_blocks(&self, blocks: Vec<Vec<DBRow>>, flush: DBFlush) {
self.write_blocks_inner(blocks, flush, false)
}

pub fn write_blocks_nocache(&self, blocks: Vec<Vec<DBRow>>, flush: DBFlush) {
self.write_blocks_inner(blocks, flush, true)
}

#[inline]
fn write_blocks_inner(&self, blocks: Vec<Vec<DBRow>>, flush: DBFlush, skip_cache: bool) {
debug!(
"writing {} rows to {:?}, flush={:?}",
rows.len(),
blocks.iter().map(|b| b.len()).sum::<usize>(),
self.db,
flush
);
rows.sort_unstable_by(|a, b| a.key.cmp(&b.key));
let mut batch = rocksdb::WriteBatch::default();
for row in rows {
batch.put(&row.key, &row.value);
for mut rows in blocks {
rows.sort_unstable_by(|a, b| a.key.cmp(&b.key));
if !skip_cache
&& self
.rollback_active
.load(std::sync::atomic::Ordering::Acquire)
{
self.with_cache(|cache| {
for row in &rows {
cache.insert(row.key.clone());
batch.put(&row.key, &row.value);
}
});
// Special case: we should tick forward blocks
self.tick_next_block();
} else {
for row in &rows {
batch.put(&row.key, &row.value);
}
}
}
let do_flush = match flush {
DBFlush::Enable => true,
Expand All @@ -247,10 +374,22 @@ impl DB {
}

pub fn put(&self, key: &[u8], value: &[u8]) {
if self
.rollback_active
.load(std::sync::atomic::Ordering::Acquire)
{
self.fill_cache(key);
}
self.db.put(key, value).unwrap();
}

pub fn put_sync(&self, key: &[u8], value: &[u8]) {
if self
.rollback_active
.load(std::sync::atomic::Ordering::Acquire)
{
self.fill_cache(key);
}
let mut opts = rocksdb::WriteOptions::new();
opts.set_sync(true);
self.db.put_opt(key, value, &opts).unwrap();
Expand Down
61 changes: 48 additions & 13 deletions src/new_index/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,27 @@ impl Store {
}
}

pub fn tick_next_block(&self) {
self.txstore_db.tick_next_block();
self.history_db.tick_next_block();
self.cache_db.tick_next_block();
}

pub fn enable_rollback_cache(&self) {
self.txstore_db.enable_rollbacks();
self.history_db.enable_rollbacks();
self.cache_db.enable_rollbacks();
}

pub fn rollback(&self, count: usize) {
let mut leftover = 0;
leftover += self.txstore_db.rollback(count);
leftover += self.history_db.rollback(count);
leftover += self.cache_db.rollback(count);
if leftover > 0 {
warn!("Rolling back all DB caches missed {count} blocks. Re-orged duplicates might still be active in the DB.")
}
}
pub fn txstore_db(&self) -> &DB {
&self.txstore_db
}
Expand Down Expand Up @@ -273,6 +294,18 @@ impl Indexer {
let tip = daemon.getbestblockhash()?;
let new_headers = self.get_new_headers(&daemon, &tip)?;

// Deal with re-orgs before indexing
let headers_len = {
let mut headers = self.store.indexed_headers.write().unwrap();
let reorged = headers.apply(&new_headers);
assert_eq!(tip, *headers.tip());
// reorg happened
if !reorged.is_empty() {
self.store.rollback(reorged.len());
}
headers.len()
};

let to_add = self.headers_to_add(&new_headers);
debug!(
"adding transactions from {} blocks using {:?}",
Expand Down Expand Up @@ -301,16 +334,20 @@ impl Indexer {
// update the synced tip *after* the new data is flushed to disk
debug!("updating synced tip to {:?}", tip);
self.store.txstore_db.put_sync(b"t", &serialize(&tip));

let mut headers = self.store.indexed_headers.write().unwrap();
headers.apply(new_headers);
assert_eq!(tip, *headers.tip());
// Ticking cache DB "block every time we update"
// This means that each "block" essentially contains the
// cache updates between each update() call, and we will
// rollback more cache_db than other DBs when rolling back
// but this is just a cache anyway.
// We'd rather not have bad data in the cache.
self.store.cache_db.tick_next_block();

if let FetchFrom::BlkFiles = self.from {
self.store.enable_rollback_cache();
self.from = FetchFrom::Bitcoind;
}

self.tip_metric.set(headers.len() as i64 - 1);
self.tip_metric.set(headers_len as i64 - 1);

Ok(tip)
}
Expand All @@ -324,7 +361,7 @@ impl Indexer {
};
{
let _timer = self.start_timer("add_write");
self.store.txstore_db.write(rows, self.flush);
self.store.txstore_db.write_blocks(rows, self.flush);
}

self.store
Expand Down Expand Up @@ -360,7 +397,7 @@ impl Indexer {
}
index_blocks(blocks, &previous_txos_map, &self.iconfig)
};
self.store.history_db.write(rows, self.flush);
self.store.history_db.write_blocks(rows, self.flush);
}
}

Expand Down Expand Up @@ -820,7 +857,7 @@ impl ChainQuery {
// save updated utxo set to cache
if let Some(lastblock) = lastblock {
if had_cache || processed_items > MIN_HISTORY_ITEMS_TO_CACHE {
self.store.cache_db.write(
self.store.cache_db.write_nocache(
vec![UtxoCacheRow::new(scripthash, &newutxos, &lastblock).into_row()],
flush,
);
Expand Down Expand Up @@ -928,7 +965,7 @@ impl ChainQuery {
// save updated stats to cache
if let Some(lastblock) = lastblock {
if newstats.funded_txo_count + newstats.spent_txo_count > MIN_HISTORY_ITEMS_TO_CACHE {
self.store.cache_db.write(
self.store.cache_db.write_nocache(
vec![StatsCacheRow::new(scripthash, &newstats, &lastblock).into_row()],
flush,
);
Expand Down Expand Up @@ -1256,7 +1293,7 @@ fn load_blockheaders(db: &DB) -> HashMap<BlockHash, BlockHeader> {
.collect()
}

fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<DBRow> {
fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<Vec<DBRow>> {
// persist individual transactions:
// T{txid} → {rawtx}
// C{txid}{blockhash}{height} →
Expand Down Expand Up @@ -1284,7 +1321,6 @@ fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<DBRo
rows.push(BlockRow::new_done(blockhash).into_row()); // mark block as "added"
rows
})
.flatten()
.collect()
}

Expand Down Expand Up @@ -1370,7 +1406,7 @@ fn index_blocks(
block_entries: &[BlockEntry],
previous_txos_map: &HashMap<OutPoint, TxOut>,
iconfig: &IndexerConfig,
) -> Vec<DBRow> {
) -> Vec<Vec<DBRow>> {
block_entries
.par_iter() // serialization is CPU-intensive
.map(|b| {
Expand All @@ -1389,7 +1425,6 @@ fn index_blocks(
rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed"
rows
})
.flatten()
.collect()
}

Expand Down
12 changes: 7 additions & 5 deletions src/util/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl HeaderList {
);

let mut headers = HeaderList::empty();
headers.apply(headers.order(headers_chain));
headers.apply(&headers.order(headers_chain));
headers
}

Expand Down Expand Up @@ -155,7 +155,8 @@ impl HeaderList {
.collect()
}

pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) {
/// Returns any re-orged headers
pub fn apply(&mut self, new_headers: &[HeaderEntry]) -> Vec<HeaderEntry> {
// new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip)
for i in 1..new_headers.len() {
assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height());
Expand All @@ -175,21 +176,22 @@ impl HeaderList {
assert_eq!(entry.header().prev_blockhash, expected_prev_blockhash);
height
}
None => return,
None => return vec![],
};
debug!(
"applying {} new headers from height {}",
new_headers.len(),
new_height
);
let _removed = self.headers.split_off(new_height); // keep [0..new_height) entries
let removed = self.headers.split_off(new_height); // keep [0..new_height) entries
for new_header in new_headers {
let height = new_header.height();
assert_eq!(height, self.headers.len());
self.tip = *new_header.hash();
self.headers.push(new_header);
self.headers.push(new_header.clone());
self.heights.insert(self.tip, height);
}
removed
}

pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> {
Expand Down

0 comments on commit 7b622e9

Please sign in to comment.