diff --git a/src/elements/asset.rs b/src/elements/asset.rs
index 1a3bd24d..7ce96382 100644
--- a/src/elements/asset.rs
+++ b/src/elements/asset.rs
@@ -11,7 +11,7 @@ use crate::chain::{BNetwork, BlockHash, Network, Txid};
 use crate::elements::peg::{get_pegin_data, get_pegout_data, PeginInfo, PegoutInfo};
 use crate::elements::registry::{AssetMeta, AssetRegistry};
 use crate::errors::*;
-use crate::new_index::schema::{TxHistoryInfo, TxHistoryKey, TxHistoryRow};
+use crate::new_index::schema::{Operation, TxHistoryInfo, TxHistoryKey, TxHistoryRow};
 use crate::new_index::{db::DBFlush, ChainQuery, DBRow, Mempool, Query};
 use crate::util::{bincode_util, full_hash, Bytes, FullHash, TransactionStatus, TxInput};
 
@@ -178,11 +178,17 @@ pub fn index_confirmed_tx_assets(
     network: Network,
     parent_network: BNetwork,
     rows: &mut Vec<DBRow>,
+    op: &Operation,
 ) {
     let (history, issuances) = index_tx_assets(tx, network, parent_network);
 
     rows.extend(history.into_iter().map(|(asset_id, info)| {
-        asset_history_row(&asset_id, confirmed_height, tx_position, info).into_row()
+        let history_row = asset_history_row(&asset_id, confirmed_height, tx_position, info);
+        if let Operation::DeleteBlocksWithHistory(tx) = op {
+            tx.send(history_row.key.hash)
+                .expect("unbounded channel won't fail");
+        }
+        history_row.into_row()
     }));
 
     // the initial issuance is kept twice: once in the history index under I,
diff --git a/src/new_index/db.rs b/src/new_index/db.rs
index 98de2d30..75dacd7f 100644
--- a/src/new_index/db.rs
+++ b/src/new_index/db.rs
@@ -242,6 +242,15 @@ impl DB {
         self.db.write_opt(batch, &opts).unwrap();
     }
 
+    pub fn delete(&self, keys: Vec<Vec<u8>>) {
+        debug!("deleting {} rows from {:?}", keys.len(), self.db);
+        for key in keys {
+            let _ = self.db.delete(key).inspect_err(|err| {
+                warn!("Error while deleting DB row: {err}");
+            });
+        }
+    }
+
     pub fn flush(&self) {
         self.db.flush().unwrap();
     }
diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs
index 3b748770..04d873bf 100644
--- a/src/new_index/fetch.rs
+++ b/src/new_index/fetch.rs
@@ -42,6 +42,57 @@ pub struct BlockEntry {
 
 type SizedBlock = (Block, u32);
 
+pub struct SequentialFetcher {
+    fetcher: Box<dyn FnOnce() -> Vec<Vec<BlockEntry>>>,
+}
+
+impl SequentialFetcher {
+    fn from<F: FnOnce() -> Vec<Vec<BlockEntry>> + 'static>(pre_func: F) -> Self {
+        SequentialFetcher {
+            fetcher: Box::new(pre_func),
+        }
+    }
+
+    pub fn map<FN>(self, mut func: FN)
+    where
+        FN: FnMut(Vec<BlockEntry>),
+    {
+        for item in (self.fetcher)() {
+            func(item);
+        }
+    }
+}
+
+pub fn bitcoind_sequential_fetcher(
+    daemon: &Daemon,
+    new_headers: Vec<HeaderEntry>,
+) -> Result<SequentialFetcher> {
+    let daemon = daemon.reconnect()?;
+    Ok(SequentialFetcher::from(move || {
+        new_headers
+            .chunks(100)
+            .map(|entries| {
+                let blockhashes: Vec<BlockHash> = entries.iter().map(|e| *e.hash()).collect();
+                let blocks = daemon
+                    .getblocks(&blockhashes)
+                    .expect("failed to get blocks from bitcoind");
+                assert_eq!(blocks.len(), entries.len());
+                let block_entries: Vec<BlockEntry> = blocks
+                    .into_iter()
+                    .zip(entries)
+                    .map(|(block, entry)| BlockEntry {
+                        entry: entry.clone(), // TODO: remove this clone()
+                        size: block.size() as u32,
+                        block,
+                    })
+                    .collect();
+                assert_eq!(block_entries.len(), entries.len());
+                block_entries
+            })
+            .collect()
+    }))
+}
+
 pub struct Fetcher<T> {
     receiver: crossbeam_channel::Receiver<T>,
     thread: thread::JoinHandle<()>,
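A note on the new fetcher: unlike `start_fetcher`, which streams blocks over a channel from a separate thread, `SequentialFetcher` runs entirely on the calling thread and hands the headers over in chunks of 100. Below is a minimal, self-contained model of that chunk-and-map flow; `u64` stands in for `BlockEntry`, and nothing here is part of the crate's API, it is only a sketch of the pattern:

```rust
// Simplified model of SequentialFetcher: build chunks lazily, then hand each
// chunk to a closure on the current thread (no worker thread, no channel).
struct SequentialFetcher {
    fetcher: Box<dyn FnOnce() -> Vec<Vec<u64>>>, // Vec<BlockEntry> in the real code
}

impl SequentialFetcher {
    fn from<F: FnOnce() -> Vec<Vec<u64>> + 'static>(pre_func: F) -> Self {
        SequentialFetcher {
            fetcher: Box::new(pre_func),
        }
    }

    fn map<FN: FnMut(Vec<u64>)>(self, mut func: FN) {
        // Runs sequentially on the calling thread, one chunk at a time.
        for chunk in (self.fetcher)() {
            func(chunk);
        }
    }
}

fn main() {
    let headers: Vec<u64> = (0..250).collect(); // stand-in for Vec<HeaderEntry>
    let fetcher = SequentialFetcher::from(move || {
        headers.chunks(100).map(|c| c.to_vec()).collect()
    });
    fetcher.map(|chunk| println!("processing a chunk of {} blocks", chunk.len()));
}
```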
diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs
index d09605a8..49ef3c50 100644
--- a/src/new_index/schema.rs
+++ b/src/new_index/schema.rs
@@ -37,7 +37,7 @@ use crate::new_index::fetch::{start_fetcher, BlockEntry, FetchFrom};
 #[cfg(feature = "liquid")]
 use crate::elements::{asset, peg};
 
-use super::db::ReverseScanGroupIterator;
+use super::{db::ReverseScanGroupIterator, fetch::bitcoind_sequential_fetcher};
 
 const MIN_HISTORY_ITEMS_TO_CACHE: usize = 100;
 
@@ -209,6 +209,22 @@ pub struct ChainQuery {
     network: Network,
 }
 
+#[derive(Debug, Clone)]
+pub enum Operation {
+    AddBlocks,
+    DeleteBlocks,
+    DeleteBlocksWithHistory(crossbeam_channel::Sender<[u8; 32]>),
+}
+
+impl std::fmt::Display for Operation {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str(match self {
+            Operation::AddBlocks => "Adding",
+            Operation::DeleteBlocks | Operation::DeleteBlocksWithHistory(_) => "Deleting",
+        })
+    }
+}
+
 // TODO: &[Block] should be an iterator / a queue.
 impl Indexer {
     pub fn open(store: Arc<Store>, from: FetchFrom, config: &Config, metrics: &Metrics) -> Self {
@@ -268,18 +284,68 @@ impl Indexer {
         Ok(result)
     }
 
+    fn reorg(&self, reorged: Vec<HeaderEntry>, daemon: &Daemon) -> Result<()> {
+        if reorged.len() > 10 {
+            warn!(
+                "reorg of over 10 blocks ({}) detected! Wonky stuff might happen!",
+                reorged.len()
+            );
+        }
+        // This channel collects the [u8; 32] script hashes found in the blocks (with duplicates);
+        // if we reorged the whole mainnet chain it would come to about 145 GB of memory.
+        let (tx, rx) = crossbeam_channel::unbounded();
+        // Delete history_db
+        bitcoind_sequential_fetcher(daemon, reorged.clone())?
+            .map(|blocks| self.index(&blocks, Operation::DeleteBlocksWithHistory(tx.clone())));
+        // Delete txstore
+        bitcoind_sequential_fetcher(daemon, reorged)?
+            .map(|blocks| self.add(&blocks, Operation::DeleteBlocks));
+        // All senders must be dropped for the receiver iterator to finish
+        drop(tx);
+
+        // All senders are dropped by now, so the receiver will iterate until the
+        // end of the unbounded queue.
+        let scripts = rx.into_iter().collect::<HashSet<_>>();
+        for script in scripts {
+            // Invalidate the script cache DB rows for these scripts; they might have incorrect data mixed in.
+            self.store.cache_db.delete(vec![
+                StatsCacheRow::key(&script),
+                UtxoCacheRow::key(&script),
+                #[cfg(feature = "liquid")]
+                [b"z", &script[..]].concat(), // asset cache key
+            ]);
+        }
+        Ok(())
+    }
+
     pub fn update(&mut self, daemon: &Daemon) -> Result<BlockHash> {
         let daemon = daemon.reconnect()?;
         let tip = daemon.getbestblockhash()?;
         let new_headers = self.get_new_headers(&daemon, &tip)?;
 
+        // Must roll back blocks before rolling forward
+        let headers_len = {
+            let mut headers = self.store.indexed_headers.write().unwrap();
+            let reorged = headers.apply(new_headers.clone());
+            assert_eq!(tip, *headers.tip());
+            let headers_len = headers.len();
+            drop(headers);
+
+            if !reorged.is_empty() {
+                self.reorg(reorged, &daemon)?;
+            }
+
+            headers_len
+        };
+
         let to_add = self.headers_to_add(&new_headers);
         debug!(
             "adding transactions from {} blocks using {:?}",
             to_add.len(),
             self.from
         );
-        start_fetcher(self.from, &daemon, to_add)?.map(|blocks| self.add(&blocks));
+        start_fetcher(self.from, &daemon, to_add)?
+            .map(|blocks| self.add(&blocks, Operation::AddBlocks));
         self.start_auto_compactions(&self.store.txstore_db);
 
         let to_index = self.headers_to_index(&new_headers);
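How the cache invalidation in `reorg()` above hangs together: the reorged blocks are re-fetched and pushed back through `index()`/`add()` with the delete operations, every history row that gets removed sends its script hash into the unbounded channel, and once both passes finish and all senders are dropped the receiver is drained and each script's cache rows are removed. A self-contained sketch of that collect-and-invalidate pattern (the data is made up; `crossbeam-channel` is the same crate the indexer already uses):

```rust
use std::collections::HashSet;

fn main() {
    // Unbounded channel: every deleted history row reports its 32-byte script hash.
    let (tx, rx) = crossbeam_channel::unbounded::<[u8; 32]>();

    // Stand-in for indexing reorged blocks with Operation::DeleteBlocksWithHistory(tx.clone());
    // the same hash may be sent many times.
    for i in 0u8..6 {
        tx.send([i % 2; 32]).expect("unbounded channel won't fail");
    }

    // All senders must be dropped, otherwise the receiver iterator below blocks forever.
    drop(tx);

    // Drain and deduplicate, then invalidate each script's cache rows once.
    let scripts: HashSet<[u8; 32]> = rx.into_iter().collect();
    for script in &scripts {
        println!("invalidating cached stats/utxos for script hash {:x?}...", &script[..2]);
    }
    assert_eq!(scripts.len(), 2);
}
```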
@@ -288,7 +354,8 @@ impl Indexer {
             to_index.len(),
             self.from
         );
-        start_fetcher(self.from, &daemon, to_index)?.map(|blocks| self.index(&blocks));
+        start_fetcher(self.from, &daemon, to_index)?
+            .map(|blocks| self.index(&blocks, Operation::AddBlocks));
         self.start_auto_compactions(&self.store.history_db);
 
         if let DBFlush::Disable = self.flush {
@@ -302,65 +369,91 @@
         debug!("updating synced tip to {:?}", tip);
         self.store.txstore_db.put_sync(b"t", &serialize(&tip));
 
-        let mut headers = self.store.indexed_headers.write().unwrap();
-        headers.apply(new_headers);
-        assert_eq!(tip, *headers.tip());
-
         if let FetchFrom::BlkFiles = self.from {
             self.from = FetchFrom::Bitcoind;
         }
 
-        self.tip_metric.set(headers.len() as i64 - 1);
+        self.tip_metric.set(headers_len as i64 - 1);
 
         Ok(tip)
     }
 
-    fn add(&self, blocks: &[BlockEntry]) {
-        debug!("Adding {} blocks to Indexer", blocks.len());
+    fn add(&self, blocks: &[BlockEntry], op: Operation) {
+        debug!("{} {} blocks to Indexer", op, blocks.len());
+        let write_label = match &op {
+            Operation::AddBlocks => "add_write",
+            _ => "delete_write",
+        };
+
         // TODO: skip orphaned blocks?
         let rows = {
             let _timer = self.start_timer("add_process");
             add_blocks(blocks, &self.iconfig)
         };
         {
-            let _timer = self.start_timer("add_write");
-            self.store.txstore_db.write(rows, self.flush);
+            let _timer = self.start_timer(write_label);
+            if let Operation::AddBlocks = op {
+                self.store.txstore_db.write(rows, self.flush);
+            } else {
+                self.store
+                    .txstore_db
+                    .delete(rows.into_iter().map(|r| r.key).collect());
+            }
         }
 
-        self.store
-            .added_blockhashes
-            .write()
-            .unwrap()
-            .extend(blocks.iter().map(|b| {
-                if b.entry.height() % 10_000 == 0 {
-                    info!("Tx indexing is up to height={}", b.entry.height());
-                }
-                b.entry.hash()
-            }));
+        if let Operation::AddBlocks = op {
+            self.store
+                .added_blockhashes
+                .write()
+                .unwrap()
+                .extend(blocks.iter().map(|b| {
+                    if b.entry.height() % 10_000 == 0 {
+                        info!("Tx indexing is up to height={}", b.entry.height());
+                    }
+                    b.entry.hash()
+                }));
+        } else {
+            let mut added_blockhashes = self.store.added_blockhashes.write().unwrap();
+            for b in blocks {
+                added_blockhashes.remove(b.entry.hash());
+            }
+        }
     }
 
-    fn index(&self, blocks: &[BlockEntry]) {
-        debug!("Indexing {} blocks with Indexer", blocks.len());
+    fn index(&self, blocks: &[BlockEntry], op: Operation) {
+        debug!("Indexing ({}) {} blocks with Indexer", op, blocks.len());
         let previous_txos_map = {
             let _timer = self.start_timer("index_lookup");
-            lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false)
+            if matches!(op, Operation::AddBlocks) {
+                lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false)
+            } else {
+                lookup_txos_sequential(&self.store.txstore_db, &get_previous_txos(blocks), false)
+            }
        };
        let rows = {
            let _timer = self.start_timer("index_process");
-            let added_blockhashes = self.store.added_blockhashes.read().unwrap();
-            for b in blocks {
-                if b.entry.height() % 10_000 == 0 {
-                    info!("History indexing is up to height={}", b.entry.height());
-                }
-                let blockhash = b.entry.hash();
-                // TODO: replace by lookup into txstore_db?
-                if !added_blockhashes.contains(blockhash) {
-                    panic!("cannot index block {} (missing from store)", blockhash);
+            if let Operation::AddBlocks = op {
+                let added_blockhashes = self.store.added_blockhashes.read().unwrap();
+                for b in blocks {
+                    if b.entry.height() % 10_000 == 0 {
+                        info!("History indexing is up to height={}", b.entry.height());
+                    }
+                    let blockhash = b.entry.hash();
+                    // TODO: replace by lookup into txstore_db?
+                    if !added_blockhashes.contains(blockhash) {
+                        panic!("cannot index block {} (missing from store)", blockhash);
+                    }
                 }
             }
-            index_blocks(blocks, &previous_txos_map, &self.iconfig)
+            index_blocks(blocks, &previous_txos_map, &self.iconfig, &op)
         };
-        self.store.history_db.write(rows, self.flush);
+        if let Operation::AddBlocks = op {
+            self.store.history_db.write(rows, self.flush);
+        } else {
+            self.store
+                .history_db
+                .delete(rows.into_iter().map(|r| r.key).collect());
+        }
     }
 }
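Both `add()` and `index()` now build the same row set whichever direction they are going; the `Operation` only decides, at the end, whether those rows are written or removed by key. A simplified, self-contained sketch of that dispatch (`DBRow`, `write_or_delete` and the `println!` calls are stand-ins for the real store calls):

```rust
struct DBRow {
    key: Vec<u8>,
    value: Vec<u8>,
}

enum Operation {
    AddBlocks,
    DeleteBlocks,
}

// Stand-in for the txstore_db/history_db step at the end of add()/index():
// AddBlocks writes the rows, the delete variants drop the same rows by key.
fn write_or_delete(rows: Vec<DBRow>, op: &Operation) {
    match op {
        // Real code: self.store.txstore_db.write(rows, self.flush)
        Operation::AddBlocks => println!(
            "writing {} rows ({} value bytes)",
            rows.len(),
            rows.iter().map(|r| r.value.len()).sum::<usize>()
        ),
        // Real code: self.store.txstore_db.delete(rows.into_iter().map(|r| r.key).collect())
        Operation::DeleteBlocks => {
            let keys: Vec<Vec<u8>> = rows.into_iter().map(|r| r.key).collect();
            println!("deleting {} keys", keys.len());
        }
    }
}

fn main() {
    let row = || DBRow { key: b"H...".to_vec(), value: vec![0u8; 8] };
    write_or_delete(vec![row()], &Operation::AddBlocks);
    write_or_delete(vec![row()], &Operation::DeleteBlocks);
}
```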
@@ -1344,6 +1437,7 @@
         }
     };
     pool.install(|| {
+        // Should match lookup_txos_sequential
         outpoints
             .par_iter()
             .filter_map(|outpoint| {
@@ -1360,6 +1454,27 @@
         })
 }
 
+fn lookup_txos_sequential(
+    txstore_db: &DB,
+    outpoints: &BTreeSet<OutPoint>,
+    allow_missing: bool,
+) -> HashMap<OutPoint, TxOut> {
+    // Should match lookup_txos
+    outpoints
+        .iter()
+        .filter_map(|outpoint| {
+            lookup_txo(txstore_db, outpoint)
+                .or_else(|| {
+                    if !allow_missing {
+                        panic!("missing txo {} in {:?}", outpoint, txstore_db);
+                    }
+                    None
+                })
+                .map(|txo| (*outpoint, txo))
+        })
+        .collect()
+}
+
 fn lookup_txo(txstore_db: &DB, outpoint: &OutPoint) -> Option<TxOut> {
     txstore_db
         .get(&TxOutRow::key(outpoint))
@@ -1370,6 +1485,7 @@ fn index_blocks(
     block_entries: &[BlockEntry],
     previous_txos_map: &HashMap<OutPoint, TxOut>,
     iconfig: &IndexerConfig,
+    op: &Operation,
 ) -> Vec<DBRow> {
     block_entries
         .par_iter() // serialization is CPU-intensive
@@ -1384,6 +1500,7 @@
                     previous_txos_map,
                     &mut rows,
                     iconfig,
+                    op,
                 );
             }
             rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed"
@@ -1401,6 +1518,7 @@ fn index_transaction(
     previous_txos_map: &HashMap<OutPoint, TxOut>,
     rows: &mut Vec<DBRow>,
     iconfig: &IndexerConfig,
+    op: &Operation,
 ) {
     // persist history index:
     //      H{funding-scripthash}{spending-height}{spending-block-pos}S{spending-txid:vin}{funding-txid:vout} → ""
@@ -1408,6 +1526,11 @@
     // persist "edges" for fast is-this-TXO-spent check
     //      S{funding-txid:vout}{spending-txid:vin} → ""
     let txid = full_hash(&tx.txid()[..]);
+    let script_callback = |script_hash| {
+        if let Operation::DeleteBlocksWithHistory(tx) = op {
+            tx.send(script_hash).expect("unbounded channel won't fail");
+        }
+    };
     for (txo_index, txo) in tx.output.iter().enumerate() {
         if is_spendable(txo) || iconfig.index_unspendables {
             let history = TxHistoryRow::new(
@@ -1420,6 +1543,7 @@
                     value: txo.value,
                 }),
             );
+            script_callback(history.key.hash);
             rows.push(history.into_row());
 
             if iconfig.address_search {
@@ -1449,6 +1573,7 @@
                     value: prev_txo.value,
                 }),
             );
+            script_callback(history.key.hash);
            rows.push(history.into_row());
 
             let edge = TxEdgeRow::new(
@@ -1469,6 +1594,7 @@
             iconfig.network,
             iconfig.parent_network,
             rows,
+            op,
         );
     }
 
diff --git a/src/util/block.rs b/src/util/block.rs
index 66d5a5c5..50264531 100644
--- a/src/util/block.rs
+++ b/src/util/block.rs
@@ -155,7 +155,8 @@ impl HeaderList {
             .collect()
     }
 
-    pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) {
+    /// Returns the rolled-back headers, ordered from the old tip first to the first block of the fork last
+    pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) -> Vec<HeaderEntry> {
         // new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip)
         for i in 1..new_headers.len() {
             assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height());
@@ -175,14 +176,14 @@ impl HeaderList {
                 assert_eq!(entry.header().prev_blockhash, expected_prev_blockhash);
                 height
             }
-            None => return,
+            None => return vec![],
         };
         debug!(
             "applying {} new headers from height {}",
             new_headers.len(),
             new_height
         );
-        let _removed = self.headers.split_off(new_height); // keep [0..new_height) entries
+        let mut removed = self.headers.split_off(new_height); // keep [0..new_height) entries
         for new_header in new_headers {
             let height = new_header.height();
             assert_eq!(height, self.headers.len());
@@ -190,6 +191,8 @@ impl HeaderList {
             self.headers.push(new_header);
             self.heights.insert(self.tip, height);
         }
+        removed.reverse();
+        removed
     }
 
     pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> {
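For reference, a tiny model of the ordering contract now documented on `apply()`: the rolled-back entries come out of `split_off` at the fork height and are then reversed, so the old tip is first and the first block of the fork is last (plain integers stand in for `HeaderEntry` here):

```rust
fn main() {
    // Heights 0..=105 stand in for header entries; a reorg rolls back to height 100.
    let mut headers: Vec<u32> = (0..=105).collect();
    let fork_height = 100;

    let mut removed = headers.split_off(fork_height); // keep [0..fork_height) entries
    removed.reverse();

    assert_eq!(removed.first(), Some(&105)); // old tip comes first
    assert_eq!(removed.last(), Some(&100)); // first block of the fork comes last
    assert_eq!(headers.len(), 100); // the retained chain ends just below the fork
}
```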