From d493cce2027f495846d9fb53966fd1ea89a447d7 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Wed, 18 Dec 2024 12:37:57 +0800 Subject: [PATCH] New utxo snapshot format (#81) * Separate out utxo_store.rs * Introduce SnapshotProcessor * Rework subcoin-utxo-snapshot * write_utxo_snapshot_in_memory() * New snapshot format compatible with Bitcoin Core * Refactor snapshot_processor * Refactor subcoin-utxo-snapshot * Refactor dump_txout_set * Nits * Nits * Fix test * Remove local tests * Nits * fmt * Fix clippy * . * Verify snapshot integrity in snapcake * Rename snapshot_manager * fmt --- Cargo.lock | 7 + Cargo.toml | 2 + .../src/commands/blockchain/dump_txout_set.rs | 71 ++-- .../commands/blockchain/get_txout_set_info.rs | 34 +- crates/subcoin-node/src/utils.rs | 35 ++ crates/subcoin-service/Cargo.toml | 3 +- crates/subcoin-snapcake/Cargo.toml | 3 + crates/subcoin-snapcake/src/main.rs | 1 + .../subcoin-snapcake/src/snapshot_manager.rs | 392 ++++++++++++++++++ .../src/state_sync_wrapper.rs | 183 +------- .../subcoin-snapcake/src/syncing_strategy.rs | 2 +- crates/subcoin-utxo-snapshot/Cargo.toml | 6 +- .../subcoin-utxo-snapshot/src/compressor.rs | 248 +++++++++++ crates/subcoin-utxo-snapshot/src/lib.rs | 263 +++++++++++- crates/subcoin-utxo-snapshot/src/script.rs | 131 ++++++ crates/subcoin-utxo-snapshot/src/serialize.rs | 18 + crates/subcoin-utxo-snapshot/src/tests.rs | 170 ++++++++ 17 files changed, 1313 insertions(+), 256 deletions(-) create mode 100644 crates/subcoin-snapcake/src/snapshot_manager.rs create mode 100644 crates/subcoin-utxo-snapshot/src/compressor.rs create mode 100644 crates/subcoin-utxo-snapshot/src/script.rs create mode 100644 crates/subcoin-utxo-snapshot/src/serialize.rs create mode 100644 crates/subcoin-utxo-snapshot/src/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 240a2e32..af9fd801 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4936,7 +4936,9 @@ dependencies = [ "glob", "libc", "libz-sys", + "lz4-sys", "tikv-jemalloc-sys", + "zstd-sys", ] [[package]] @@ -10513,6 +10515,7 @@ dependencies = [ name = "subcoin-snapcake" version = "0.1.0" dependencies = [ + "bincode", "bitcoin 0.32.2", "clap", "csv", @@ -10520,6 +10523,7 @@ dependencies = [ "hex", "pallet-bitcoin", "parity-scale-codec", + "rocksdb", "sc-cli", "sc-client-api", "sc-consensus", @@ -10532,6 +10536,7 @@ dependencies = [ "sc-service", "sc-transaction-pool", "serde", + "sha2 0.9.9", "sp-api", "sp-blockchain", "sp-consensus", @@ -10568,6 +10573,8 @@ name = "subcoin-utxo-snapshot" version = "0.1.0" dependencies = [ "bitcoin 0.32.2", + "fastrand", + "hex", "serde", "subcoin-primitives", "txoutset", diff --git a/Cargo.toml b/Cargo.toml index 9eb7c64e..2fcd72d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ default-members = ["crates/subcoin-node"] array-bytes = "6.2.2" async-channel = "1.8.0" async-trait = "0.1" +bincode = "1.3.3" bitcoin = { git = "https://github.com/liuchengxu/rust-bitcoin", branch = "0.32.x-subcoin", default-features = false } bitcoinconsensus = "0.105.0+25.1" bitcoin-explorer = { git = "https://github.com/liuchengxu/Rusty-Bitcoin-Explorer", branch = "main", default-features = false } @@ -54,6 +55,7 @@ parking_lot = "0.12" scale-info = { version = "2.6.0", default-features = false } serde = "1" serde_json = "1" +sha2 = "0.9.9" tempfile = "3.10.1" thiserror = "1.0" tokio = "1.41.1" diff --git a/crates/subcoin-node/src/commands/blockchain/dump_txout_set.rs b/crates/subcoin-node/src/commands/blockchain/dump_txout_set.rs index 4bfb676a..ea7298f1 100644 --- a/crates/subcoin-node/src/commands/blockchain/dump_txout_set.rs +++ b/crates/subcoin-node/src/commands/blockchain/dump_txout_set.rs @@ -1,10 +1,10 @@ use super::MergedParams; use crate::commands::blockchain::{fetch_utxo_set_at, ClientParams}; use crate::utils::Yield; -use bitcoin::consensus::Encodable; use std::fs::File; use std::io::{Stdout, Write}; use std::path::PathBuf; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; use subcoin_primitives::runtime::Coin; use subcoin_service::FullClient; @@ -18,15 +18,13 @@ pub struct DumpTxOutSet { #[clap(long)] height: Option, - /// Export the dumped txout set to a CSV file. + /// Path to export the UTXO set in CSV format. #[clap(short, long, value_name = "PATH")] csv: Option, - /// Path to the binary dump file for the UTXO set. + /// Path to export the UTXO set as a binary file. /// /// The binary dump is compatible with the format used by the Bitcoin Core client. - /// You can export the UTXO set from Subcoin and import it into the Bitcoin - /// Core client. /// /// If neither `--csv` nor `--binary` options are provided, the UTXO set will be /// printed to stdout in CSV format. @@ -56,42 +54,57 @@ impl DumpTxOutSetCmd { let (block_number, bitcoin_block_hash, utxo_iter) = fetch_utxo_set_at(&client, height)?; - let mut output_file = if let Some(path) = csv { - println!( - "Dumping UTXO set at #{block_number},{bitcoin_block_hash} to {}", - path.display() - ); - UtxoSetOutput::Csv(std::fs::File::create(path)?) - } else if let Some(path) = binary { + if let Some(path) = binary { let utxo_set_size = fetch_utxo_set_at(&client, height)?.2.count() as u64; println!( - "Dumping UTXO set at #{block_number},{bitcoin_block_hash} to {}", + "Exporting UTXO set ({utxo_set_size}) at #{block_number},{bitcoin_block_hash} to binary file: {}", path.display() ); - println!("UTXO set size: {utxo_set_size}"); - let mut file = std::fs::File::create(&path)?; + let file = std::fs::File::create(&path)?; - let mut data = Vec::new(); - bitcoin_block_hash - .consensus_encode(&mut data) - .map_err(|err| sc_cli::Error::Application(Box::new(err)))?; - utxo_set_size - .consensus_encode(&mut data) - .map_err(|err| sc_cli::Error::Application(Box::new(err)))?; + let mut snapshot_generator = + UtxoSnapshotGenerator::new(path, file, bitcoin::Network::Bitcoin); - let _ = file.write(data.as_slice())?; + snapshot_generator.generate_snapshot_in_mem( + bitcoin_block_hash, + utxo_set_size, + utxo_iter.map(Into::into), + )?; - UtxoSetOutput::Snapshot(UtxoSnapshotGenerator::new(path, file)) + return Ok(()); + } + + let mut output_file = if let Some(path) = csv { + println!( + "Exporting UTXO set at #{block_number},{bitcoin_block_hash} to CSV file: {}", + path.display() + ); + UtxoSetOutput::Csv(std::fs::File::create(path)?) } else { - println!("Dumping UTXO set at #{block_number},{bitcoin_block_hash}"); + println!("Exporting UTXO set at #{block_number},{bitcoin_block_hash} to stdout:"); UtxoSetOutput::Stdout(std::io::stdout()) }; - for (txid, vout, coin) in utxo_iter { - output_file.write(txid, vout, coin)?; + let processed = Arc::new(AtomicUsize::new(0)); + + let grouped_utxos = + subcoin_utxo_snapshot::group_utxos_by_txid(utxo_iter.into_iter().map(Into::into)); + let total_txids = grouped_utxos.len(); + + crate::utils::show_progress_in_background( + processed.clone(), + total_txids as u64, + format!("Dumping UTXO set at block #{block_number}..."), + ); + + for (txid, coins) in grouped_utxos { + for subcoin_utxo_snapshot::OutputEntry { vout, coin } in coins { + output_file.write(txid, vout, coin)?; + } + processed.fetch_add(1, std::sync::atomic::Ordering::Relaxed); // Yield here allows to make the process interruptible by ctrl_c. Yield::new().await; } @@ -101,7 +114,6 @@ impl DumpTxOutSetCmd { } enum UtxoSetOutput { - Snapshot(UtxoSnapshotGenerator), Csv(File), Stdout(Stdout), } @@ -109,9 +121,6 @@ enum UtxoSetOutput { impl UtxoSetOutput { fn write(&mut self, txid: bitcoin::Txid, vout: u32, coin: Coin) -> std::io::Result<()> { match self { - Self::Snapshot(snapshot_generator) => { - snapshot_generator.write_utxo_entry(txid, vout, coin)?; - } Self::Csv(ref mut file) => { let Coin { is_coinbase, diff --git a/crates/subcoin-node/src/commands/blockchain/get_txout_set_info.rs b/crates/subcoin-node/src/commands/blockchain/get_txout_set_info.rs index 142b82c4..80f4102a 100644 --- a/crates/subcoin-node/src/commands/blockchain/get_txout_set_info.rs +++ b/crates/subcoin-node/src/commands/blockchain/get_txout_set_info.rs @@ -1,7 +1,6 @@ use super::MergedParams; use crate::commands::blockchain::{fetch_utxo_set_at, ClientParams}; use crate::utils::Yield; -use indicatif::{ProgressBar, ProgressStyle}; use sc_client_api::HeaderBackend; use sp_api::ProvideRuntimeApi; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -108,8 +107,11 @@ async fn gettxoutsetinfo( let mut muhash = subcoin_crypto::muhash::MuHash3072::new(); if progress_bar { - let loaded = txouts.clone(); - std::thread::spawn(move || show_progress(loaded, total_coins, block_number)); + crate::utils::show_progress_in_background( + txouts.clone(), + total_coins, + format!("Loading UTXO set at block #{block_number}..."), + ); } for (txid, vout, coin) in utxo_iter { @@ -155,32 +157,6 @@ async fn gettxoutsetinfo( Ok(tx_out_set_info) } -fn show_progress(loaded: Arc, total_coins: u64, block_number: u32) { - let pb = ProgressBar::new(total_coins); - - pb.set_message(format!("Loading UTXO set at block #{block_number}...")); - - pb.set_style( - ProgressStyle::default_bar() - .template("{msg} [{bar:40}] {percent:.2}% ({pos}/{len}, {eta})") - .unwrap() - .progress_chars("##-"), - ); - - loop { - let new = loaded.load(Ordering::Relaxed) as u64; - - if new == total_coins { - pb.finish_and_clear(); - return; - } - - pb.set_position(new); - - std::thread::sleep(Duration::from_millis(200)); - } -} - // Custom serializer for total_amount to display 8 decimal places fn serialize_as_btc(amount: &u64, serializer: S) -> Result where diff --git a/crates/subcoin-node/src/utils.rs b/crates/subcoin-node/src/utils.rs index 7885eff2..0737bc91 100644 --- a/crates/subcoin-node/src/utils.rs +++ b/crates/subcoin-node/src/utils.rs @@ -1,3 +1,8 @@ +use indicatif::{ProgressBar, ProgressStyle}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; + /// A future that will always `yield` on the first call of `poll` but schedules the /// current task for re-execution. /// @@ -29,3 +34,33 @@ impl futures::Future for Yield { } } } + +pub(crate) fn show_progress_in_background(processed: Arc, total: u64, msg: String) { + std::thread::spawn(move || show_progress(processed, total, msg)); +} + +fn show_progress(processed: Arc, total: u64, msg: String) { + let pb = ProgressBar::new(total); + + pb.set_message(msg); + + pb.set_style( + ProgressStyle::default_bar() + .template("{msg} [{bar:40}] {percent:.2}% ({pos}/{len}, {eta})") + .unwrap() + .progress_chars("##-"), + ); + + loop { + let new = processed.load(Ordering::Relaxed) as u64; + + if new == total { + pb.finish_and_clear(); + return; + } + + pb.set_position(new); + + std::thread::sleep(Duration::from_millis(200)); + } +} diff --git a/crates/subcoin-service/Cargo.toml b/crates/subcoin-service/Cargo.toml index a2985730..4148d2aa 100644 --- a/crates/subcoin-service/Cargo.toml +++ b/crates/subcoin-service/Cargo.toml @@ -28,8 +28,7 @@ sc-network = { workspace = true } sc-network-sync = { workspace = true } sc-rpc = { workspace = true } sc-rpc-api = { workspace = true } -# `test-helpers` for the exposed Client type. -sc-service = { workspace = true, features = ["test-helpers"], default-features = false } +sc-service = { workspace = true, default-features = false } sc-storage-monitor = { workspace = true } sc-sysinfo = { workspace = true } sc-telemetry = { workspace = true } diff --git a/crates/subcoin-snapcake/Cargo.toml b/crates/subcoin-snapcake/Cargo.toml index 10de73a5..0f56c2bf 100644 --- a/crates/subcoin-snapcake/Cargo.toml +++ b/crates/subcoin-snapcake/Cargo.toml @@ -11,6 +11,7 @@ name = "snapcake" path = "src/main.rs" [dependencies] +bincode = { workspace = true } bitcoin = { workspace = true } clap = { workspace = true, features = ["derive"] } codec = { workspace = true } @@ -30,6 +31,7 @@ sc-network-sync = { workspace = true } sc-service = { workspace = true } sc-transaction-pool = { workspace = true } serde = { workspace = true } +sha2 = { workspace = true } sp-api = { workspace = true } sp-blockchain = { workspace = true } sp-consensus = { workspace = true } @@ -43,3 +45,4 @@ subcoin-service = { workspace = true } subcoin-utxo-snapshot = { workspace = true } tracing = { workspace = true } tokio = { workspace = true } +rocksdb = "0.21.0" diff --git a/crates/subcoin-snapcake/src/main.rs b/crates/subcoin-snapcake/src/main.rs index 6fc06c19..3d6c1da8 100644 --- a/crates/subcoin-snapcake/src/main.rs +++ b/crates/subcoin-snapcake/src/main.rs @@ -35,6 +35,7 @@ mod cli; mod params; +mod snapshot_manager; mod state_sync_wrapper; mod syncing_strategy; diff --git a/crates/subcoin-snapcake/src/snapshot_manager.rs b/crates/subcoin-snapcake/src/snapshot_manager.rs new file mode 100644 index 00000000..5e1c213b --- /dev/null +++ b/crates/subcoin-snapcake/src/snapshot_manager.rs @@ -0,0 +1,392 @@ +use bitcoin::hashes::Hash; +use bitcoin::{BlockHash, Txid}; +use rocksdb::DB; +use sha2::{Digest, Sha256}; +use std::fs::File; +use std::io::{BufReader, Read}; +use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant}; +use subcoin_runtime_primitives::Coin; +use subcoin_utxo_snapshot::{OutputEntry, Utxo, UtxoSnapshotGenerator}; + +const COUNT_KEY: &[u8; 12] = b"__coin_count"; +const INTERVAL: Duration = Duration::from_secs(5); + +const MAINNET_840000_SNAPSHOT_SHA256SUM: &str = + "dc4bb43d58d6a25e91eae93eb052d72e3318bd98ec62a5d0c11817cefbba177b"; + +/// Storage backend for UTXO snapshots. +/// +/// Note that [`SnapshotStore::InMem`] and [`SnapshotStore::Csv`] can not be used in production as +/// they may consume huge RAM up to 30+ GB. +enum SnapshotStore { + // Initially created for local testing. + #[allow(unused)] + InMem(Vec), + /// Human readable format. + /// + /// This format worked before Bitcoin Core 28. The snapshot format has been changed since + /// Bitcoin Core 28, but we still keep this format for the human readable property. + #[allow(unused)] + Csv(PathBuf), + // Store the coins in lexicographical order. + Rocksdb(DB), +} + +impl SnapshotStore { + /// Append a UTXO to the store. + fn push(&mut self, utxo: Utxo) -> std::io::Result<()> { + match self { + Self::InMem(list) => { + list.push(utxo); + Ok(()) + } + Self::Csv(path) => Self::append_to_csv(path, utxo), + Self::Rocksdb(db) => Self::insert_to_rocksdb(db, utxo).map_err(|err| { + std::io::Error::new(std::io::ErrorKind::Other, format!("rocksdb: {err:?}")) + }), + } + } + + fn append_to_csv(path: &Path, utxo: Utxo) -> std::io::Result<()> { + // Open the file in append mode each time to avoid keeping it open across calls + let file = std::fs::OpenOptions::new().append(true).open(path)?; + + let mut wtr = csv::WriterBuilder::new() + .has_headers(false) // Disable automatic header writing + .from_writer(file); + + let utxo_csv_entry = UtxoCsvEntry::from(utxo); + if let Err(e) = wtr.serialize(&utxo_csv_entry) { + panic!("Failed to write UTXO entry to CSV: {e}"); + } + + wtr.flush()?; + + Ok(()) + } + + fn insert_to_rocksdb(db: &DB, utxo: Utxo) -> Result<(), rocksdb::Error> { + let mut coin_count = + Self::read_rocksdb_count(db).expect("Failed to read count from Rocksdb") as u64; + + let Utxo { txid, vout, coin } = utxo; + + let mut key = Vec::with_capacity(32 + 4); + key.extend(txid.to_byte_array()); // Raw bytes of Txid + key.extend(vout.to_be_bytes()); // Ensure vout is big-endian for consistent ordering + + let value = bincode::serialize(&coin).expect("Failed to serialize Coin"); // Serialize coin data + + coin_count += 1; + + db.put(&key, value)?; + db.put(COUNT_KEY, coin_count.to_le_bytes())?; + + Ok(()) + } + + /// Count total UTXO entries in the store. + fn count(&self) -> std::io::Result { + match self { + Self::InMem(list) => Ok(list.len()), + Self::Csv(path) => { + // Open the file in read mode only for counting + let file = std::fs::File::open(path)?; + let reader = std::io::BufReader::new(file); + + // Count lines by reading through each record + let count = csv::ReaderBuilder::new() + .has_headers(false) + .from_reader(reader) + .records() + .count(); + + Ok(count) + } + Self::Rocksdb(db) => Self::read_rocksdb_count(db), + } + } + + /// Read the RocksDB UTXO count. + fn read_rocksdb_count(db: &DB) -> std::io::Result { + Ok(db + .get(COUNT_KEY) + .ok() + .flatten() + .map(|v| u64::from_le_bytes(v.try_into().expect("Invalid DB value"))) + .unwrap_or(0) as usize) + } + + fn generate_snapshot( + &mut self, + snapshot_generator: &mut UtxoSnapshotGenerator, + target_bitcoin_block_hash: BlockHash, + utxos_count: u64, + ) -> std::io::Result<()> { + match self { + Self::InMem(list) => snapshot_generator.generate_snapshot_in_mem( + target_bitcoin_block_hash, + utxos_count as u64, + std::mem::take(list), + ), + Self::Csv(path) => Self::generate_from_csv( + path, + snapshot_generator, + target_bitcoin_block_hash, + utxos_count, + ), + Self::Rocksdb(db) => Self::generate_from_rocksdb( + db, + snapshot_generator, + target_bitcoin_block_hash, + utxos_count, + ), + } + } + + fn generate_from_csv( + path: impl AsRef, + snapshot_generator: &mut UtxoSnapshotGenerator, + block_hash: BlockHash, + utxos_count: u64, + ) -> std::io::Result<()> { + let file = std::fs::File::open(path)?; + + let reader = std::io::BufReader::new(file); + let csv_reader = csv::ReaderBuilder::new() + .has_headers(false) + .from_reader(reader); + + let utxo_iter = csv_reader + .into_deserialize::() + .filter_map(Result::ok) + .map(Utxo::from); + + snapshot_generator.generate_snapshot_in_mem(block_hash, utxos_count, utxo_iter) + } + + fn generate_from_rocksdb( + db: &DB, + snapshot_generator: &mut UtxoSnapshotGenerator, + bitcoin_block_hash: BlockHash, + utxos_count: u64, + ) -> std::io::Result<()> { + let mut last_txid = None; + let mut coins = Vec::new(); + let mut written = 0; + let mut last_progress_update_time = Instant::now(); + + snapshot_generator.write_metadata(bitcoin_block_hash, utxos_count)?; + + for entry in db.iterator(rocksdb::IteratorMode::Start) { + let (key, value) = entry.unwrap(); + + if key.as_ref() == COUNT_KEY { + continue; // Skip the count key + } + + let txid = Txid::from_slice(&key[0..32]).unwrap(); // First 32 bytes + let vout = u32::from_be_bytes(key[32..36].try_into().unwrap()); // Next 4 bytes + let coin: Coin = bincode::deserialize(&value).unwrap(); + + match last_txid { + Some(x) => { + if x == txid { + coins.push(OutputEntry { vout, coin }); + } else { + let coins_per_txid = std::mem::take(&mut coins); + written += coins_per_txid.len(); + snapshot_generator.write_coins(x, coins_per_txid)?; + + if last_progress_update_time.elapsed() > INTERVAL { + tracing::info!( + target: "snapcake", + "Writing snapshot progress: {written}/{utxos_count}", + ); + last_progress_update_time = Instant::now(); + } + + last_txid.replace(txid); + coins.push(OutputEntry { vout, coin }); + } + } + None => { + last_txid.replace(txid); + coins.push(OutputEntry { vout, coin }); + } + } + } + + if let Some(txid) = last_txid { + if !coins.is_empty() { + written += coins.len(); + snapshot_generator.write_coins(txid, coins)?; + tracing::info!( + target: "snapcake", + "Writing snapshot progress: {written}/{utxos_count}", + ); + } + } + + Ok(()) + } +} + +/// CSV representation of a UTXO entry. +#[derive(serde::Serialize, serde::Deserialize)] +struct UtxoCsvEntry { + txid: bitcoin::Txid, + vout: u32, + is_coinbase: bool, + amount: u64, + height: u32, + script_pubkey: String, +} + +impl From for UtxoCsvEntry { + fn from(utxo: Utxo) -> Self { + let Utxo { txid, vout, coin } = utxo; + Self { + txid, + vout, + is_coinbase: coin.is_coinbase, + amount: coin.amount, + height: coin.height, + script_pubkey: hex::encode(&coin.script_pubkey), + } + } +} + +impl From for Utxo { + fn from(csv_entry: UtxoCsvEntry) -> Self { + let UtxoCsvEntry { + txid, + vout, + is_coinbase, + amount, + height, + script_pubkey, + } = csv_entry; + + let coin = Coin { + is_coinbase, + amount, + height, + script_pubkey: hex::decode(script_pubkey).expect("Failed to decode script_pubkey"), + }; + + Self { txid, vout, coin } + } +} + +pub struct SnapshotManager { + store: SnapshotStore, + snapshot_generator: UtxoSnapshotGenerator, + target_bitcoin_block_hash: BlockHash, +} + +impl SnapshotManager { + pub fn new( + target_block_number: u32, + target_bitcoin_block_hash: BlockHash, + snapshot_base_dir: PathBuf, + use_rocksdb: bool, + ) -> Self { + let sync_target = format!("{target_block_number}_{target_bitcoin_block_hash}"); + let snapshot_dir = snapshot_base_dir.join(sync_target); + + // Ensure the snapshot directory exists, creating it if necessary + std::fs::create_dir_all(&snapshot_dir).expect("Failed to create snapshot directory"); + + let snapshot_filepath = snapshot_dir.join("snapshot.dat"); + let snapshot_file = + std::fs::File::create(&snapshot_filepath).expect("Failed to create output file"); + let snapshot_generator = + UtxoSnapshotGenerator::new(snapshot_filepath, snapshot_file, bitcoin::Network::Bitcoin); + + let store = if use_rocksdb { + let db = DB::open_default(snapshot_dir.join("db")).expect("Failed to open Rocksdb"); + SnapshotStore::Rocksdb(db) + } else { + let utxo_filepath = snapshot_dir.join("utxo.csv"); + + // Open in write mode to clear existing content, then immediately close. + std::fs::File::create(&utxo_filepath).expect("Failed to clear utxo.csv"); + SnapshotStore::Csv(utxo_filepath) + }; + + Self { + store, + snapshot_generator, + target_bitcoin_block_hash, + } + } + + /// Adds a UTXO to the store. + pub fn store_utxo(&mut self, utxo: Utxo) { + self.store + .push(utxo) + .expect("Failed to add UTXO to the store"); + } + + /// Generates a snapshot and ensures the total UTXO count matches the expected count. + pub fn create_snapshot(&mut self, expected_utxos_count: usize) { + let utxos_count = self + .store + .count() + .expect("Failed to calculate the count of stored UTXO"); + + assert_eq!(utxos_count, expected_utxos_count, "UTXO count mismatches"); + + tracing::info!( + target: "snapcake", + "Writing the snapshot to {}", + self.snapshot_generator.path().display() + ); + + self.store + .generate_snapshot( + &mut self.snapshot_generator, + self.target_bitcoin_block_hash, + utxos_count as u64, + ) + .expect("Failed to write UTXO set snapshot"); + + if self.target_bitcoin_block_hash + == "0000000000000000000320283a032748cef8227873ff4872689bf23f1cda83a5" + .parse() + .expect("BlockHash for mainnet block#8400000 is correct; qed") + { + let sha256sum = calculate_sha256sum(self.snapshot_generator.path()) + .expect("Failed to calculate sha256sum of snapshot"); + + if sha256sum != MAINNET_840000_SNAPSHOT_SHA256SUM { + panic!("Invalid snapshot sha256sum for block 8400000, expected: {MAINNET_840000_SNAPSHOT_SHA256SUM}, got: {sha256sum}"); + } + } + } +} + +/// Calculates the SHA256 checksum of a file. +fn calculate_sha256sum(file_path: impl AsRef) -> std::io::Result { + // Open the file + let file = File::open(file_path)?; + let mut reader = BufReader::new(file); + + // Create a SHA256 hasher + let mut hasher = Sha256::new(); + let mut buffer = [0u8; 4096]; // Read the file in chunks + + // Read file and update the hasher + while let Ok(bytes_read) = reader.read(&mut buffer) { + if bytes_read == 0 { + break; // End of file + } + hasher.update(&buffer[..bytes_read]); + } + + // Finalize the hash and return it as a hexadecimal string + let hash_result = hasher.finalize(); + Ok(format!("{:x}", hash_result)) +} diff --git a/crates/subcoin-snapcake/src/state_sync_wrapper.rs b/crates/subcoin-snapcake/src/state_sync_wrapper.rs index 9b47c37e..45325458 100644 --- a/crates/subcoin-snapcake/src/state_sync_wrapper.rs +++ b/crates/subcoin-snapcake/src/state_sync_wrapper.rs @@ -1,3 +1,4 @@ +use crate::snapshot_manager::SnapshotManager; use codec::Decode; use sc_client_api::ProofProvider; use sc_network_sync::strategy::state_sync::{ @@ -11,119 +12,17 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use subcoin_crypto::muhash::MuHash3072; use subcoin_runtime_primitives::Coin; -use subcoin_utxo_snapshot::{Utxo, UtxoSnapshotGenerator}; +use subcoin_utxo_snapshot::Utxo; const DOWNLOAD_PROGRESS_LOG_INTERVAL: Duration = Duration::from_secs(5); -#[derive(serde::Serialize, serde::Deserialize)] -struct UtxoCsvEntry { - txid: bitcoin::Txid, - vout: u32, - is_coinbase: bool, - amount: u64, - height: u32, - script_pubkey: String, -} - -impl From for UtxoCsvEntry { - fn from(utxo: Utxo) -> Self { - let Utxo { txid, vout, coin } = utxo; - Self { - txid, - vout, - is_coinbase: coin.is_coinbase, - amount: coin.amount, - height: coin.height, - script_pubkey: hex::encode(&coin.script_pubkey), - } - } -} - -impl From for Utxo { - fn from(csv_entry: UtxoCsvEntry) -> Self { - let UtxoCsvEntry { - txid, - vout, - is_coinbase, - amount, - height, - script_pubkey, - } = csv_entry; - - let coin = Coin { - is_coinbase, - amount, - height, - script_pubkey: hex::decode(script_pubkey).expect("Failed to decode script_pubkey"), - }; - - Self { txid, vout, coin } - } -} - -enum UtxoStore { - // Initially created for local testing. - #[allow(unused)] - InMem(Vec), - Csv(PathBuf), -} - -impl UtxoStore { - fn push(&mut self, utxo: Utxo) { - match self { - Self::InMem(list) => list.push(utxo), - Self::Csv(path) => { - // Open the file in append mode each time to avoid keeping it open across calls - let file = std::fs::OpenOptions::new() - .append(true) - .open(path) - .expect("Failed to open file"); - - let mut wtr = csv::WriterBuilder::new() - .has_headers(false) // Disable automatic header writing - .from_writer(file); - - let utxo_csv_entry = UtxoCsvEntry::from(utxo); - if let Err(e) = wtr.serialize(&utxo_csv_entry) { - panic!("Failed to write UTXO entry to CSV: {}", e); - } - - if let Err(e) = wtr.flush() { - panic!("Failed to flush CSV writer: {}", e); - } - } - } - } - - fn count(&self) -> std::io::Result { - match self { - Self::InMem(list) => Ok(list.len()), - Self::Csv(path) => { - // Open the file in read mode only for counting - let file = std::fs::File::open(path)?; - let reader = std::io::BufReader::new(file); - - // Count lines by reading through each record - let count = csv::ReaderBuilder::new() - .has_headers(false) - .from_reader(reader) - .records() - .count(); - - Ok(count) - } - } - } -} - /// Wrapped [`StateSync`] to intercept the state response for parsing the UTXO state. pub(crate) struct StateSyncWrapper { inner: StateSync, muhash: MuHash3072, target_block_number: u32, target_bitcoin_block_hash: bitcoin::BlockHash, - utxo_store: UtxoStore, - snapshot_generator: UtxoSnapshotGenerator, + snapshot_manager: SnapshotManager, received_coins: usize, total_coins: usize, last_progress_print_time: Option, @@ -146,29 +45,19 @@ where subcoin_primitives::extract_bitcoin_block_hash::(&target_header) .expect("Failed to extract bitcoin block hash"); - let sync_target = format!("{target_block_number}_{target_bitcoin_block_hash}"); - let snapshot_dir = snapshot_base_dir.join(sync_target); - - // Ensure the snapshot directory exists, creating it if necessary - std::fs::create_dir_all(&snapshot_dir).expect("Failed to create snapshot directory"); - - let snapshot_filepath = snapshot_dir.join("snapshot.dat"); - let snapshot_file = - std::fs::File::create(&snapshot_filepath).expect("Failed to create output file"); - let snapshot_generator = UtxoSnapshotGenerator::new(snapshot_filepath, snapshot_file); - - let utxo_filepath = snapshot_dir.join("utxo.csv"); - - // Open in write mode to clear existing content, then immediately close. - std::fs::File::create(&utxo_filepath).expect("Failed to clear utxo.csv"); + let snapshot_manager = SnapshotManager::new( + target_block_number.saturated_into(), + target_bitcoin_block_hash, + snapshot_base_dir, + true, + ); Self { inner: StateSync::new(client, target_header, None, None, skip_proof), target_block_number: target_block_number.saturated_into(), target_bitcoin_block_hash, muhash: MuHash3072::new(), - utxo_store: UtxoStore::Csv(utxo_filepath), - snapshot_generator, + snapshot_manager, received_coins: 0, total_coins, last_progress_print_time: None, @@ -207,8 +96,8 @@ where self.muhash.insert(&data); - // TODO: write UTXO to a local file instead of storing in memory. - self.utxo_store.push(Utxo { txid, vout, coin }); + self.snapshot_manager.store_utxo(Utxo { txid, vout, coin }); + self.received_coins += 1; } } @@ -225,13 +114,6 @@ where if complete { let muhash = self.muhash.txoutset_muhash(); - let utxos_count = self - .utxo_store - .count() - .expect("Failed to calculate the count of stored UTXO"); - - assert_eq!(utxos_count, self.total_coins, "UTXO count mismatches"); - tracing::info!( target: "snapcake", %muhash, @@ -240,46 +122,7 @@ where self.total_coins, ); - tracing::info!( - target: "snapcake", - "Writing the snapshot to {}", - self.snapshot_generator.path().display() - ); - - match &mut self.utxo_store { - UtxoStore::InMem(list) => { - let utxos = std::mem::take(list); - - self.snapshot_generator - .write_utxo_snapshot( - self.target_bitcoin_block_hash, - utxos_count as u64, - utxos, - ) - .expect("Failed to write UTXO set snapshot"); - } - UtxoStore::Csv(path) => { - let file = std::fs::File::open(path).expect("Failed to open utxo.csv"); - - let reader = std::io::BufReader::new(file); - let csv_reader = csv::ReaderBuilder::new() - .has_headers(false) - .from_reader(reader); - - let utxo_iter = csv_reader - .into_deserialize::() - .filter_map(Result::ok) - .map(Utxo::from); - - self.snapshot_generator - .write_utxo_snapshot( - self.target_bitcoin_block_hash, - utxos_count as u64, - utxo_iter, - ) - .expect("Failed to write UTXO set snapshot"); - } - } + self.snapshot_manager.create_snapshot(self.total_coins); tracing::info!( target: "snapcake", diff --git a/crates/subcoin-snapcake/src/syncing_strategy.rs b/crates/subcoin-snapcake/src/syncing_strategy.rs index 6e3e9165..b3544100 100644 --- a/crates/subcoin-snapcake/src/syncing_strategy.rs +++ b/crates/subcoin-snapcake/src/syncing_strategy.rs @@ -729,7 +729,7 @@ where for (peer_id, request) in self.pending_header_requests.clone() { chain_sync_actions.push( self.chain_sync - .as_ref() + .as_mut() .expect("Chain sync must be available") .create_block_request_action(peer_id, request), ); diff --git a/crates/subcoin-utxo-snapshot/Cargo.toml b/crates/subcoin-utxo-snapshot/Cargo.toml index d183bca2..12ace08d 100644 --- a/crates/subcoin-utxo-snapshot/Cargo.toml +++ b/crates/subcoin-utxo-snapshot/Cargo.toml @@ -7,7 +7,11 @@ repository.workspace = true license.workspace = true [dependencies] -bitcoin = { workspace = true } +bitcoin = { workspace = true, features = ["serde"] } serde = { workspace = true } subcoin-primitives = { workspace = true } txoutset = { workspace = true } + +[dev-dependencies] +fastrand = { workspace = true } +hex = { workspace = true } diff --git a/crates/subcoin-utxo-snapshot/src/compressor.rs b/crates/subcoin-utxo-snapshot/src/compressor.rs new file mode 100644 index 00000000..f31455f4 --- /dev/null +++ b/crates/subcoin-utxo-snapshot/src/compressor.rs @@ -0,0 +1,248 @@ +use bitcoin::consensus::Encodable; +use std::io::Read; +use std::vec::Vec; +use txoutset::var_int::VarInt; + +const MAX_MONEY: u64 = 21000000 * 100000000; + +// Constants for opcodes +const OP_DUP: u8 = 0x76; +const OP_HASH160: u8 = 0xa9; +const OP_EQUALVERIFY: u8 = 0x88; +const OP_CHECKSIG: u8 = 0xac; +const OP_EQUAL: u8 = 0x87; + +// https://github.com/bitcoin/bitcoin/blob/0903ce8dbc25d3823b03d52f6e6bff74d19e801e/src/compressor.cpp#L140 +// +// NOTE: This function is defined only for 0 <= n <= MAX_MONEY. +pub fn compress_amount(n: u64) -> u64 { + assert!(n <= MAX_MONEY); + + if n == 0 { + return 0; + } + let mut e = 0; + let mut n = n; + while n % 10 == 0 && e < 9 { + n /= 10; + e += 1; + } + if e < 9 { + let d = n % 10; + assert!((1..=9).contains(&d)); + n /= 10; + 1 + (n * 9 + d - 1) * 10 + e as u64 + } else { + 1 + (n - 1) * 10 + 9 + } +} + +#[allow(unused)] +pub fn decompress_amount(x: u64) -> u64 { + if x == 0 { + return 0; + } + let mut x = x - 1; + let e = x % 10; + x /= 10; + let mut n = if e < 9 { + let d = (x % 9) + 1; + x /= 9; + x * 10 + d + } else { + x + 1 + }; + for _ in 0..e { + n *= 10; + } + n +} + +fn to_key_id(script: &[u8]) -> Option<[u8; 20]> { + if script.len() == 25 + && script[0] == OP_DUP + && script[1] == OP_HASH160 + && script[2] == 20 + && script[23] == OP_EQUALVERIFY + && script[24] == OP_CHECKSIG + { + Some(script[3..23].try_into().expect("Size must be 20; qed")) + } else { + None + } +} + +fn to_script_id(script: &[u8]) -> Option<[u8; 20]> { + if script.len() == 23 && script[0] == OP_HASH160 && script[1] == 20 && script[22] == OP_EQUAL { + Some(script[2..22].try_into().expect("Size must be 20; qed")) + } else { + None + } +} + +enum PublicKey { + Compressed([u8; 33]), + Uncompressed([u8; 65]), +} + +fn to_pub_key(script: &[u8]) -> Option { + if script.len() == 35 + && script[0] == 33 + && script[34] == OP_CHECKSIG + && (script[1] == 0x02 || script[1] == 0x03) + { + Some(PublicKey::Compressed( + script[1..34].try_into().expect("Size must be 33; qed"), + )) + } else if script.len() == 67 + && script[0] == 65 + && script[66] == OP_CHECKSIG + && script[1] == 0x04 + { + // If not fully valid, it would not be compressible. + let is_fully_valid = bitcoin::Script::from_bytes(script) + .p2pk_public_key() + .is_some(); + if is_fully_valid { + Some(PublicKey::Uncompressed( + script[1..66].try_into().expect("Size be 65; qed"), + )) + } else { + None + } + } else { + None + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CompressedScript(pub Vec); + +pub fn compress_script(script: &[u8]) -> Option { + if let Some(hash) = to_key_id(script) { + let mut out = Vec::with_capacity(21); + out.push(0x00); + out.extend(hash); + return Some(CompressedScript(out)); + } + + if let Some(hash) = to_script_id(script) { + let mut out = Vec::with_capacity(21); + out.push(0x01); + out.extend(hash); + return Some(CompressedScript(out)); + } + + if let Some(public_key) = to_pub_key(script) { + let mut out = Vec::with_capacity(33); + + match public_key { + PublicKey::Compressed(compressed) => { + out.extend(compressed); + } + PublicKey::Uncompressed(uncompressed) => { + out.push(0x04 | (uncompressed[64] & 0x01)); + out.extend_from_slice(&uncompressed[1..33]); + } + } + + return Some(CompressedScript(out)); + } + + None +} + +#[allow(unused)] +fn decompress_script(stream: &mut R) -> std::io::Result> { + let mut n_size_buf = [0u8; 1]; + stream.read_exact(&mut n_size_buf)?; + let n_size = n_size_buf[0]; + + match n_size { + 0x00 => { + // P2PKH + let mut data = [0u8; 20]; + stream.read_exact(&mut data)?; + + let bytes = vec![OP_DUP, OP_HASH160, 20] + .into_iter() + .chain(data.iter().cloned()) + .chain(vec![OP_EQUALVERIFY, OP_CHECKSIG]) + .collect(); + Ok(Some(bitcoin::ScriptBuf::from_bytes(bytes))) + } + 0x01 => { + // P2SH + let mut data = [0u8; 20]; + stream.read_exact(&mut data)?; + + let bytes = vec![OP_HASH160, 20] + .into_iter() + .chain(data.iter().cloned()) + .chain(vec![OP_EQUAL]) + .collect(); + Ok(Some(bitcoin::ScriptBuf::from_bytes(bytes))) + } + 0x02 | 0x03 => { + // Compressed PubKey + let mut data = [0u8; 32]; + stream.read_exact(&mut data)?; + + let mut bytes = Vec::new(); + bytes.push(33); // Key length + bytes.push(n_size); // Prefix + bytes.extend_from_slice(&data); + bytes.push(OP_CHECKSIG); // OP_CHECKSIG + Ok(Some(bitcoin::ScriptBuf::from_bytes(bytes))) + } + 0x04 | 0x05 => { + let mut compressed_pubkey = [0u8; 33]; + compressed_pubkey[0] = n_size - 2; + stream.read_exact(&mut compressed_pubkey[1..])?; + + let Ok(pubkey) = bitcoin::PublicKey::from_slice(&compressed_pubkey) else { + return Ok(None); + }; + + // Uncompressed PubKey + let uncompressed = pubkey.inner.serialize_uncompressed(); + + let mut bytes = Vec::new(); + bytes.push(65); // PubKey length + bytes.extend(uncompressed); + bytes.push(0xac); // OP_CHECKSIG + Ok(Some(bitcoin::ScriptBuf::from_bytes(bytes))) + } + _ => Ok(None), + } +} + +pub struct ScriptCompression(pub Vec); + +impl ScriptCompression { + const SPECIAL_SCRIPTS: usize = 6; + + pub fn serialize(&self, writer: &mut W) -> std::io::Result<()> { + if let Some(compressed_script) = compress_script(&self.0) { + writer.write_all(&compressed_script.0)?; + return Ok(()); + } + let size = self.0.len() + Self::SPECIAL_SCRIPTS; + let mut data = Vec::new(); + VarInt::new(size as u64).consensus_encode(&mut data)?; + writer.write_all(&data)?; + writer.write_all(&self.0)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_compress_amount() { + let n = fastrand::u64(..MAX_MONEY); + assert_eq!(n, decompress_amount(compress_amount(n))); + } +} diff --git a/crates/subcoin-utxo-snapshot/src/lib.rs b/crates/subcoin-utxo-snapshot/src/lib.rs index d877a7c2..4f682d48 100644 --- a/crates/subcoin-utxo-snapshot/src/lib.rs +++ b/crates/subcoin-utxo-snapshot/src/lib.rs @@ -1,9 +1,42 @@ +mod compressor; +mod script; +mod serialize; +#[cfg(test)] +mod tests; + +use self::compressor::ScriptCompression; +use self::serialize::write_compact_size; use bitcoin::consensus::encode::Encodable; +use bitcoin::hashes::Hash; use bitcoin::BlockHash; +use compressor::compress_amount; +use std::collections::{BTreeMap, HashSet}; use std::fs::File; use std::io::Write; use std::path::{Path, PathBuf}; use subcoin_primitives::runtime::Coin; +use txoutset::var_int::VarInt; + +const SNAPSHOT_MAGIC_BYTES: [u8; 5] = [b'u', b't', b'x', b'o', 0xff]; + +/// Groups UTXOs by `txid` into a lexicographically ordered `BTreeMap` (same as the order stored by +/// Bitcoin Core in leveldb). +/// +/// NOTE: this requires substantial RAM. +pub fn group_utxos_by_txid( + utxos: impl IntoIterator, +) -> BTreeMap> { + let mut map: BTreeMap> = BTreeMap::new(); + + for utxo in utxos { + map.entry(utxo.txid).or_default().push(OutputEntry { + vout: utxo.vout, + coin: utxo.coin, + }); + } + + map +} // Equivalent function in Rust for serializing an OutPoint and Coin // @@ -29,6 +62,17 @@ pub fn tx_out_ser(outpoint: bitcoin::OutPoint, coin: &Coin) -> bitcoin::io::Resu Ok(data) } +/// Represents a UTXO output in the snapshot format. +/// +/// A combination of the output index (vout) and associated coin data. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct OutputEntry { + /// The output index within the transaction. + pub vout: u32, + /// The coin data associated with this output. + pub coin: Coin, +} + /// Represents a single UTXO (Unspent Transaction Output) in Bitcoin. #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Utxo { @@ -40,22 +84,115 @@ pub struct Utxo { pub coin: Coin, } +impl From<(bitcoin::Txid, u32, Coin)> for Utxo { + fn from((txid, vout, coin): (bitcoin::Txid, u32, Coin)) -> Self { + Self { txid, vout, coin } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct SnapshotMetadata { + version: u16, + supported_versions: HashSet, + network_magic: [u8; 4], + base_blockhash: [u8; 32], + coins_count: u64, +} + +impl SnapshotMetadata { + const VERSION: u16 = 2; + + pub fn new(network_magic: [u8; 4], base_blockhash: [u8; 32], coins_count: u64) -> Self { + let supported_versions = HashSet::from([Self::VERSION]); + Self { + version: Self::VERSION, + supported_versions, + network_magic, + base_blockhash, + coins_count, + } + } + + pub fn serialize(&self, writer: &mut W) -> std::io::Result<()> { + writer.write_all(&SNAPSHOT_MAGIC_BYTES)?; + writer.write_all(&self.version.to_le_bytes())?; + writer.write_all(&self.network_magic)?; + writer.write_all(&self.base_blockhash)?; + writer.write_all(&self.coins_count.to_le_bytes())?; + Ok(()) + } + + #[allow(unused)] + pub fn deserialize( + reader: &mut R, + expected_network_magic: &[u8], + ) -> std::io::Result { + use std::io::{Error, ErrorKind}; + + let mut magic_bytes = [0; SNAPSHOT_MAGIC_BYTES.len()]; + reader.read_exact(&mut magic_bytes)?; + if magic_bytes != SNAPSHOT_MAGIC_BYTES { + return Err(Error::new( + ErrorKind::InvalidData, + format!("Invalid UTXO snapshot magic bytes (expected: {SNAPSHOT_MAGIC_BYTES:?}, got: {magic_bytes:?})"), + )); + } + + let mut version_bytes = [0; 2]; + reader.read_exact(&mut version_bytes)?; + let version = u16::from_le_bytes(version_bytes); + + let supported_versions = HashSet::from([Self::VERSION]); + if !supported_versions.contains(&version) { + return Err(Error::new( + ErrorKind::InvalidData, + format!("Unsupported snapshot version: {version}"), + )); + } + + let mut network_magic = [0u8; 4]; + reader.read_exact(&mut network_magic)?; + if network_magic != expected_network_magic { + return Err(Error::new(ErrorKind::InvalidData, "Network magic mismatch")); + } + + let mut base_blockhash = [0; 32]; + reader.read_exact(&mut base_blockhash)?; + + let mut coins_count_bytes = [0; 8]; + reader.read_exact(&mut coins_count_bytes)?; + let coins_count = u64::from_le_bytes(coins_count_bytes); + + Ok(Self { + version, + supported_versions, + network_magic, + base_blockhash, + coins_count, + }) + } +} + /// Responsible for dumping the UTXO set snapshot compatible with Bitcoin Core. +/// +/// The format of generated snapshot is compatible with Bitcoin Core 28.0. pub struct UtxoSnapshotGenerator { output_filepath: PathBuf, output_file: File, + network: bitcoin::Network, } impl UtxoSnapshotGenerator { /// Constructs a new instance of [`UtxoSnapshotGenerator`]. - pub fn new(output_filepath: PathBuf, output_file: File) -> Self { + pub fn new(output_filepath: PathBuf, output_file: File, network: bitcoin::Network) -> Self { Self { output_filepath, output_file, + network, } } - /// Returns the path of output file. + /// Returns the path of generated snapshot file. pub fn path(&self) -> &Path { &self.output_filepath } @@ -97,39 +234,121 @@ impl UtxoSnapshotGenerator { } /// Writes the metadata of snapshot. - pub fn write_snapshot_metadata( + pub fn write_metadata( &mut self, bitcoin_block_hash: BlockHash, coins_count: u64, ) -> std::io::Result<()> { - let mut data = Vec::new(); - - bitcoin_block_hash - .consensus_encode(&mut data) - .expect("Failed to encode"); - - coins_count - .consensus_encode(&mut data) - .expect("Failed to write utxo set size"); - - let _ = self.output_file.write(data.as_slice())?; - - Ok(()) + write_snapshot_metadata( + &mut self.output_file, + self.network, + bitcoin_block_hash, + coins_count, + ) } /// Write the UTXO snapshot at the specified block to a file. - pub fn write_utxo_snapshot( + /// + /// NOTE: Do not use it in production. + pub fn generate_snapshot_in_mem( &mut self, bitcoin_block_hash: BlockHash, utxos_count: u64, utxos: impl IntoIterator, ) -> std::io::Result<()> { - self.write_snapshot_metadata(bitcoin_block_hash, utxos_count)?; + generate_snapshot_in_mem_inner( + &mut self.output_file, + self.network, + bitcoin_block_hash, + utxos_count, + utxos, + ) + } - for Utxo { txid, vout, coin } in utxos { - self.write_utxo_entry(txid, vout, coin)?; - } + /// Writes UTXO entries for a given transaction. + pub fn write_coins( + &mut self, + txid: bitcoin::Txid, + coins: Vec, + ) -> std::io::Result<()> { + write_coins(&mut self.output_file, txid, coins) + } +} - Ok(()) +fn write_snapshot_metadata( + writer: &mut W, + network: bitcoin::Network, + bitcoin_block_hash: BlockHash, + coins_count: u64, +) -> std::io::Result<()> { + let snapshot_metadata = SnapshotMetadata::new( + network.magic().to_bytes(), + bitcoin_block_hash.to_byte_array(), + coins_count, + ); + + snapshot_metadata.serialize(writer)?; + + Ok(()) +} + +/// Write the UTXO snapshot at the specified block using the given writer. +/// +/// NOTE: Do not use it in production. +fn generate_snapshot_in_mem_inner( + writer: &mut W, + network: bitcoin::Network, + bitcoin_block_hash: BlockHash, + utxos_count: u64, + utxos: impl IntoIterator, +) -> std::io::Result<()> { + write_snapshot_metadata(writer, network, bitcoin_block_hash, utxos_count)?; + + for (txid, coins) in group_utxos_by_txid(utxos) { + write_coins(writer, txid, coins)?; + } + + Ok(()) +} + +pub fn write_coins( + writer: &mut W, + txid: bitcoin::Txid, + mut coins: Vec, +) -> std::io::Result<()> { + coins.sort_by_key(|output_entry| output_entry.vout); + + let mut data = Vec::new(); + txid.consensus_encode(&mut data)?; + writer.write_all(&data)?; + + write_compact_size(writer, coins.len() as u64)?; + + for OutputEntry { vout, coin } in coins { + write_compact_size(writer, vout as u64)?; + serialize_coin(writer, coin)?; } + + Ok(()) +} + +fn serialize_coin(writer: &mut W, coin: Coin) -> std::io::Result<()> { + let Coin { + is_coinbase, + amount, + height, + script_pubkey, + } = coin; + + // https://github.com/bitcoin/bitcoin/blob/0903ce8dbc25d3823b03d52f6e6bff74d19e801e/src/coins.h#L62 + let code = (height << 1) | is_coinbase as u32; + + let mut data = Vec::new(); + VarInt::new(code as u64).consensus_encode(&mut data)?; + VarInt::new(compress_amount(amount)).consensus_encode(&mut data)?; + writer.write_all(&data)?; + + ScriptCompression(script_pubkey).serialize(writer)?; + + Ok(()) } diff --git a/crates/subcoin-utxo-snapshot/src/script.rs b/crates/subcoin-utxo-snapshot/src/script.rs new file mode 100644 index 00000000..2d723a1c --- /dev/null +++ b/crates/subcoin-utxo-snapshot/src/script.rs @@ -0,0 +1,131 @@ +use crate::compressor::compress_script; +use bitcoin::consensus::encode::Error; +use bitcoin::consensus::{Decodable, Encodable}; +use bitcoin::hashes::Hash; +use bitcoin::script::{Builder, ScriptBuf}; +use bitcoin::{opcodes, PubkeyHash, PublicKey, ScriptHash}; +use txoutset::var_int::VarInt; + +const NUM_SPECIAL_SCRIPTS: usize = 6; +const MAX_SCRIPT_SIZE: usize = 10_000; + +/// Wrapper to enable script decompression +#[derive(Debug)] +pub struct ScriptCompression(ScriptBuf); + +impl Encodable for ScriptCompression { + fn consensus_encode( + &self, + writer: &mut W, + ) -> Result { + if let Some(compressed_script) = compress_script(self.0.as_bytes()) { + let len = compressed_script.0.len(); + writer.write_all(&compressed_script.0)?; + return Ok(len); + } + + let size = self.0.len() + NUM_SPECIAL_SCRIPTS; + let mut data = Vec::new(); + let mut len = VarInt::new(size as u64).consensus_encode(&mut data)?; + writer.write_all(&data)?; + + len += self.0.len(); + writer.write_all(self.0.as_bytes())?; + Ok(len) + } +} + +impl Decodable for ScriptCompression { + fn consensus_decode(reader: &mut R) -> Result { + let mut size = u64::from(VarInt::consensus_decode(reader)?) as usize; + + match size { + 0x00 => { + // P2PKH + let mut bytes = [0; 20]; + reader.read_exact(&mut bytes)?; + let pubkey_hash = PubkeyHash::from_slice(&bytes) + .map_err(|_| Error::ParseFailed("Failed to parse Hash160"))?; + Ok(Self(ScriptBuf::new_p2pkh(&pubkey_hash))) + } + 0x01 => { + // P2SH + let mut bytes = [0; 20]; + reader.read_exact(&mut bytes)?; + let script_hash = ScriptHash::from_slice(&bytes) + .map_err(|_| Error::ParseFailed("Failed to parse Hash160"))?; + Ok(Self(ScriptBuf::new_p2sh(&script_hash))) + } + 0x02 | 0x03 => { + // P2PK (compressed) + let mut bytes = [0; 32]; + reader.read_exact(&mut bytes)?; + + let mut script_bytes = Vec::with_capacity(35); + script_bytes.push(opcodes::all::OP_PUSHBYTES_33.to_u8()); + script_bytes.push(size as u8); + script_bytes.extend_from_slice(&bytes); + script_bytes.push(opcodes::all::OP_CHECKSIG.to_u8()); + + Ok(Self(ScriptBuf::from(script_bytes))) + } + 0x04 | 0x05 => { + // P2PK (uncompressed) + let mut bytes = [0; 32]; + reader.read_exact(&mut bytes)?; + + let mut compressed_pubkey_bytes = Vec::with_capacity(33); + compressed_pubkey_bytes.push((size - 2) as u8); + compressed_pubkey_bytes.extend_from_slice(&bytes); + + let compressed_pubkey = PublicKey::from_slice(&compressed_pubkey_bytes) + .map_err(|_| Error::ParseFailed("Failed to parse PublicKey"))?; + let inner_uncompressed = compressed_pubkey.inner.serialize_uncompressed(); + + let mut script_bytes = Vec::with_capacity(67); + script_bytes.push(opcodes::all::OP_PUSHBYTES_65.to_u8()); + script_bytes.extend_from_slice(&inner_uncompressed); + script_bytes.push(opcodes::all::OP_CHECKSIG.to_u8()); + + Ok(Self(ScriptBuf::from(script_bytes))) + } + _ => { + size -= NUM_SPECIAL_SCRIPTS; + let mut bytes = Vec::with_capacity(size); + bytes.resize_with(size, || 0); + if size > MAX_SCRIPT_SIZE { + reader.read_exact(&mut bytes)?; + let script = Builder::new() + .push_opcode(opcodes::all::OP_RETURN) + .into_script(); + Ok(Self(script)) + } else { + reader.read_exact(&mut bytes)?; + Ok(Self(ScriptBuf::from_bytes(bytes))) + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_script_compression() { + let line = "7e27944eacef755d2fa83b6559f480941aa4f0e5f3f0623fc3d0de9cd28c0100:1,false,413198,7800,5121030b3810fd20fd3771517b2b8847d225791035ea06768e17c733a5756b6005bf55210222b6e887bb4d4bca08f97348e6b8561e6d11e0ed96dec0584b34d709078cd4a54104289699814d1c9ef35ae45cfb41116501c15b0141430a481226aa19bcb8806c7223802d24f2638d8ce14378137dd52114d1d965e2969b5b3ac011c25e2803eb5753ae"; + + let utxo = crate::tests::parse_csv_entry(&line); + let script = utxo.coin.script_pubkey; + + let script_compression = ScriptCompression(ScriptBuf::from_bytes(script.clone())); + + let mut encoded = Vec::new(); + script_compression.consensus_encode(&mut encoded).unwrap(); + + let decoded = ScriptCompression::consensus_decode(&mut encoded.as_slice()).unwrap(); + + assert_eq!(decoded.0, script_compression.0); + } +} diff --git a/crates/subcoin-utxo-snapshot/src/serialize.rs b/crates/subcoin-utxo-snapshot/src/serialize.rs new file mode 100644 index 00000000..1d725497 --- /dev/null +++ b/crates/subcoin-utxo-snapshot/src/serialize.rs @@ -0,0 +1,18 @@ +use std::io::{self, Write}; + +// https://github.com/bitcoin/bitcoin/blob/0903ce8dbc25d3823b03d52f6e6bff74d19e801e/src/serialize.h#L305 +pub fn write_compact_size(writer: &mut W, size: u64) -> io::Result<()> { + if size < 253 { + writer.write_all(&[size as u8])?; + } else if size <= 0xFFFF { + writer.write_all(&[253])?; + writer.write_all(&(size as u16).to_le_bytes())?; + } else if size <= 0xFFFF_FFFF { + writer.write_all(&[254])?; + writer.write_all(&(size as u32).to_le_bytes())?; + } else { + writer.write_all(&[255])?; + writer.write_all(&size.to_le_bytes())?; + } + Ok(()) +} diff --git a/crates/subcoin-utxo-snapshot/src/tests.rs b/crates/subcoin-utxo-snapshot/src/tests.rs new file mode 100644 index 00000000..bbfd840e --- /dev/null +++ b/crates/subcoin-utxo-snapshot/src/tests.rs @@ -0,0 +1,170 @@ +use super::*; + +pub(crate) fn parse_csv_entry(line: &str) -> Utxo { + let parts = line.split(',').collect::>(); + let (txid, vout) = parts[0].split_once(':').unwrap(); + let is_coinbase = parts[1] == "true"; + let height: u32 = parts[2].parse().unwrap(); + let amount: u64 = parts[3].parse().unwrap(); + let script_pubkey = hex::decode(parts[4].as_bytes()).unwrap(); + Utxo { + txid: txid.parse().unwrap(), + vout: vout.parse().unwrap(), + coin: Coin { + is_coinbase, + amount, + height, + script_pubkey, + }, + } +} + +fn print_hex_dump(data: &[u8]) { + for (i, chunk) in data.chunks(16).enumerate() { + // Print the offset + print!("{:08x} ", i * 16); + + // Print the hex values + for byte in chunk.iter() { + print!("{:02x} ", byte); + } + + // Add spacing if the line is not full + for _ in 0..(16 - chunk.len()) { + print!(" "); + } + + // Print the ASCII representation + print!(" |"); + for byte in chunk { + if byte.is_ascii_graphic() || *byte == b' ' { + print!("{}", *byte as char); + } else { + print!("."); + } + } + println!("|"); + } +} + +#[test] +fn test_snapshot_generation() { + let block_hash1: BlockHash = "00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048" + .parse() + .unwrap(); + let snapshot_metadata = SnapshotMetadata::new( + bitcoin::Network::Bitcoin.magic().to_bytes(), + block_hash1.to_byte_array(), + 1, + ); + let mut data = Vec::new(); + snapshot_metadata.serialize(&mut data).unwrap(); + + // Test data fetched via `./build/src/bitcoin-cli -datadir=$DIR -rpcclienttimeout=0 -named dumptxoutset 1_utxo.dat rollback=1` + #[rustfmt::skip] + assert_eq!( + data, + // Serialized metadata + vec![ + 0x75, 0x74, 0x78, 0x6f, 0xff, 0x02, 0x00, 0xf9, + 0xbe, 0xb4, 0xd9, 0x48, 0x60, 0xeb, 0x18, 0xbf, + 0x1b, 0x16, 0x20, 0xe3, 0x7e, 0x94, 0x90, 0xfc, + 0x8a, 0x42, 0x75, 0x14, 0x41, 0x6f, 0xd7, 0x51, + 0x59, 0xab, 0x86, 0x68, 0x8e, 0x9a, 0x83, 0x00, + 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00 + ] + ); + + let txid: bitcoin::Txid = "0e3e2357e806b6cdb1f70b54c3a3a17b6714ee1f0e68bebb44a74b1efd512098" + .parse() + .unwrap(); + + let mut data = Vec::new(); + txid.consensus_encode(&mut data).unwrap(); + + #[rustfmt::skip] + assert_eq!( + data, + // Serialized txid + [ + 0x98, 0x20, 0x51, 0xfd, 0x1e, + 0x4b, 0xa7, 0x44, 0xbb, 0xbe, 0x68, 0x0e, 0x1f, + 0xee, 0x14, 0x67, 0x7b, 0xa1, 0xa3, 0xc3, 0x54, + 0x0b, 0xf7, 0xb1, 0xcd, 0xb6, 0x06, 0xe8, 0x57, + 0x23, 0x3e, 0x0e + ] + ); + + let mut data = Vec::new(); + write_compact_size(&mut data, 1).unwrap(); + assert_eq!(data, [0x01]); + + let coin = Coin { + is_coinbase: true, + amount: 50_0000_0000, + height: 1, + script_pubkey: hex::decode("410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac").unwrap() + }; + + let utxos = vec![Utxo { + txid, + vout: 0, + coin, + }]; + + let mut data = Vec::new(); + generate_snapshot_in_mem_inner(&mut data, bitcoin::Network::Bitcoin, block_hash1, 1, utxos) + .unwrap(); + + #[rustfmt::skip] + assert_eq!( + data, + [ + 0x75, 0x74, 0x78, 0x6f, 0xff, 0x02, 0x00, 0xf9, 0xbe, 0xb4, 0xd9, 0x48, 0x60, 0xeb, 0x18, 0xbf, + 0x1b, 0x16, 0x20, 0xe3, 0x7e, 0x94, 0x90, 0xfc, 0x8a, 0x42, 0x75, 0x14, 0x41, 0x6f, 0xd7, 0x51, + 0x59, 0xab, 0x86, 0x68, 0x8e, 0x9a, 0x83, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x98, 0x20, 0x51, 0xfd, 0x1e, 0x4b, 0xa7, 0x44, 0xbb, 0xbe, 0x68, 0x0e, 0x1f, + 0xee, 0x14, 0x67, 0x7b, 0xa1, 0xa3, 0xc3, 0x54, 0x0b, 0xf7, 0xb1, 0xcd, 0xb6, 0x06, 0xe8, 0x57, + 0x23, 0x3e, 0x0e, 0x01, 0x00, 0x03, 0x32, 0x04, 0x96, 0xb5, 0x38, 0xe8, 0x53, 0x51, 0x9c, 0x72, + 0x6a, 0x2c, 0x91, 0xe6, 0x1e, 0xc1, 0x16, 0x00, 0xae, 0x13, 0x90, 0x81, 0x3a, 0x62, 0x7c, 0x66, + 0xfb, 0x8b, 0xe7, 0x94, 0x7b, 0xe6, 0x3c, 0x52, + ] + ); +} + +#[test] +fn test_snapshot_at_block_6() { + // subcoin blockchain dumptxoutset --height 6 + let lines = vec![ + "0e3e2357e806b6cdb1f70b54c3a3a17b6714ee1f0e68bebb44a74b1efd512098:0,true,1,5000000000,410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac", +"20251a76e64e920e58291a30d4b212939aae976baca40e70818ceaa596fb9d37:0,true,6,5000000000,410408ce279174b34c077c7b2043e3f3d45a588b85ef4ca466740f848ead7fb498f0a795c982552fdfa41616a7c0333a269d62108588e260fd5a48ac8e4dbf49e2bcac", +"63522845d294ee9b0188ae5cac91bf389a0c3723f084ca1025e7d9cdfe481ce1:0,true,5,5000000000,410456579536d150fbce94ee62b47db2ca43af0a730a0467ba55c79e2a7ec9ce4ad297e35cdbb8e42a4643a60eef7c9abee2f5822f86b1da242d9c2301c431facfd8ac", +"999e1c837c76a1b7fbb7e57baf87b309960f5ffefbf2a9b95dd890602272f644:0,true,3,5000000000,410494b9d3e76c5b1629ecf97fff95d7a4bbdac87cc26099ada28066c6ff1eb9191223cd897194a08d0c2726c5747f1db49e8cf90e75dc3e3550ae9b30086f3cd5aaac", +"9b0fc92260312ce44e74ef369f5c66bbb85848f2eddd5a7a1cde251e54ccfdd5:0,true,2,5000000000,41047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac", +"df2b060fa2e5e9c8ed5eaf6a45c13753ec8c63282b2688322eba40cd98ea067a:0,true,4,5000000000,4104184f32b212815c6e522e66686324030ff7e5bf08efb21f8b00614fb7690e19131dd31304c54f37baa40db231c918106bb9fd43373e37ae31a0befc6ecaefb867ac"]; + + let utxos = lines.into_iter().map(parse_csv_entry).collect::>(); + + let mut data = Vec::new(); + let block_hash6: BlockHash = "000000003031a0e73735690c5a1ff2a4be82553b2a12b776fbd3a215dc8f778d" + .parse() + .unwrap(); + let utxos_count = 6; + generate_snapshot_in_mem_inner( + &mut data, + bitcoin::Network::Bitcoin, + block_hash6, + utxos_count, + utxos, + ) + .unwrap(); + print_hex_dump(&data); +} + +#[test] +fn test_varint() { + let mut data = Vec::new(); + VarInt::new(143).consensus_encode(&mut data).unwrap(); + assert_eq!(data, vec![0x80, 0x0f]); +}