From dba9f540c2d92d0ebfe40ea1d704b3e37231a62e Mon Sep 17 00:00:00 2001 From: well-bitlight Date: Tue, 27 Aug 2024 18:53:36 +0800 Subject: [PATCH 01/13] feat: (WIP) Accelerate indexer with caching --- Cargo.lock | 32 ++++++++++ Cargo.toml | 1 + src/cli/args.rs | 26 ++++++++- src/cli/opts.rs | 5 ++ src/indexers/any.rs | 3 +- src/indexers/cache.rs | 37 ++++++++++++ src/indexers/electrum.rs | 122 +++++++++++++++++++++++++++++---------- src/indexers/esplora.rs | 22 ++++++- src/indexers/mempool.rs | 5 +- src/indexers/mod.rs | 4 ++ 10 files changed, 221 insertions(+), 36 deletions(-) create mode 100644 src/indexers/cache.rs diff --git a/Cargo.lock b/Cargo.lock index 24de82d..a077c10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -43,6 +43,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -52,6 +64,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "amplify" version = "4.7.0" @@ -408,6 +426,7 @@ dependencies = [ "descriptors", "env_logger", "log", + "lru", "psbt", "rand", "rpassword", @@ -868,6 +887,10 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "heck" @@ -1064,6 +1087,15 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "lru" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37ee39891760e7d94734f6f63fedc29a2e4a152f836120753a72503f09fcf904" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "memchr" version = "2.7.4" diff --git a/Cargo.toml b/Cargo.toml index d632e42..52cea13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ rand = { version = "0.8.5", optional = true } rpassword = { version = "7.3.1", optional = true } aes-gcm = { version = "0.10.3", optional = true } bip39 = { version = "2.0.0", optional = true } +lru = "0.12.4" serde_crate = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } diff --git a/src/cli/args.rs b/src/cli/args.rs index dae8539..a92e95e 100644 --- a/src/cli/args.rs +++ b/src/cli/args.rs @@ -23,6 +23,7 @@ use std::fmt::Debug; use std::path::PathBuf; use std::process::exit; +use std::sync::OnceLock; use bpstd::XpubDerivable; use clap::Subcommand; @@ -32,6 +33,7 @@ use strict_encoding::Ident; use crate::cli::{ Config, DescrStdOpts, DescriptorOpts, ExecError, GeneralOpts, ResolverOpt, WalletOpts, }; +use crate::indexers::electrum::ElectrumClient; use crate::indexers::esplora; use crate::{AnyIndexer, MayError, Wallet}; @@ -95,14 +97,36 @@ impl Args { } pub fn indexer(&self) -> Result { + use crate::indexers::IndexerCache; + + // Define a static variable to store IndexerCache, + // Ensuring that all cached data is centralized + // Initialize only once + static INDEXER_CACHE: OnceLock = OnceLock::new(); + + let indexer_cache = INDEXER_CACHE + .get_or_init(|| { + IndexerCache::new( + self.general + .indexer_cache_size + .try_into() + .expect("Error: indexer cache size is invalid"), + ) + }) + .clone(); + let network = self.general.network.to_string(); Ok(match (&self.resolver.esplora, &self.resolver.electrum, &self.resolver.mempool) { - (None, Some(url), None) => AnyIndexer::Electrum(Box::new(electrum::Client::new(url)?)), + (None, Some(url), None) => { + AnyIndexer::Electrum(Box::new(ElectrumClient::new(url, indexer_cache)?)) + } (Some(url), None, None) => AnyIndexer::Esplora(Box::new(esplora::Client::new_esplora( &url.replace("{network}", &network), + indexer_cache, )?)), (None, None, Some(url)) => AnyIndexer::Mempool(Box::new(esplora::Client::new_mempool( &url.replace("{network}", &network), + indexer_cache, )?)), _ => { eprintln!( diff --git a/src/cli/opts.rs b/src/cli/opts.rs index 224931e..b791a7e 100644 --- a/src/cli/opts.rs +++ b/src/cli/opts.rs @@ -158,6 +158,11 @@ pub struct GeneralOpts { /// Network to use. #[arg(short, long, global = true, default_value = "testnet3", env = "LNPBP_NETWORK")] pub network: Network, + + // TODO: Maybe update this to be a more general indexer cache size + /// Maximum number of transactions to keep in cache. + #[arg(long, global = true, default_value = "1000")] + pub indexer_cache_size: usize, } impl GeneralOpts { diff --git a/src/indexers/any.rs b/src/indexers/any.rs index 950def4..c3a2dec 100644 --- a/src/indexers/any.rs +++ b/src/indexers/any.rs @@ -22,6 +22,7 @@ use bpstd::Tx; use descriptors::Descriptor; +use super::electrum::ElectrumClient; use crate::{Indexer, Layer2, MayError, WalletCache, WalletDescr}; /// Type that contains any of the client types implementing the Indexer trait @@ -31,7 +32,7 @@ pub enum AnyIndexer { #[cfg(feature = "electrum")] #[from] /// Electrum indexer - Electrum(Box), + Electrum(Box), #[cfg(feature = "esplora")] #[from] /// Esplora indexer diff --git a/src/indexers/cache.rs b/src/indexers/cache.rs new file mode 100644 index 0000000..0db22d7 --- /dev/null +++ b/src/indexers/cache.rs @@ -0,0 +1,37 @@ +use std::num::NonZeroUsize; +use std::sync::{Arc, Mutex}; + +use bpstd::{BlockHash, DerivedAddr, Tx, Txid}; +use electrum::GetHistoryRes; +use lru::LruCache; + +#[derive(Clone, Eq, PartialEq, Hash, Debug)] +pub struct TxDetail { + pub(crate) inner: Tx, + pub(crate) blockhash: Option, + pub(crate) blocktime: Option, +} + +#[derive(Debug, Clone)] +pub struct IndexerCache { + // The cache is designed as Arc to ensure that + // all Indexers globally use the same cache + // (to maintain data consistency). + // + // The Mutex is used to ensure that the cache is thread-safe + // Make sure to get the updated cache from immutable references + // In the create/update processing logic of &Indexer + pub(crate) addr_transactions: Arc>>>, + pub(crate) tx_details: Arc>>, + pub(crate) script_history: Arc>>>, +} + +impl IndexerCache { + pub fn new(size: NonZeroUsize) -> Self { + Self { + addr_transactions: Arc::new(Mutex::new(LruCache::new(size))), + tx_details: Arc::new(Mutex::new(LruCache::new(size))), + script_history: Arc::new(Mutex::new(LruCache::new(size))), + } + } +} diff --git a/src/indexers/electrum.rs b/src/indexers/electrum.rs index 8d2767c..9cd24a3 100644 --- a/src/indexers/electrum.rs +++ b/src/indexers/electrum.rs @@ -23,12 +23,15 @@ use std::collections::BTreeMap; use std::num::NonZeroU32; use std::str::FromStr; -use bpstd::{Address, BlockHash, ConsensusEncode, Outpoint, Sats, Tx, TxIn, Txid, Weight}; +use bpstd::{ + Address, BlockHash, ConsensusEncode, DerivedAddr, Outpoint, Sats, Tx, TxIn, Txid, Weight, +}; use descriptors::Descriptor; use electrum::{Client, ElectrumApi, Error, GetHistoryRes, Param}; use serde_json::Value; -use super::BATCH_SIZE; +use super::cache::TxDetail; +use super::{IndexerCache, BATCH_SIZE}; use crate::{ Indexer, Layer2, MayError, MiningInfo, Party, TxCredit, TxDebit, TxStatus, WalletAddr, WalletCache, WalletDescr, WalletTx, @@ -59,7 +62,19 @@ pub enum ElectrumError { Client(Error), } -impl Indexer for Client { +pub struct ElectrumClient { + client: Client, + cache: IndexerCache, +} + +impl ElectrumClient { + pub fn new(url: &str, cache: IndexerCache) -> Result { + let client = Client::new(url).map_err(ElectrumError::Client)?; + Ok(Self { client, cache }) + } +} + +impl Indexer for ElectrumClient { type Error = ElectrumError; fn create, L2: Layer2>( @@ -78,11 +93,7 @@ impl Indexer for Client { eprint!("."); let mut txids = Vec::new(); - let Ok(hres) = - self.script_get_history(&script).map_err(|err| errors.push(err.into())) - else { - break; - }; + let hres = self.get_script_history(&derive, &mut errors); if hres.is_empty() { empty_count += 1; if empty_count >= BATCH_SIZE { @@ -98,37 +109,22 @@ impl Indexer for Client { let txid = hr.tx_hash; txids.push(txid); - // get the tx details (requires electrum verbose support) - let tx_details = self.raw_call("blockchain.transaction.get", vec![ - Param::String(hr.tx_hash.to_string()), - Param::Bool(true), - ])?; - - let tx = tx_details - .get("hex") - .and_then(Value::as_str) - .and_then(|s| Tx::from_str(s).ok()) - .ok_or(ElectrumApiError::InvalidTx(txid))?; + let TxDetail { + inner: tx, + blockhash, + blocktime, + } = self.get_transaction_details(&txid)?; // build TxStatus let status = if hr.height < 1 { TxStatus::Mempool } else { - let block_hash = tx_details - .get("blockhash") - .and_then(Value::as_str) - .and_then(|s| BlockHash::from_str(s).ok()) - .ok_or(ElectrumApiError::InvalidBlockHash(txid))?; - let blocktime = tx_details - .get("blocktime") - .and_then(Value::as_u64) - .ok_or(ElectrumApiError::InvalidBlockTime(txid))?; let height = NonZeroU32::try_from(hr.height as u32) .map_err(|_| ElectrumApiError::InvalidBlockHeight(txid))?; TxStatus::Mined(MiningInfo { height, - time: blocktime, - block_hash, + time: blocktime.expect("blocktime is missing"), + block_hash: blockhash.expect("blockhash is missing"), }) }; let tx_size = tx.consensus_serialize().len(); @@ -140,7 +136,8 @@ impl Indexer for Client { let mut inputs = Vec::with_capacity(tx.inputs.len()); for input in tx.inputs { // get value from previous output tx - let prev_tx = self.transaction_get(&input.prev_output.txid)?; + let prev_tx = + self.get_transaction_details(&input.prev_output.txid)?.inner; let prev_out = prev_tx .outputs .get(input.prev_output.vout.into_usize()) @@ -283,7 +280,68 @@ impl Indexer for Client { } fn publish(&self, tx: &Tx) -> Result<(), Self::Error> { - self.transaction_broadcast(tx)?; + self.client.transaction_broadcast(tx)?; Ok(()) } } + +impl ElectrumClient { + fn get_script_history( + &self, + derived_addr: &DerivedAddr, + errors: &mut Vec, + ) -> Vec { + let mut cache = self.cache.script_history.lock().expect("poisoned lock"); + if let Some(history) = cache.get(derived_addr) { + return history.clone(); + } + + let script = derived_addr.addr.script_pubkey(); + let hres = self + .client + .script_get_history(&script) + .map_err(|err| errors.push(err.into())) + .unwrap_or_default(); + + cache.put(derived_addr.clone(), hres.clone()); + hres + } + + fn get_transaction_details(&self, txid: &Txid) -> Result { + let mut cache = self.cache.tx_details.lock().expect("poisoned lock"); + if let Some(details) = cache.get(txid) { + return Ok(details.clone()); + } + + let tx_details = self.client.raw_call("blockchain.transaction.get", vec![ + Param::String(txid.to_string()), + Param::Bool(true), + ])?; + + let inner: Tx = tx_details + .get("hex") + .and_then(Value::as_str) + .and_then(|s| Tx::from_str(s).ok()) + .ok_or(ElectrumApiError::InvalidTx(txid.clone()))?; + + // FIXME: Maybe tx is in mempool, so blockhash and blocktime are not present? + let block_hash: BlockHash = tx_details + .get("blockhash") + .and_then(Value::as_str) + .and_then(|s| BlockHash::from_str(s).ok()) + .ok_or(ElectrumApiError::InvalidBlockHash(txid.clone()))?; + let blocktime = tx_details + .get("blocktime") + .and_then(Value::as_u64) + .ok_or(ElectrumApiError::InvalidBlockTime(txid.clone()))?; + + let tx_detail = TxDetail { + inner, + blockhash: Some(block_hash), + blocktime: Some(blocktime), + }; + + cache.put(*txid, tx_detail.clone()); + Ok(tx_detail) + } +} diff --git a/src/indexers/esplora.rs b/src/indexers/esplora.rs index b0cb46d..38fae36 100644 --- a/src/indexers/esplora.rs +++ b/src/indexers/esplora.rs @@ -28,6 +28,7 @@ use bpstd::{Address, DerivedAddr, LockTime, Outpoint, SeqNo, Tx, TxVer, Witness} use descriptors::Descriptor; use esplora::{BlockingClient, Error}; +use super::cache::IndexerCache; #[cfg(feature = "mempool")] use super::mempool::Mempool; use super::BATCH_SIZE; @@ -41,6 +42,7 @@ use crate::{ pub struct Client { pub(crate) inner: BlockingClient, pub(crate) kind: ClientKind, + pub(crate) cache: IndexerCache, } impl Deref for Client { @@ -73,11 +75,12 @@ impl Client { /// /// Returns an error if the client fails to connect to the Esplora server. #[allow(clippy::result_large_err)] - pub fn new_esplora(url: &str) -> Result { + pub fn new_esplora(url: &str, cache: IndexerCache) -> Result { let inner = esplora::Builder::new(url).build_blocking()?; let client = Self { inner, kind: ClientKind::Esplora, + cache, }; Ok(client) } @@ -162,6 +165,15 @@ fn get_scripthash_txs_all( client: &Client, derive: &DerivedAddr, ) -> Result, Error> { + // Check the cache first + { + let mut addr_transactions_cache = + client.cache.addr_transactions.lock().expect("poisoned lock"); + if let Some(cached_txs) = addr_transactions_cache.get(derive) { + return Ok(cached_txs.clone()); + } + } + const PAGE_SIZE: usize = 25; let mut res = Vec::new(); let mut last_seen = None; @@ -186,6 +198,14 @@ fn get_scripthash_txs_all( } } } + + // Cache the results + { + let mut addr_transactions_cache = + client.cache.addr_transactions.lock().expect("poisoned lock"); + addr_transactions_cache.put(derive.clone(), res.clone()); + } + Ok(res) } diff --git a/src/indexers/mempool.rs b/src/indexers/mempool.rs index 19292ad..4b0e441 100644 --- a/src/indexers/mempool.rs +++ b/src/indexers/mempool.rs @@ -23,6 +23,8 @@ use bpstd::Txid; use esplora::BlockingClient; +use super::IndexerCache; + impl super::esplora::Client { /// Creates a new mempool client with the specified URL. /// @@ -35,11 +37,12 @@ impl super::esplora::Client { /// A `Result` containing the new mempool client if successful, or an `esplora::Error` if an /// error occurred. #[allow(clippy::result_large_err)] - pub fn new_mempool(url: &str) -> Result { + pub fn new_mempool(url: &str, cache: IndexerCache) -> Result { let inner = esplora::Builder::new(url).build_blocking()?; let client = Self { inner, kind: super::esplora::ClientKind::Mempool, + cache, }; Ok(client) } diff --git a/src/indexers/mod.rs b/src/indexers/mod.rs index a120ec6..898c3bc 100644 --- a/src/indexers/mod.rs +++ b/src/indexers/mod.rs @@ -28,10 +28,14 @@ pub mod esplora; pub mod mempool; #[cfg(any(feature = "electrum", feature = "esplora", feature = "mempool"))] mod any; +#[cfg(any(feature = "electrum", feature = "esplora", feature = "mempool"))] +mod cache; #[cfg(any(feature = "electrum", feature = "esplora", feature = "mempool"))] pub use any::{AnyIndexer, AnyIndexerError}; use bpstd::Tx; +#[cfg(any(feature = "electrum", feature = "esplora", feature = "mempool"))] +pub use cache::IndexerCache; use descriptors::Descriptor; use crate::{Layer2, MayError, WalletCache, WalletDescr}; From eac5458c83572d4e6a85c095312f1b647cadc285 Mon Sep 17 00:00:00 2001 From: well-bitlight Date: Wed, 28 Aug 2024 00:35:16 +0800 Subject: [PATCH 02/13] chore: design the cache api for update logic --- src/indexers/cache.rs | 67 +++++++++++++++++++++++++++++++++++++++++-- src/wallet.rs | 8 +++++- 2 files changed, 71 insertions(+), 4 deletions(-) diff --git a/src/indexers/cache.rs b/src/indexers/cache.rs index 0db22d7..1ee9e49 100644 --- a/src/indexers/cache.rs +++ b/src/indexers/cache.rs @@ -1,10 +1,13 @@ +use std::collections::HashMap; use std::num::NonZeroUsize; use std::sync::{Arc, Mutex}; -use bpstd::{BlockHash, DerivedAddr, Tx, Txid}; +use bpstd::{BlockHash, DerivedAddr, Keychain, Tx, Txid}; use electrum::GetHistoryRes; use lru::LruCache; +use super::electrum::ElectrumError; + #[derive(Clone, Eq, PartialEq, Hash, Debug)] pub struct TxDetail { pub(crate) inner: Tx, @@ -21,17 +24,75 @@ pub struct IndexerCache { // The Mutex is used to ensure that the cache is thread-safe // Make sure to get the updated cache from immutable references // In the create/update processing logic of &Indexer + // addr_transactions: for esplora pub(crate) addr_transactions: Arc>>>, - pub(crate) tx_details: Arc>>, + // TODO: WalletDescr::unique_id + #[allow(dead_code)] + pub(crate) wallet_addresses: Arc>>>>, + // script_history: for electrum pub(crate) script_history: Arc>>>, + // tx_details: for electrum + pub(crate) tx_details: Arc>>, } impl IndexerCache { pub fn new(size: NonZeroUsize) -> Self { Self { addr_transactions: Arc::new(Mutex::new(LruCache::new(size))), - tx_details: Arc::new(Mutex::new(LruCache::new(size))), + wallet_addresses: Arc::new(Mutex::new(LruCache::new(size))), script_history: Arc::new(Mutex::new(LruCache::new(size))), + // size of tx_details is 20 times the size of script_history for electrum + tx_details: Arc::new(Mutex::new(LruCache::new( + size.saturating_mul(NonZeroUsize::new(20).expect("20 is not zero")), + ))), } } + + #[allow(dead_code, unused_variables)] + fn get_cached_addresses(&self, id: String) -> HashMap> { + // From IndexerCache get cached addresses + todo!() + } + + // #[allow(dead_code)] + // fn derive_new_addresses(&self, id:String, keychain: &Keychain, new_addresses: &mut + // Vec) -> impl Iterator { Derive new addresses + // todo!() + // } + + #[allow(dead_code, unused_variables)] + fn get_cached_history(&self, derived_addr: &DerivedAddr) -> Vec { + // Get cached transaction history from IndexerCache + todo!() + } + + #[allow(dead_code, unused_variables)] + // , cache: &mut WalletCache<_> + fn update_transaction_cache( + &self, + derived_addr: &DerivedAddr, + new_history: Vec, + updated_count: &mut usize, + errors: &mut Vec, + ) { + // Update transaction cache + todo!() + } + + #[allow(dead_code, unused_variables)] + fn derive_additional_addresses( + &self, + id: String, + keychain: &Keychain, + new_addresses: &mut Vec, + ) { + // Derive additional addresses until 10 consecutive empty addresses are encountered + todo!() + } + + #[allow(dead_code, unused_variables)] + fn update_cached_addresses(&self, id: String, new_addresses: Vec) { + // Update the address cache in IndexerCache + todo!() + } } diff --git a/src/wallet.rs b/src/wallet.rs index a4a1f89..2e49e32 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -81,7 +81,7 @@ impl<'descr, K, D: Descriptor> Iterator for AddrIter<'descr, K, D> { ) ) )] -#[derive(Getters, Clone, Eq, PartialEq, Debug)] +#[derive(Getters, Clone, Eq, PartialEq, Debug, Hash)] pub struct WalletDescr where D: Descriptor, @@ -104,6 +104,12 @@ impl> WalletDescr { _phantom: PhantomData, } } + + // TODO: + // Use this unique identifier as an id for important cache of the wallet + pub fn unique_id(&self) -> String { + format!("{:?}-{:?}", self.generator.xpubs().collect::>(), self.network) + } } impl, L2: Layer2Descriptor> WalletDescr { From 699656a97c1895068f6d90e142693a3dcb1e5173 Mon Sep 17 00:00:00 2001 From: well-bitlight Date: Thu, 29 Aug 2024 19:47:24 +0800 Subject: [PATCH 03/13] chore: improve the basic implementation of Indexer::update --- src/data.rs | 17 ++ src/indexers/cache.rs | 6 +- src/indexers/electrum.rs | 381 +++++++++++++++++++++++++++++---------- src/wallet.rs | 2 +- 4 files changed, 306 insertions(+), 100 deletions(-) diff --git a/src/data.rs b/src/data.rs index 75086aa..cf97f23 100644 --- a/src/data.rs +++ b/src/data.rs @@ -23,6 +23,7 @@ use std::cmp::Ordering; use std::fmt::{self, Display, Formatter, LowerHex}; use std::num::{NonZeroU32, ParseIntError}; +use std::ops::Add; use std::str::FromStr; use amplify::hex; @@ -374,6 +375,22 @@ pub struct WalletAddr { pub balance: T, } +impl Add for WalletAddr +where T: Add +{ + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + WalletAddr { + terminal: self.terminal, + addr: self.addr, + used: self.used + rhs.used, + volume: self.volume + rhs.volume, + balance: self.balance + rhs.balance, + } + } +} + impl Ord for WalletAddr where T: Eq { diff --git a/src/indexers/cache.rs b/src/indexers/cache.rs index 1ee9e49..a5d0aad 100644 --- a/src/indexers/cache.rs +++ b/src/indexers/cache.rs @@ -30,7 +30,7 @@ pub struct IndexerCache { #[allow(dead_code)] pub(crate) wallet_addresses: Arc>>>>, // script_history: for electrum - pub(crate) script_history: Arc>>>, + pub(crate) script_history: Arc>>>, // tx_details: for electrum pub(crate) tx_details: Arc>>, } @@ -90,6 +90,10 @@ impl IndexerCache { todo!() } + // TODO: when Indexer::create is called, iterate through new addresses normally, + // then store all addresses at once and save the index + // TODO: When Indexer::update is called, + // iterate through the cached addresses normally, Chain the new Iterator, #[allow(dead_code, unused_variables)] fn update_cached_addresses(&self, id: String, new_addresses: Vec) { // Update the address cache in IndexerCache diff --git a/src/indexers/electrum.rs b/src/indexers/electrum.rs index 9cd24a3..2fcfdd0 100644 --- a/src/indexers/electrum.rs +++ b/src/indexers/electrum.rs @@ -19,7 +19,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::num::NonZeroU32; use std::str::FromStr; @@ -104,89 +104,9 @@ impl Indexer for ElectrumClient { empty_count = 0; - let mut process_history_entry = - |hr: GetHistoryRes| -> Result { - let txid = hr.tx_hash; - txids.push(txid); - - let TxDetail { - inner: tx, - blockhash, - blocktime, - } = self.get_transaction_details(&txid)?; - - // build TxStatus - let status = if hr.height < 1 { - TxStatus::Mempool - } else { - let height = NonZeroU32::try_from(hr.height as u32) - .map_err(|_| ElectrumApiError::InvalidBlockHeight(txid))?; - TxStatus::Mined(MiningInfo { - height, - time: blocktime.expect("blocktime is missing"), - block_hash: blockhash.expect("blockhash is missing"), - }) - }; - let tx_size = tx.consensus_serialize().len(); - let weight = tx.weight_units().to_u32(); - - // get inputs to build TxCredit's and total amount, - // collecting indexer errors - let mut input_total = Sats::ZERO; - let mut inputs = Vec::with_capacity(tx.inputs.len()); - for input in tx.inputs { - // get value from previous output tx - let prev_tx = - self.get_transaction_details(&input.prev_output.txid)?.inner; - let prev_out = prev_tx - .outputs - .get(input.prev_output.vout.into_usize()) - .ok_or_else(|| { - ElectrumApiError::PrevOutTxMismatch(txid, input.clone()) - })?; - let value = prev_out.value; - input_total += value; - inputs.push(TxCredit { - outpoint: input.prev_output, - payer: Party::Unknown(prev_out.script_pubkey.clone()), - sequence: input.sequence, - coinbase: false, - script_sig: input.sig_script, - witness: input.witness, - value, - }) - } - - // get outputs and total amount, build TxDebit's - let mut output_total = Sats::ZERO; - let mut outputs = Vec::with_capacity(tx.outputs.len()); - for (no, txout) in tx.outputs.into_iter().enumerate() { - output_total += txout.value; - outputs.push(TxDebit { - outpoint: Outpoint::new(txid, no as u32), - beneficiary: Party::Unknown(txout.script_pubkey), - value: txout.value, - spent: None, - }) - } - - // build the WalletTx - return Ok(WalletTx { - txid, - status, - inputs, - outputs, - fee: input_total - output_total, - size: tx_size as u32, - weight, - version: tx.version, - locktime: tx.lock_time, - }); - }; - // build wallet transactions from script tx history, collecting indexer errors - for hr in hres { - match process_history_entry(hr) { + for (_, hr) in hres { + match self.process_history_entry(hr, &mut txids) { Ok(tx) => { cache.tx.insert(tx.txid, tx); } @@ -273,10 +193,150 @@ impl Indexer for ElectrumClient { fn update, L2: Layer2>( &self, - _descr: &WalletDescr, - _cache: &mut WalletCache, + descr: &WalletDescr, + cache: &mut WalletCache, ) -> MayError> { - todo!() + let mut errors = Vec::::new(); + let mut update_size = 0; + + let mut address_index = BTreeMap::new(); + for keychain in descr.keychains() { + let mut empty_count = 0usize; + eprint!(" keychain {keychain} "); + for derive in descr.addresses(keychain) { + let script = derive.addr.script_pubkey(); + + eprint!("."); + let mut txids = Vec::new(); + let (updated_hres, append_hres) = self.update_script_history(&derive, &mut errors); + if updated_hres.is_empty() && append_hres.is_empty() { + empty_count += 1; + if empty_count >= BATCH_SIZE { + break; + } + continue; + } + + empty_count = 0; + + for (txid, hr) in updated_hres { + let tx = cache.tx.get_mut(&txid).expect("broken logic"); + + let status = if hr.height < 1 { + TxStatus::Mempool + } else { + let res = self.get_transaction_details(&txid).map_err(|e| errors.push(e)); + if res.is_err() { + continue; + } + + let TxDetail { + blockhash, + blocktime, + .. + } = res.expect("broken logic"); + + let height = NonZeroU32::try_from(hr.height as u32) + .expect("hr.height is cannot be zero"); + TxStatus::Mined(MiningInfo { + height, + time: blocktime.expect("blocktime is missing"), + block_hash: blockhash.expect("blockhash is missing"), + }) + }; + tx.status = status; + update_size += 1; + } + + for (_, hr) in append_hres { + match self.process_history_entry(hr, &mut txids) { + Ok(tx) => { + cache.tx.insert(tx.txid, tx); + } + Err(e) => errors.push(e.into()), + } + } + + let wallet_addr_key = WalletAddr::from(derive); + let old_wallet_addr = cache + .addr + .entry(wallet_addr_key.terminal.keychain) + .or_default() + .get(&wallet_addr_key) + .cloned() + .unwrap_or(wallet_addr_key); + + address_index.insert(script, (old_wallet_addr, txids)); + } + } + + for (script, (wallet_addr, txids)) in &mut address_index { + for txid in txids { + let mut tx = cache.tx.remove(txid).expect("broken logic"); + for debit in &mut tx.outputs { + let Some(s) = debit.beneficiary.script_pubkey() else { + continue; + }; + if &s == script { + cache.utxo.insert(debit.outpoint); + debit.beneficiary = Party::from_wallet_addr(wallet_addr); + wallet_addr.used = wallet_addr.used.saturating_add(1); + wallet_addr.volume.saturating_add_assign(debit.value); + wallet_addr.balance = wallet_addr.balance.saturating_add(debit.value); + } else if debit.beneficiary.is_unknown() { + Address::with(&s, descr.network()) + .map(|addr| { + debit.beneficiary = Party::Counterparty(addr); + }) + .ok(); + } + } + cache.tx.insert(tx.txid, tx); + } + } + + for (script, (wallet_addr, txids)) in &mut address_index { + for txid in txids { + let mut tx = cache.tx.remove(txid).expect("broken logic"); + for credit in &mut tx.inputs { + let Some(s) = credit.payer.script_pubkey() else { + continue; + }; + if &s == script { + credit.payer = Party::from_wallet_addr(wallet_addr); + wallet_addr.balance = wallet_addr.balance.saturating_sub(credit.value); + } else if credit.payer.is_unknown() { + Address::with(&s, descr.network()) + .map(|addr| { + credit.payer = Party::Counterparty(addr); + }) + .ok(); + } + if let Some(prev_tx) = cache.tx.get_mut(&credit.outpoint.txid) { + if let Some(txout) = + prev_tx.outputs.get_mut(credit.outpoint.vout_u32() as usize) + { + let outpoint = txout.outpoint; + if tx.status.is_mined() { + cache.utxo.remove(&outpoint); + } + txout.spent = Some(credit.outpoint.into()) + }; + } + } + cache.tx.insert(tx.txid, tx); + } + + // replace the old wallet_addr with the new one + cache.addr.entry(wallet_addr.terminal.keychain).or_default().replace(*wallet_addr); + update_size += 1; + } + + if errors.is_empty() { + MayError::ok(update_size) + } else { + MayError::err(update_size, errors) + } } fn publish(&self, tx: &Tx) -> Result<(), Self::Error> { @@ -290,7 +350,7 @@ impl ElectrumClient { &self, derived_addr: &DerivedAddr, errors: &mut Vec, - ) -> Vec { + ) -> HashMap { let mut cache = self.cache.script_history.lock().expect("poisoned lock"); if let Some(history) = cache.get(derived_addr) { return history.clone(); @@ -302,15 +362,63 @@ impl ElectrumClient { .script_get_history(&script) .map_err(|err| errors.push(err.into())) .unwrap_or_default(); + let hres: HashMap = + hres.into_iter().map(|hr| (hr.tx_hash, hr)).collect(); cache.put(derived_addr.clone(), hres.clone()); hres } + fn update_script_history( + &self, + derived_addr: &DerivedAddr, + errors: &mut Vec, + ) -> (HashMap, HashMap) { + let mut updated = HashMap::new(); + let mut append = HashMap::new(); + + let mut cache = self.cache.script_history.lock().expect("poisoned lock"); + + let old_history = cache.get(derived_addr).cloned().unwrap_or_default(); + + let script = derived_addr.addr.script_pubkey(); + let new_history = self + .client + .script_get_history(&script) + .map_err(|err| { + errors.push(err.into()); + }) + .unwrap_or_default(); + if new_history.is_empty() { + return (updated, append); + } + + let new_history: HashMap = + new_history.into_iter().map(|hr| (hr.tx_hash, hr)).collect(); + + for (txid, hr) in new_history.iter() { + if let Some(old_hr) = old_history.get(txid) { + if old_hr.height != hr.height { + updated.insert(*txid, hr.clone()); + } + continue; + } + + append.insert(*txid, hr.clone()); + } + + cache.put(derived_addr.clone(), new_history.clone()); + (updated, append) + } + fn get_transaction_details(&self, txid: &Txid) -> Result { let mut cache = self.cache.tx_details.lock().expect("poisoned lock"); if let Some(details) = cache.get(txid) { - return Ok(details.clone()); + // if blockhash exists, the transaction has been confirmed and will not change + // Otherwise, we need to get the latest information + if details.blockhash.is_some() { + return Ok(details.clone()); + } } let tx_details = self.client.raw_call("blockchain.transaction.get", vec![ @@ -324,24 +432,101 @@ impl ElectrumClient { .and_then(|s| Tx::from_str(s).ok()) .ok_or(ElectrumApiError::InvalidTx(txid.clone()))?; - // FIXME: Maybe tx is in mempool, so blockhash and blocktime are not present? - let block_hash: BlockHash = tx_details + let blockhash = tx_details .get("blockhash") .and_then(Value::as_str) - .and_then(|s| BlockHash::from_str(s).ok()) - .ok_or(ElectrumApiError::InvalidBlockHash(txid.clone()))?; - let blocktime = tx_details - .get("blocktime") - .and_then(Value::as_u64) - .ok_or(ElectrumApiError::InvalidBlockTime(txid.clone()))?; + .and_then(|s| BlockHash::from_str(s).ok()); + let blocktime = tx_details.get("blocktime").and_then(Value::as_u64); let tx_detail = TxDetail { inner, - blockhash: Some(block_hash), - blocktime: Some(blocktime), + blockhash, + blocktime, }; cache.put(*txid, tx_detail.clone()); Ok(tx_detail) } + + // TODO: maybe WalletTx can be cached too + // TODO: status can change, so we need to update it + fn process_history_entry( + &self, + hr: GetHistoryRes, + txids: &mut Vec, + ) -> Result { + let txid = hr.tx_hash; + txids.push(txid); + + let TxDetail { + inner: tx, + blockhash, + blocktime, + } = self.get_transaction_details(&txid)?; + + // build TxStatus + let status = if hr.height < 1 { + TxStatus::Mempool + } else { + let height = NonZeroU32::try_from(hr.height as u32) + .map_err(|_| ElectrumApiError::InvalidBlockHeight(txid))?; + TxStatus::Mined(MiningInfo { + height, + time: blocktime.expect("blocktime is missing"), + block_hash: blockhash.expect("blockhash is missing"), + }) + }; + let tx_size = tx.consensus_serialize().len(); + let weight = tx.weight_units().to_u32(); + + // get inputs to build TxCredit's and total amount, + // collecting indexer errors + let mut input_total = Sats::ZERO; + let mut inputs = Vec::with_capacity(tx.inputs.len()); + for input in tx.inputs { + // get value from previous output tx + let prev_tx = self.get_transaction_details(&input.prev_output.txid)?.inner; + let prev_out = prev_tx + .outputs + .get(input.prev_output.vout.into_usize()) + .ok_or_else(|| ElectrumApiError::PrevOutTxMismatch(txid, input.clone()))?; + let value = prev_out.value; + input_total += value; + inputs.push(TxCredit { + outpoint: input.prev_output, + payer: Party::Unknown(prev_out.script_pubkey.clone()), + sequence: input.sequence, + coinbase: false, + script_sig: input.sig_script, + witness: input.witness, + value, + }) + } + + // get outputs and total amount, build TxDebit's + let mut output_total = Sats::ZERO; + let mut outputs = Vec::with_capacity(tx.outputs.len()); + for (no, txout) in tx.outputs.into_iter().enumerate() { + output_total += txout.value; + outputs.push(TxDebit { + outpoint: Outpoint::new(txid, no as u32), + beneficiary: Party::Unknown(txout.script_pubkey), + value: txout.value, + spent: None, + }) + } + + // build the WalletTx + return Ok(WalletTx { + txid, + status, + inputs, + outputs, + fee: input_total - output_total, + size: tx_size as u32, + weight, + version: tx.version, + locktime: tx.lock_time, + }); + } } diff --git a/src/wallet.rs b/src/wallet.rs index 2e49e32..14acb08 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -54,6 +54,7 @@ pub struct AddrIter<'descr, K, D: Descriptor> { generator: &'descr D, network: AddressNetwork, keychain: Keychain, + // TODO: get index index: NormalIndex, _phantom: PhantomData, } @@ -105,7 +106,6 @@ impl> WalletDescr { } } - // TODO: // Use this unique identifier as an id for important cache of the wallet pub fn unique_id(&self) -> String { format!("{:?}-{:?}", self.generator.xpubs().collect::>(), self.network) From d9168ee95ed59227c94a8b8c94445a624a326400 Mon Sep 17 00:00:00 2001 From: well-bitlight Date: Fri, 30 Aug 2024 00:49:30 +0800 Subject: [PATCH 04/13] chore: (WIP) Improve the basic implementation of esplora::update --- src/indexers/electrum.rs | 2 +- src/indexers/esplora.rs | 309 +++++++++++++++++++++++++++++++-------- 2 files changed, 247 insertions(+), 64 deletions(-) diff --git a/src/indexers/electrum.rs b/src/indexers/electrum.rs index 2fcfdd0..9ac62a1 100644 --- a/src/indexers/electrum.rs +++ b/src/indexers/electrum.rs @@ -252,6 +252,7 @@ impl Indexer for ElectrumClient { match self.process_history_entry(hr, &mut txids) { Ok(tx) => { cache.tx.insert(tx.txid, tx); + update_size += 1; } Err(e) => errors.push(e.into()), } @@ -449,7 +450,6 @@ impl ElectrumClient { } // TODO: maybe WalletTx can be cached too - // TODO: status can change, so we need to update it fn process_history_entry( &self, hr: GetHistoryRes, diff --git a/src/indexers/esplora.rs b/src/indexers/esplora.rs index 38fae36..b3eb0e6 100644 --- a/src/indexers/esplora.rs +++ b/src/indexers/esplora.rs @@ -64,6 +64,28 @@ pub enum ClientKind { Mempool, } +#[cfg_attr( + feature = "serde", + derive(serde::Serialize, serde::Deserialize), + serde(crate = "serde_crate", rename_all = "camelCase") +)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +pub struct FullAddrStats { + pub address: String, + pub chain_stats: AddrTxStats, + pub mempool_stats: AddrTxStats, +} + +#[cfg_attr( + feature = "serde", + derive(serde::Serialize, serde::Deserialize), + serde(crate = "serde_crate", rename_all = "camelCase") +)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +pub struct AddrTxStats { + pub tx_count: u64, +} + impl Client { /// Creates a new Esplora client with the specified URL. /// @@ -84,6 +106,99 @@ impl Client { }; Ok(client) } + + /// Retrieves all transactions associated with a given script hash. + /// + /// # Arguments + /// + /// * `client` - The Esplora client. + /// * `derive` - The derived address. + /// * `force_update` - A flag indicating whether to force an update of the transactions. + /// + /// # Errors + /// + /// Returns an error if there was a problem retrieving the transactions. + #[allow(clippy::result_large_err)] + fn get_scripthash_txs_all( + &self, + derive: &DerivedAddr, + force_update: bool, + ) -> Result, Error> { + // Check the cache first + if !force_update { + let mut addr_transactions_cache = + self.cache.addr_transactions.lock().expect("poisoned lock"); + if let Some(cached_txs) = addr_transactions_cache.get(derive) { + return Ok(cached_txs.clone()); + } + } + + const PAGE_SIZE: usize = 25; + let mut res = Vec::new(); + let mut last_seen = None; + let script = derive.addr.script_pubkey(); + #[cfg(feature = "mempool")] + let address = derive.addr.to_string(); + + loop { + let r = match self.kind { + ClientKind::Esplora => self.inner.scripthash_txs(&script, last_seen)?, + #[cfg(feature = "mempool")] + ClientKind::Mempool => self.inner.address_txs(&address, last_seen)?, + }; + match &r[..] { + [a @ .., esplora::Tx { txid, .. }] if a.len() >= PAGE_SIZE - 1 => { + last_seen = Some(*txid); + res.extend(r); + } + _ => { + res.extend(r); + break; + } + } + } + + // Cache the results + { + let mut addr_transactions_cache = + self.cache.addr_transactions.lock().expect("poisoned lock"); + addr_transactions_cache.put(derive.clone(), res.clone()); + } + + Ok(res) + } + + fn get_addr_tx_stats_by_cache(&self, derive: &DerivedAddr) -> FullAddrStats { + let mut addr_transactions_cache = + self.cache.addr_transactions.lock().expect("poisoned lock"); + let address = derive.addr.to_string(); + + if let Some(cached_txs) = addr_transactions_cache.get(derive) { + let chain_stats_tx_count = cached_txs.iter().filter(|tx| tx.status.confirmed).count(); + let mempool_stats_tx_count = + cached_txs.iter().filter(|tx| !tx.status.confirmed).count(); + return FullAddrStats { + address, + chain_stats: AddrTxStats { + tx_count: chain_stats_tx_count as u64, + }, + mempool_stats: AddrTxStats { + tx_count: mempool_stats_tx_count as u64, + }, + }; + } + FullAddrStats::default() + } + + fn get_addr_tx_stats_by_client(&self, derive: &DerivedAddr) -> Result { + let address = derive.addr.to_string(); + let agent = self.agent(); + let url = self.url(); + + let url = format!("{}/address/{}", url, address); + let resp: FullAddrStats = agent.get(&url).call()?.into_json()?; + Ok(resp) + } } impl From for TxStatus { @@ -150,65 +265,6 @@ impl From for WalletTx { } } -/// Retrieves all transactions associated with a given script hash. -/// -/// # Arguments -/// -/// * `client` - The Esplora client. -/// * `derive` - The derived address. -/// -/// # Errors -/// -/// Returns an error if there was a problem retrieving the transactions. -#[allow(clippy::result_large_err)] -fn get_scripthash_txs_all( - client: &Client, - derive: &DerivedAddr, -) -> Result, Error> { - // Check the cache first - { - let mut addr_transactions_cache = - client.cache.addr_transactions.lock().expect("poisoned lock"); - if let Some(cached_txs) = addr_transactions_cache.get(derive) { - return Ok(cached_txs.clone()); - } - } - - const PAGE_SIZE: usize = 25; - let mut res = Vec::new(); - let mut last_seen = None; - let script = derive.addr.script_pubkey(); - #[cfg(feature = "mempool")] - let address = derive.addr.to_string(); - - loop { - let r = match client.kind { - ClientKind::Esplora => client.inner.scripthash_txs(&script, last_seen)?, - #[cfg(feature = "mempool")] - ClientKind::Mempool => client.inner.address_txs(&address, last_seen)?, - }; - match &r[..] { - [a @ .., esplora::Tx { txid, .. }] if a.len() >= PAGE_SIZE - 1 => { - last_seen = Some(*txid); - res.extend(r); - } - _ => { - res.extend(r); - break; - } - } - } - - // Cache the results - { - let mut addr_transactions_cache = - client.cache.addr_transactions.lock().expect("poisoned lock"); - addr_transactions_cache.put(derive.clone(), res.clone()); - } - - Ok(res) -} - impl Indexer for Client { type Error = Error; @@ -228,7 +284,7 @@ impl Indexer for Client { eprint!("."); let mut txids = Vec::new(); - match get_scripthash_txs_all(self, &derive) { + match self.get_scripthash_txs_all(&derive, false) { Err(err) => { errors.push(err); break; @@ -327,10 +383,137 @@ impl Indexer for Client { fn update, L2: Layer2>( &self, - _descr: &WalletDescr, - _cache: &mut WalletCache, + descr: &WalletDescr, + cache: &mut WalletCache, ) -> MayError> { - todo!() + let mut errors = vec![]; + let mut update_size = 0; + + let mut address_index = BTreeMap::new(); + for keychain in descr.keychains() { + let mut empty_count = 0usize; + eprint!(" keychain {keychain} "); + for derive in descr.addresses(keychain) { + let script = derive.addr.script_pubkey(); + + eprint!("."); + let mut txids = Vec::new(); + let tx_stats_by_cache = self.get_addr_tx_stats_by_cache(&derive); + let tx_stats_by_client = self + .get_addr_tx_stats_by_client(&derive) + .map_err(|err| { + errors.push(err); + }) + .unwrap_or_default(); + if tx_stats_by_client.address.is_empty() { + continue; + } + + if tx_stats_by_cache == tx_stats_by_client { + continue; + } + + match self.get_scripthash_txs_all(&derive, true) { + Err(err) => { + errors.push(err); + continue; + } + Ok(txes) if txes.is_empty() => { + empty_count += 1; + if empty_count >= BATCH_SIZE { + break; + } + } + Ok(txes) => { + empty_count = 0; + txids = txes.iter().map(|tx| tx.txid).collect(); + cache + .tx + .extend(txes.into_iter().map(WalletTx::from).map(|tx| (tx.txid, tx))); + update_size += 1; + } + } + + let wallet_addr = WalletAddr::::from(derive); + address_index.insert(script, (wallet_addr, txids)); + } + } + + for (script, (wallet_addr, txids)) in &mut address_index { + for txid in txids { + let mut tx = cache.tx.remove(txid).expect("broken logic"); + for debit in &mut tx.outputs { + let Some(s) = debit.beneficiary.script_pubkey() else { + continue; + }; + if &s == script { + cache.utxo.insert(debit.outpoint); + debit.beneficiary = Party::from_wallet_addr(wallet_addr); + wallet_addr.used = wallet_addr.used.saturating_add(1); + wallet_addr.volume.saturating_add_assign(debit.value); + wallet_addr.balance = wallet_addr + .balance + .saturating_add(debit.value.sats().try_into().expect("sats overflow")); + } else if debit.beneficiary.is_unknown() { + Address::with(&s, descr.network()) + .map(|addr| { + debit.beneficiary = Party::Counterparty(addr); + }) + .ok(); + } + } + cache.tx.insert(tx.txid, tx); + } + } + + for (script, (wallet_addr, txids)) in &mut address_index { + for txid in txids { + let mut tx = cache.tx.remove(txid).expect("broken logic"); + for credit in &mut tx.inputs { + let Some(s) = credit.payer.script_pubkey() else { + continue; + }; + if &s == script { + credit.payer = Party::from_wallet_addr(wallet_addr); + wallet_addr.balance = wallet_addr + .balance + .saturating_sub(credit.value.sats().try_into().expect("sats overflow")); + } else if credit.payer.is_unknown() { + Address::with(&s, descr.network()) + .map(|addr| { + credit.payer = Party::Counterparty(addr); + }) + .ok(); + } + if let Some(prev_tx) = cache.tx.get_mut(&credit.outpoint.txid) { + if let Some(txout) = + prev_tx.outputs.get_mut(credit.outpoint.vout_u32() as usize) + { + let outpoint = txout.outpoint; + if tx.status.is_mined() { + cache.utxo.remove(&outpoint); + } + txout.spent = Some(credit.outpoint.into()) + }; + } + } + cache.tx.insert(tx.txid, tx); + } + + // replace the old wallet_addr with the new one + cache + .addr + .entry(wallet_addr.terminal.keychain) + .or_default() + .replace(wallet_addr.expect_transmute()); + update_size += 1; + } + + if errors.is_empty() { + MayError::ok(update_size) + } else { + MayError::err(update_size, errors) + } } fn publish(&self, tx: &Tx) -> Result<(), Self::Error> { self.inner.broadcast(tx) } From 7845c1a767981827a6fe4436b0e6ed432906e8ae Mon Sep 17 00:00:00 2001 From: well-bitlight Date: Fri, 30 Aug 2024 12:18:32 +0800 Subject: [PATCH 05/13] chore: optimize the overall implementation of esplora-Indexer --- src/indexers/cache.rs | 60 +------ src/indexers/electrum.rs | 3 +- src/indexers/esplora.rs | 374 ++++++++++++++++++--------------------- 3 files changed, 173 insertions(+), 264 deletions(-) diff --git a/src/indexers/cache.rs b/src/indexers/cache.rs index a5d0aad..545467a 100644 --- a/src/indexers/cache.rs +++ b/src/indexers/cache.rs @@ -2,12 +2,10 @@ use std::collections::HashMap; use std::num::NonZeroUsize; use std::sync::{Arc, Mutex}; -use bpstd::{BlockHash, DerivedAddr, Keychain, Tx, Txid}; +use bpstd::{BlockHash, DerivedAddr, Tx, Txid}; use electrum::GetHistoryRes; use lru::LruCache; -use super::electrum::ElectrumError; - #[derive(Clone, Eq, PartialEq, Hash, Debug)] pub struct TxDetail { pub(crate) inner: Tx, @@ -26,9 +24,6 @@ pub struct IndexerCache { // In the create/update processing logic of &Indexer // addr_transactions: for esplora pub(crate) addr_transactions: Arc>>>, - // TODO: WalletDescr::unique_id - #[allow(dead_code)] - pub(crate) wallet_addresses: Arc>>>>, // script_history: for electrum pub(crate) script_history: Arc>>>, // tx_details: for electrum @@ -39,7 +34,6 @@ impl IndexerCache { pub fn new(size: NonZeroUsize) -> Self { Self { addr_transactions: Arc::new(Mutex::new(LruCache::new(size))), - wallet_addresses: Arc::new(Mutex::new(LruCache::new(size))), script_history: Arc::new(Mutex::new(LruCache::new(size))), // size of tx_details is 20 times the size of script_history for electrum tx_details: Arc::new(Mutex::new(LruCache::new( @@ -47,56 +41,4 @@ impl IndexerCache { ))), } } - - #[allow(dead_code, unused_variables)] - fn get_cached_addresses(&self, id: String) -> HashMap> { - // From IndexerCache get cached addresses - todo!() - } - - // #[allow(dead_code)] - // fn derive_new_addresses(&self, id:String, keychain: &Keychain, new_addresses: &mut - // Vec) -> impl Iterator { Derive new addresses - // todo!() - // } - - #[allow(dead_code, unused_variables)] - fn get_cached_history(&self, derived_addr: &DerivedAddr) -> Vec { - // Get cached transaction history from IndexerCache - todo!() - } - - #[allow(dead_code, unused_variables)] - // , cache: &mut WalletCache<_> - fn update_transaction_cache( - &self, - derived_addr: &DerivedAddr, - new_history: Vec, - updated_count: &mut usize, - errors: &mut Vec, - ) { - // Update transaction cache - todo!() - } - - #[allow(dead_code, unused_variables)] - fn derive_additional_addresses( - &self, - id: String, - keychain: &Keychain, - new_addresses: &mut Vec, - ) { - // Derive additional addresses until 10 consecutive empty addresses are encountered - todo!() - } - - // TODO: when Indexer::create is called, iterate through new addresses normally, - // then store all addresses at once and save the index - // TODO: When Indexer::update is called, - // iterate through the cached addresses normally, Chain the new Iterator, - #[allow(dead_code, unused_variables)] - fn update_cached_addresses(&self, id: String, new_addresses: Vec) { - // Update the address cache in IndexerCache - todo!() - } } diff --git a/src/indexers/electrum.rs b/src/indexers/electrum.rs index 9ac62a1..a793a55 100644 --- a/src/indexers/electrum.rs +++ b/src/indexers/electrum.rs @@ -84,7 +84,8 @@ impl Indexer for ElectrumClient { let mut cache = WalletCache::new(); let mut errors = Vec::::new(); - let mut address_index = BTreeMap::new(); + let mut address_index: BTreeMap, Vec)> = + BTreeMap::new(); for keychain in descriptor.keychains() { let mut empty_count = 0usize; eprint!(" keychain {keychain} "); diff --git a/src/indexers/esplora.rs b/src/indexers/esplora.rs index b3eb0e6..dd5e962 100644 --- a/src/indexers/esplora.rs +++ b/src/indexers/esplora.rs @@ -24,7 +24,9 @@ use std::collections::BTreeMap; use std::num::NonZeroU32; use std::ops::{Deref, DerefMut}; -use bpstd::{Address, DerivedAddr, LockTime, Outpoint, SeqNo, Tx, TxVer, Witness}; +use bpstd::{ + Address, DerivedAddr, LockTime, Outpoint, ScriptPubkey, SeqNo, Tx, TxVer, Txid, Witness, +}; use descriptors::Descriptor; use esplora::{BlockingClient, Error}; @@ -265,6 +267,140 @@ impl From for WalletTx { } } +impl Client { + fn process_address, L2: Layer2>( + &self, + derive: DerivedAddr, + cache: &mut WalletCache, + address_index: &mut BTreeMap, Vec)>, + errors: &mut Vec, + update_mode: bool, + ) -> (bool, usize) { + let script = derive.addr.script_pubkey(); + let mut txids = Vec::new(); + let mut empty = false; + let mut update_size = 0; + + if update_mode { + let tx_stats_by_cache = self.get_addr_tx_stats_by_cache(&derive); + let tx_stats_by_client = self + .get_addr_tx_stats_by_client(&derive) + .map_err(|err| errors.push(err)) + .unwrap_or_default(); + if tx_stats_by_client.address.is_empty() || tx_stats_by_cache == tx_stats_by_client { + return (true, 0); + } + } + + match self.get_scripthash_txs_all(&derive, update_mode) { + Err(err) => { + errors.push(err); + empty = true; + } + Ok(txes) if txes.is_empty() => { + empty = true; + } + Ok(txes) => { + txids = txes.iter().map(|tx| tx.txid).collect(); + cache.tx.extend(txes.into_iter().map(WalletTx::from).map(|tx| (tx.txid, tx))); + update_size += 1; + } + } + + let wallet_addr = WalletAddr::::from(derive); + address_index.insert(script, (wallet_addr, txids)); + + (empty, update_size) + } + + fn process_transactions, L2: Layer2>( + &self, + descriptor: &WalletDescr, + cache: &mut WalletCache, + address_index: &mut BTreeMap, Vec)>, + ) { + for (script, (wallet_addr, txids)) in address_index.iter_mut() { + for txid in txids { + let mut tx = cache.tx.remove(txid).expect("broken logic"); + self.process_outputs::<_, _, L2>(descriptor, script, wallet_addr, &mut tx, cache); + self.process_inputs::<_, _, L2>(descriptor, script, wallet_addr, &mut tx, cache); + cache.tx.insert(tx.txid, tx); + } + cache + .addr + .entry(wallet_addr.terminal.keychain) + .or_default() + .insert(wallet_addr.expect_transmute()); + } + } + + fn process_outputs, L2: Layer2>( + &self, + descriptor: &WalletDescr, + script: &ScriptPubkey, + wallet_addr: &mut WalletAddr, + tx: &mut WalletTx, + cache: &mut WalletCache, + ) { + for debit in &mut tx.outputs { + let Some(s) = debit.beneficiary.script_pubkey() else { + continue; + }; + if &s == script { + cache.utxo.insert(debit.outpoint); + debit.beneficiary = Party::from_wallet_addr(wallet_addr); + wallet_addr.used = wallet_addr.used.saturating_add(1); + wallet_addr.volume.saturating_add_assign(debit.value); + wallet_addr.balance = wallet_addr + .balance + .saturating_add(debit.value.sats().try_into().expect("sats overflow")); + } else if debit.beneficiary.is_unknown() { + Address::with(&s, descriptor.network()) + .map(|addr| { + debit.beneficiary = Party::Counterparty(addr); + }) + .ok(); + } + } + } + + fn process_inputs, L2: Layer2>( + &self, + descriptor: &WalletDescr, + script: &ScriptPubkey, + wallet_addr: &mut WalletAddr, + tx: &mut WalletTx, + cache: &mut WalletCache, + ) { + for credit in &mut tx.inputs { + let Some(s) = credit.payer.script_pubkey() else { + continue; + }; + if &s == script { + credit.payer = Party::from_wallet_addr(wallet_addr); + wallet_addr.balance = wallet_addr + .balance + .saturating_sub(credit.value.sats().try_into().expect("sats overflow")); + } else if credit.payer.is_unknown() { + Address::with(&s, descriptor.network()) + .map(|addr| { + credit.payer = Party::Counterparty(addr); + }) + .ok(); + } + if let Some(prev_tx) = cache.tx.get_mut(&credit.outpoint.txid) { + if let Some(txout) = prev_tx.outputs.get_mut(credit.outpoint.vout_u32() as usize) { + let outpoint = txout.outpoint; + if tx.status.is_mined() { + cache.utxo.remove(&outpoint); + } + txout.spent = Some(credit.outpoint.into()) + }; + } + } + } +} + impl Indexer for Client { type Error = Error; @@ -274,240 +410,70 @@ impl Indexer for Client { ) -> MayError, Vec> { let mut cache = WalletCache::new(); let mut errors = vec![]; - let mut address_index = BTreeMap::new(); + for keychain in descriptor.keychains() { let mut empty_count = 0usize; eprint!(" keychain {keychain} "); for derive in descriptor.addresses(keychain) { - let script = derive.addr.script_pubkey(); - eprint!("."); - let mut txids = Vec::new(); - match self.get_scripthash_txs_all(&derive, false) { - Err(err) => { - errors.push(err); + let (empty, _) = self.process_address::( + derive, + &mut cache, + &mut address_index, + &mut errors, + false, + ); + if empty { + empty_count += 1; + if empty_count >= BATCH_SIZE { break; } - Ok(txes) if txes.is_empty() => { - empty_count += 1; - if empty_count >= BATCH_SIZE { - break; - } - } - Ok(txes) => { - empty_count = 0; - txids = txes.iter().map(|tx| tx.txid).collect(); - cache - .tx - .extend(txes.into_iter().map(WalletTx::from).map(|tx| (tx.txid, tx))); - } + } else { + empty_count = 0; } - - let wallet_addr = WalletAddr::::from(derive); - address_index.insert(script, (wallet_addr, txids)); } } - // TODO: Update headers & tip - - for (script, (wallet_addr, txids)) in &mut address_index { - for txid in txids { - let mut tx = cache.tx.remove(txid).expect("broken logic"); - for debit in &mut tx.outputs { - let Some(s) = debit.beneficiary.script_pubkey() else { - continue; - }; - if &s == script { - cache.utxo.insert(debit.outpoint); - debit.beneficiary = Party::from_wallet_addr(wallet_addr); - wallet_addr.used = wallet_addr.used.saturating_add(1); - wallet_addr.volume.saturating_add_assign(debit.value); - wallet_addr.balance = wallet_addr - .balance - .saturating_add(debit.value.sats().try_into().expect("sats overflow")); - } else if debit.beneficiary.is_unknown() { - Address::with(&s, descriptor.network()) - .map(|addr| { - debit.beneficiary = Party::Counterparty(addr); - }) - .ok(); - } - } - cache.tx.insert(tx.txid, tx); - } - } - - for (script, (wallet_addr, txids)) in &mut address_index { - for txid in txids { - let mut tx = cache.tx.remove(txid).expect("broken logic"); - for credit in &mut tx.inputs { - let Some(s) = credit.payer.script_pubkey() else { - continue; - }; - if &s == script { - credit.payer = Party::from_wallet_addr(wallet_addr); - wallet_addr.balance = wallet_addr - .balance - .saturating_sub(credit.value.sats().try_into().expect("sats overflow")); - } else if credit.payer.is_unknown() { - Address::with(&s, descriptor.network()) - .map(|addr| { - credit.payer = Party::Counterparty(addr); - }) - .ok(); - } - if let Some(prev_tx) = cache.tx.get_mut(&credit.outpoint.txid) { - if let Some(txout) = - prev_tx.outputs.get_mut(credit.outpoint.vout_u32() as usize) - { - let outpoint = txout.outpoint; - if tx.status.is_mined() { - cache.utxo.remove(&outpoint); - } - txout.spent = Some(credit.outpoint.into()) - }; - } - } - cache.tx.insert(tx.txid, tx); - } - cache - .addr - .entry(wallet_addr.terminal.keychain) - .or_default() - .insert(wallet_addr.expect_transmute()); - } + self.process_transactions::(descriptor, &mut cache, &mut address_index); if errors.is_empty() { MayError::ok(cache) } else { MayError::err(cache, errors) } } fn update, L2: Layer2>( &self, - descr: &WalletDescr, + descriptor: &WalletDescr, cache: &mut WalletCache, ) -> MayError> { let mut errors = vec![]; let mut update_size = 0; - let mut address_index = BTreeMap::new(); - for keychain in descr.keychains() { + + for keychain in descriptor.keychains() { let mut empty_count = 0usize; eprint!(" keychain {keychain} "); - for derive in descr.addresses(keychain) { - let script = derive.addr.script_pubkey(); - + for derive in descriptor.addresses(keychain) { eprint!("."); - let mut txids = Vec::new(); - let tx_stats_by_cache = self.get_addr_tx_stats_by_cache(&derive); - let tx_stats_by_client = self - .get_addr_tx_stats_by_client(&derive) - .map_err(|err| { - errors.push(err); - }) - .unwrap_or_default(); - if tx_stats_by_client.address.is_empty() { - continue; - } - - if tx_stats_by_cache == tx_stats_by_client { - continue; - } - - match self.get_scripthash_txs_all(&derive, true) { - Err(err) => { - errors.push(err); - continue; - } - Ok(txes) if txes.is_empty() => { - empty_count += 1; - if empty_count >= BATCH_SIZE { - break; - } - } - Ok(txes) => { - empty_count = 0; - txids = txes.iter().map(|tx| tx.txid).collect(); - cache - .tx - .extend(txes.into_iter().map(WalletTx::from).map(|tx| (tx.txid, tx))); - update_size += 1; + let (empty, size) = self.process_address::( + derive, + cache, + &mut address_index, + &mut errors, + true, + ); + update_size += size; + if empty { + empty_count += 1; + if empty_count >= BATCH_SIZE { + break; } + } else { + empty_count = 0; } - - let wallet_addr = WalletAddr::::from(derive); - address_index.insert(script, (wallet_addr, txids)); } } - for (script, (wallet_addr, txids)) in &mut address_index { - for txid in txids { - let mut tx = cache.tx.remove(txid).expect("broken logic"); - for debit in &mut tx.outputs { - let Some(s) = debit.beneficiary.script_pubkey() else { - continue; - }; - if &s == script { - cache.utxo.insert(debit.outpoint); - debit.beneficiary = Party::from_wallet_addr(wallet_addr); - wallet_addr.used = wallet_addr.used.saturating_add(1); - wallet_addr.volume.saturating_add_assign(debit.value); - wallet_addr.balance = wallet_addr - .balance - .saturating_add(debit.value.sats().try_into().expect("sats overflow")); - } else if debit.beneficiary.is_unknown() { - Address::with(&s, descr.network()) - .map(|addr| { - debit.beneficiary = Party::Counterparty(addr); - }) - .ok(); - } - } - cache.tx.insert(tx.txid, tx); - } - } - - for (script, (wallet_addr, txids)) in &mut address_index { - for txid in txids { - let mut tx = cache.tx.remove(txid).expect("broken logic"); - for credit in &mut tx.inputs { - let Some(s) = credit.payer.script_pubkey() else { - continue; - }; - if &s == script { - credit.payer = Party::from_wallet_addr(wallet_addr); - wallet_addr.balance = wallet_addr - .balance - .saturating_sub(credit.value.sats().try_into().expect("sats overflow")); - } else if credit.payer.is_unknown() { - Address::with(&s, descr.network()) - .map(|addr| { - credit.payer = Party::Counterparty(addr); - }) - .ok(); - } - if let Some(prev_tx) = cache.tx.get_mut(&credit.outpoint.txid) { - if let Some(txout) = - prev_tx.outputs.get_mut(credit.outpoint.vout_u32() as usize) - { - let outpoint = txout.outpoint; - if tx.status.is_mined() { - cache.utxo.remove(&outpoint); - } - txout.spent = Some(credit.outpoint.into()) - }; - } - } - cache.tx.insert(tx.txid, tx); - } - - // replace the old wallet_addr with the new one - cache - .addr - .entry(wallet_addr.terminal.keychain) - .or_default() - .replace(wallet_addr.expect_transmute()); - update_size += 1; - } + self.process_transactions::(descriptor, cache, &mut address_index); if errors.is_empty() { MayError::ok(update_size) From 6dd831012387ef52bb7aa828684251c2929c6b74 Mon Sep 17 00:00:00 2001 From: well-bitlight Date: Fri, 30 Aug 2024 17:14:53 +0800 Subject: [PATCH 06/13] chore: optimize the overall implementation of Indexer --- src/data.rs | 17 -- src/indexers/electrum.rs | 377 +++++++++++++++------------------------ src/indexers/esplora.rs | 100 +++++------ src/wallet.rs | 16 +- 4 files changed, 200 insertions(+), 310 deletions(-) diff --git a/src/data.rs b/src/data.rs index cf97f23..75086aa 100644 --- a/src/data.rs +++ b/src/data.rs @@ -23,7 +23,6 @@ use std::cmp::Ordering; use std::fmt::{self, Display, Formatter, LowerHex}; use std::num::{NonZeroU32, ParseIntError}; -use std::ops::Add; use std::str::FromStr; use amplify::hex; @@ -375,22 +374,6 @@ pub struct WalletAddr { pub balance: T, } -impl Add for WalletAddr -where T: Add -{ - type Output = Self; - - fn add(self, rhs: Self) -> Self::Output { - WalletAddr { - terminal: self.terminal, - addr: self.addr, - used: self.used + rhs.used, - volume: self.volume + rhs.volume, - balance: self.balance + rhs.balance, - } - } -} - impl Ord for WalletAddr where T: Eq { diff --git a/src/indexers/electrum.rs b/src/indexers/electrum.rs index a793a55..1c00f4d 100644 --- a/src/indexers/electrum.rs +++ b/src/indexers/electrum.rs @@ -68,33 +68,28 @@ pub struct ElectrumClient { } impl ElectrumClient { - pub fn new(url: &str, cache: IndexerCache) -> Result { - let client = Client::new(url).map_err(ElectrumError::Client)?; - Ok(Self { client, cache }) - } -} - -impl Indexer for ElectrumClient { - type Error = ElectrumError; - - fn create, L2: Layer2>( + fn process_wallet_descriptor, L2: Layer2>( &self, descriptor: &WalletDescr, - ) -> MayError, Vec> { - let mut cache = WalletCache::new(); - let mut errors = Vec::::new(); - - let mut address_index: BTreeMap, Vec)> = - BTreeMap::new(); + cache: &mut WalletCache, + errors: &mut Vec, + is_update: bool, + ) -> BTreeMap, Vec)> { + let mut address_index = BTreeMap::new(); for keychain in descriptor.keychains() { let mut empty_count = 0usize; eprint!(" keychain {keychain} "); for derive in descriptor.addresses(keychain) { - let script = derive.addr.script_pubkey(); - eprint!("."); + let script = derive.addr.script_pubkey(); let mut txids = Vec::new(); - let hres = self.get_script_history(&derive, &mut errors); + + let hres = if is_update { + self.update_script_history(&derive, errors) + } else { + self.get_script_history(&derive, errors) + }; + if hres.is_empty() { empty_count += 1; if empty_count >= BATCH_SIZE { @@ -105,81 +100,38 @@ impl Indexer for ElectrumClient { empty_count = 0; - // build wallet transactions from script tx history, collecting indexer errors for (_, hr) in hres { match self.process_history_entry(hr, &mut txids) { Ok(tx) => { cache.tx.insert(tx.txid, tx); } - Err(e) => errors.push(e.into()), + Err(e) => errors.push(e), } } - let wallet_addr = WalletAddr::::from(derive); - address_index.insert(script, (wallet_addr, txids)); - } - } - - // TODO: Update headers & tip + let wallet_addr = if is_update { + self.get_or_create_wallet_addr::(cache, &derive) + } else { + WalletAddr::::from(derive) + }; - for (script, (wallet_addr, txids)) in &mut address_index { - for txid in txids { - let mut tx = cache.tx.remove(txid).expect("broken logic"); - for debit in &mut tx.outputs { - let Some(s) = debit.beneficiary.script_pubkey() else { - continue; - }; - if &s == script { - cache.utxo.insert(debit.outpoint); - debit.beneficiary = Party::from_wallet_addr(wallet_addr); - wallet_addr.used = wallet_addr.used.saturating_add(1); - wallet_addr.volume.saturating_add_assign(debit.value); - wallet_addr.balance = wallet_addr - .balance - .saturating_add(debit.value.sats().try_into().expect("sats overflow")); - } else if debit.beneficiary.is_unknown() { - Address::with(&s, descriptor.network()) - .map(|addr| { - debit.beneficiary = Party::Counterparty(addr); - }) - .ok(); - } - } - cache.tx.insert(tx.txid, tx); + address_index.insert(script, (wallet_addr, txids)); } } + address_index + } - for (script, (wallet_addr, txids)) in &mut address_index { + fn process_transactions, L2: Layer2>( + &self, + descriptor: &WalletDescr, + cache: &mut WalletCache, + address_index: &mut BTreeMap, Vec)>, + ) { + for (script, (wallet_addr, txids)) in address_index { for txid in txids { let mut tx = cache.tx.remove(txid).expect("broken logic"); - for credit in &mut tx.inputs { - let Some(s) = credit.payer.script_pubkey() else { - continue; - }; - if &s == script { - credit.payer = Party::from_wallet_addr(wallet_addr); - wallet_addr.balance = wallet_addr - .balance - .saturating_sub(credit.value.sats().try_into().expect("sats overflow")); - } else if credit.payer.is_unknown() { - Address::with(&s, descriptor.network()) - .map(|addr| { - credit.payer = Party::Counterparty(addr); - }) - .ok(); - } - if let Some(prev_tx) = cache.tx.get_mut(&credit.outpoint.txid) { - if let Some(txout) = - prev_tx.outputs.get_mut(credit.outpoint.vout_u32() as usize) - { - let outpoint = txout.outpoint; - if tx.status.is_mined() { - cache.utxo.remove(&outpoint); - } - txout.spent = Some(credit.outpoint.into()) - }; - } - } + self.process_outputs::<_, _, L2>(descriptor, script, wallet_addr, &mut tx, cache); + self.process_inputs::<_, _, L2>(descriptor, script, wallet_addr, &mut tx, cache); cache.tx.insert(tx.txid, tx); } cache @@ -188,166 +140,97 @@ impl Indexer for ElectrumClient { .or_default() .insert(wallet_addr.expect_transmute()); } - - if errors.is_empty() { MayError::ok(cache) } else { MayError::err(cache, errors) } } - fn update, L2: Layer2>( + fn process_outputs, L2: Layer2>( &self, - descr: &WalletDescr, + descriptor: &WalletDescr, + script: &bpstd::ScriptPubkey, + wallet_addr: &mut WalletAddr, + tx: &mut WalletTx, cache: &mut WalletCache, - ) -> MayError> { - let mut errors = Vec::::new(); - let mut update_size = 0; - - let mut address_index = BTreeMap::new(); - for keychain in descr.keychains() { - let mut empty_count = 0usize; - eprint!(" keychain {keychain} "); - for derive in descr.addresses(keychain) { - let script = derive.addr.script_pubkey(); - - eprint!("."); - let mut txids = Vec::new(); - let (updated_hres, append_hres) = self.update_script_history(&derive, &mut errors); - if updated_hres.is_empty() && append_hres.is_empty() { - empty_count += 1; - if empty_count >= BATCH_SIZE { - break; - } - continue; - } - - empty_count = 0; - - for (txid, hr) in updated_hres { - let tx = cache.tx.get_mut(&txid).expect("broken logic"); - - let status = if hr.height < 1 { - TxStatus::Mempool - } else { - let res = self.get_transaction_details(&txid).map_err(|e| errors.push(e)); - if res.is_err() { - continue; - } - - let TxDetail { - blockhash, - blocktime, - .. - } = res.expect("broken logic"); - - let height = NonZeroU32::try_from(hr.height as u32) - .expect("hr.height is cannot be zero"); - TxStatus::Mined(MiningInfo { - height, - time: blocktime.expect("blocktime is missing"), - block_hash: blockhash.expect("blockhash is missing"), - }) - }; - tx.status = status; - update_size += 1; - } - - for (_, hr) in append_hres { - match self.process_history_entry(hr, &mut txids) { - Ok(tx) => { - cache.tx.insert(tx.txid, tx); - update_size += 1; - } - Err(e) => errors.push(e.into()), - } + ) { + for debit in &mut tx.outputs { + if let Some(s) = debit.beneficiary.script_pubkey() { + if &s == script { + cache.utxo.insert(debit.outpoint); + debit.beneficiary = Party::from_wallet_addr(wallet_addr); + wallet_addr.used = wallet_addr.used.saturating_add(1); + wallet_addr.volume.saturating_add_assign(debit.value); + wallet_addr.balance = wallet_addr + .balance + .saturating_add(debit.value.sats().try_into().expect("sats overflow")); + } else if debit.beneficiary.is_unknown() { + Address::with(&s, descriptor.network()) + .map(|addr| debit.beneficiary = Party::Counterparty(addr)) + .ok(); } - - let wallet_addr_key = WalletAddr::from(derive); - let old_wallet_addr = cache - .addr - .entry(wallet_addr_key.terminal.keychain) - .or_default() - .get(&wallet_addr_key) - .cloned() - .unwrap_or(wallet_addr_key); - - address_index.insert(script, (old_wallet_addr, txids)); } } + } - for (script, (wallet_addr, txids)) in &mut address_index { - for txid in txids { - let mut tx = cache.tx.remove(txid).expect("broken logic"); - for debit in &mut tx.outputs { - let Some(s) = debit.beneficiary.script_pubkey() else { - continue; - }; - if &s == script { - cache.utxo.insert(debit.outpoint); - debit.beneficiary = Party::from_wallet_addr(wallet_addr); - wallet_addr.used = wallet_addr.used.saturating_add(1); - wallet_addr.volume.saturating_add_assign(debit.value); - wallet_addr.balance = wallet_addr.balance.saturating_add(debit.value); - } else if debit.beneficiary.is_unknown() { - Address::with(&s, descr.network()) - .map(|addr| { - debit.beneficiary = Party::Counterparty(addr); - }) - .ok(); - } + fn process_inputs, L2: Layer2>( + &self, + descriptor: &WalletDescr, + script: &bpstd::ScriptPubkey, + wallet_addr: &mut WalletAddr, + tx: &mut WalletTx, + cache: &mut WalletCache, + ) { + for credit in &mut tx.inputs { + if let Some(s) = credit.payer.script_pubkey() { + if &s == script { + credit.payer = Party::from_wallet_addr(wallet_addr); + wallet_addr.balance = wallet_addr + .balance + .saturating_sub(credit.value.sats().try_into().expect("sats overflow")); + } else if credit.payer.is_unknown() { + Address::with(&s, descriptor.network()) + .map(|addr| credit.payer = Party::Counterparty(addr)) + .ok(); } - cache.tx.insert(tx.txid, tx); } - } - - for (script, (wallet_addr, txids)) in &mut address_index { - for txid in txids { - let mut tx = cache.tx.remove(txid).expect("broken logic"); - for credit in &mut tx.inputs { - let Some(s) = credit.payer.script_pubkey() else { - continue; - }; - if &s == script { - credit.payer = Party::from_wallet_addr(wallet_addr); - wallet_addr.balance = wallet_addr.balance.saturating_sub(credit.value); - } else if credit.payer.is_unknown() { - Address::with(&s, descr.network()) - .map(|addr| { - credit.payer = Party::Counterparty(addr); - }) - .ok(); - } - if let Some(prev_tx) = cache.tx.get_mut(&credit.outpoint.txid) { - if let Some(txout) = - prev_tx.outputs.get_mut(credit.outpoint.vout_u32() as usize) - { - let outpoint = txout.outpoint; - if tx.status.is_mined() { - cache.utxo.remove(&outpoint); - } - txout.spent = Some(credit.outpoint.into()) - }; + if let Some(prev_tx) = cache.tx.get_mut(&credit.outpoint.txid) { + if let Some(txout) = prev_tx.outputs.get_mut(credit.outpoint.vout_u32() as usize) { + let outpoint = txout.outpoint; + if tx.status.is_mined() { + cache.utxo.remove(&outpoint); } - } - cache.tx.insert(tx.txid, tx); + txout.spent = Some(credit.outpoint.into()) + }; } - - // replace the old wallet_addr with the new one - cache.addr.entry(wallet_addr.terminal.keychain).or_default().replace(*wallet_addr); - update_size += 1; - } - - if errors.is_empty() { - MayError::ok(update_size) - } else { - MayError::err(update_size, errors) } } - fn publish(&self, tx: &Tx) -> Result<(), Self::Error> { - self.client.transaction_broadcast(tx)?; - Ok(()) + fn get_or_create_wallet_addr( + &self, + cache: &mut WalletCache, + derive: &DerivedAddr, + ) -> WalletAddr { + let wallet_addr_key = WalletAddr::from(*derive); + cache + .addr + .entry(wallet_addr_key.terminal.keychain) + .or_default() + .get(&wallet_addr_key) + .cloned() + .map(|wallet_addr| WalletAddr { + terminal: wallet_addr.terminal, + addr: derive.addr, + used: wallet_addr.used, + volume: wallet_addr.volume, + balance: wallet_addr.balance.sats_i64(), + }) + .unwrap_or_else(|| WalletAddr::from(*derive)) } } impl ElectrumClient { + pub fn new(url: &str, cache: IndexerCache) -> Result { + let client = Client::new(url).map_err(ElectrumError::Client)?; + Ok(Self { client, cache }) + } + fn get_script_history( &self, derived_addr: &DerivedAddr, @@ -375,9 +258,8 @@ impl ElectrumClient { &self, derived_addr: &DerivedAddr, errors: &mut Vec, - ) -> (HashMap, HashMap) { - let mut updated = HashMap::new(); - let mut append = HashMap::new(); + ) -> HashMap { + let mut upsert = HashMap::new(); let mut cache = self.cache.script_history.lock().expect("poisoned lock"); @@ -392,7 +274,7 @@ impl ElectrumClient { }) .unwrap_or_default(); if new_history.is_empty() { - return (updated, append); + return upsert; } let new_history: HashMap = @@ -401,16 +283,16 @@ impl ElectrumClient { for (txid, hr) in new_history.iter() { if let Some(old_hr) = old_history.get(txid) { if old_hr.height != hr.height { - updated.insert(*txid, hr.clone()); + upsert.insert(*txid, hr.clone()); } continue; } - append.insert(*txid, hr.clone()); + upsert.insert(*txid, hr.clone()); } cache.put(derived_addr.clone(), new_history.clone()); - (updated, append) + upsert } fn get_transaction_details(&self, txid: &Txid) -> Result { @@ -450,7 +332,6 @@ impl ElectrumClient { Ok(tx_detail) } - // TODO: maybe WalletTx can be cached too fn process_history_entry( &self, hr: GetHistoryRes, @@ -531,3 +412,41 @@ impl ElectrumClient { }); } } + +impl Indexer for ElectrumClient { + type Error = ElectrumError; + + fn create, L2: Layer2>( + &self, + descriptor: &WalletDescr, + ) -> MayError, Vec> { + let mut cache = WalletCache::new(); + let mut errors = Vec::new(); + let mut address_index = + self.process_wallet_descriptor::<_, _, L2>(descriptor, &mut cache, &mut errors, false); + self.process_transactions::<_, _, L2>(descriptor, &mut cache, &mut address_index); + if errors.is_empty() { MayError::ok(cache) } else { MayError::err(cache, errors) } + } + + fn update, L2: Layer2>( + &self, + descriptor: &WalletDescr, + cache: &mut WalletCache, + ) -> MayError> { + let mut errors = Vec::new(); + let mut address_index = + self.process_wallet_descriptor::<_, _, L2>(descriptor, cache, &mut errors, true); + + self.process_transactions::<_, _, L2>(descriptor, cache, &mut address_index); + if errors.is_empty() { + MayError::ok(address_index.len()) + } else { + MayError::err(address_index.len(), errors) + } + } + + fn publish(&self, tx: &Tx) -> Result<(), Self::Error> { + self.client.transaction_broadcast(tx)?; + Ok(()) + } +} diff --git a/src/indexers/esplora.rs b/src/indexers/esplora.rs index dd5e962..748abc7 100644 --- a/src/indexers/esplora.rs +++ b/src/indexers/esplora.rs @@ -268,6 +268,41 @@ impl From for WalletTx { } impl Client { + fn process_wallet_descriptor, L2: Layer2>( + &self, + descriptor: &WalletDescr, + cache: &mut WalletCache, + errors: &mut Vec, + update_mode: bool, + ) -> BTreeMap, Vec)> { + let mut address_index = BTreeMap::new(); + + for keychain in descriptor.keychains() { + let mut empty_count = 0usize; + eprint!(" keychain {keychain} "); + for derive in descriptor.addresses(keychain) { + eprint!("."); + let empty = self.process_address::( + derive, + cache, + &mut address_index, + errors, + update_mode, + ); + if empty { + empty_count += 1; + if empty_count >= BATCH_SIZE { + break; + } + } else { + empty_count = 0; + } + } + } + + address_index + } + fn process_address, L2: Layer2>( &self, derive: DerivedAddr, @@ -275,11 +310,10 @@ impl Client { address_index: &mut BTreeMap, Vec)>, errors: &mut Vec, update_mode: bool, - ) -> (bool, usize) { + ) -> bool { let script = derive.addr.script_pubkey(); let mut txids = Vec::new(); let mut empty = false; - let mut update_size = 0; if update_mode { let tx_stats_by_cache = self.get_addr_tx_stats_by_cache(&derive); @@ -288,7 +322,7 @@ impl Client { .map_err(|err| errors.push(err)) .unwrap_or_default(); if tx_stats_by_client.address.is_empty() || tx_stats_by_cache == tx_stats_by_client { - return (true, 0); + return true; } } @@ -303,14 +337,13 @@ impl Client { Ok(txes) => { txids = txes.iter().map(|tx| tx.txid).collect(); cache.tx.extend(txes.into_iter().map(WalletTx::from).map(|tx| (tx.txid, tx))); - update_size += 1; } } let wallet_addr = WalletAddr::::from(derive); address_index.insert(script, (wallet_addr, txids)); - (empty, update_size) + empty } fn process_transactions, L2: Layer2>( @@ -410,30 +443,9 @@ impl Indexer for Client { ) -> MayError, Vec> { let mut cache = WalletCache::new(); let mut errors = vec![]; - let mut address_index = BTreeMap::new(); - for keychain in descriptor.keychains() { - let mut empty_count = 0usize; - eprint!(" keychain {keychain} "); - for derive in descriptor.addresses(keychain) { - eprint!("."); - let (empty, _) = self.process_address::( - derive, - &mut cache, - &mut address_index, - &mut errors, - false, - ); - if empty { - empty_count += 1; - if empty_count >= BATCH_SIZE { - break; - } - } else { - empty_count = 0; - } - } - } + let mut address_index = + self.process_wallet_descriptor::(descriptor, &mut cache, &mut errors, false); self.process_transactions::(descriptor, &mut cache, &mut address_index); @@ -446,39 +458,15 @@ impl Indexer for Client { cache: &mut WalletCache, ) -> MayError> { let mut errors = vec![]; - let mut update_size = 0; - let mut address_index = BTreeMap::new(); - - for keychain in descriptor.keychains() { - let mut empty_count = 0usize; - eprint!(" keychain {keychain} "); - for derive in descriptor.addresses(keychain) { - eprint!("."); - let (empty, size) = self.process_address::( - derive, - cache, - &mut address_index, - &mut errors, - true, - ); - update_size += size; - if empty { - empty_count += 1; - if empty_count >= BATCH_SIZE { - break; - } - } else { - empty_count = 0; - } - } - } + let mut address_index = + self.process_wallet_descriptor::(descriptor, cache, &mut errors, false); self.process_transactions::(descriptor, cache, &mut address_index); if errors.is_empty() { - MayError::ok(update_size) + MayError::ok(address_index.len()) } else { - MayError::err(update_size, errors) + MayError::err(address_index.len(), errors) } } diff --git a/src/wallet.rs b/src/wallet.rs index 14acb08..19905fe 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -378,14 +378,14 @@ where Self: Save res } - pub fn update(&mut self, indexer: &I) -> MayError<(), Vec> { - // Not yet implemented: - // self.cache.update::(&self.descr, &self.indexer) - - WalletCache::with::<_, K, _, L2>(&self.descr, indexer).map(|cache| { - self.cache = cache; - self.set_dirty(); - }) + pub fn update(&mut self, indexer: &I) -> MayError> { + self.cache.update::(&self.descr, indexer) + + // Original implementation + // WalletCache::with::<_, K, _, L2>(&self.descr, indexer).map(|cache| { + // self.cache = cache; + // self.set_dirty(); + // }) } pub fn to_deriver(&self) -> D From 7326c1a199706d440bdd7f0ca6b147525d44440f Mon Sep 17 00:00:00 2001 From: well-bitlight Date: Fri, 30 Aug 2024 19:09:42 +0800 Subject: [PATCH 07/13] chore: define cache factor --- src/cli/args.rs | 2 +- src/cli/opts.rs | 7 ++++--- src/indexers/cache.rs | 15 +++++++++++---- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/cli/args.rs b/src/cli/args.rs index a92e95e..3a9ca09 100644 --- a/src/cli/args.rs +++ b/src/cli/args.rs @@ -108,7 +108,7 @@ impl Args { .get_or_init(|| { IndexerCache::new( self.general - .indexer_cache_size + .indexer_cache_factor .try_into() .expect("Error: indexer cache size is invalid"), ) diff --git a/src/cli/opts.rs b/src/cli/opts.rs index b791a7e..206165c 100644 --- a/src/cli/opts.rs +++ b/src/cli/opts.rs @@ -159,10 +159,11 @@ pub struct GeneralOpts { #[arg(short, long, global = true, default_value = "testnet3", env = "LNPBP_NETWORK")] pub network: Network, - // TODO: Maybe update this to be a more general indexer cache size - /// Maximum number of transactions to keep in cache. + /// Indexer cache factor. + /// `indexer_cache_factor`: A non-zero size value that determines the capacity of the LRU + /// caches for indexer. #[arg(long, global = true, default_value = "1000")] - pub indexer_cache_size: usize, + pub indexer_cache_factor: usize, } impl GeneralOpts { diff --git a/src/indexers/cache.rs b/src/indexers/cache.rs index 545467a..94908d6 100644 --- a/src/indexers/cache.rs +++ b/src/indexers/cache.rs @@ -31,13 +31,20 @@ pub struct IndexerCache { } impl IndexerCache { - pub fn new(size: NonZeroUsize) -> Self { + /// Creates a new `IndexerCache` with the specified cache factor. + /// + /// # Parameters + /// - `cache_factor`: A non-zero size value that determines the capacity of the LRU caches. This + /// factor is used to initialize the `addr_transactions` and `script_history` caches. The + /// `tx_details` cache is initialized with a capacity that is 20 times the size of the + /// `script_history` cache. + pub fn new(cache_factor: NonZeroUsize) -> Self { Self { - addr_transactions: Arc::new(Mutex::new(LruCache::new(size))), - script_history: Arc::new(Mutex::new(LruCache::new(size))), + addr_transactions: Arc::new(Mutex::new(LruCache::new(cache_factor))), + script_history: Arc::new(Mutex::new(LruCache::new(cache_factor))), // size of tx_details is 20 times the size of script_history for electrum tx_details: Arc::new(Mutex::new(LruCache::new( - size.saturating_mul(NonZeroUsize::new(20).expect("20 is not zero")), + cache_factor.saturating_mul(NonZeroUsize::new(20).expect("20 is not zero")), ))), } } From 6a619681615b8fad43414d08348d1d4e4f5fd019 Mon Sep 17 00:00:00 2001 From: well-bitlight Date: Sun, 1 Sep 2024 00:04:25 +0800 Subject: [PATCH 08/13] chore: add license and conditional compilation information --- src/indexers/any.rs | 1 + src/indexers/cache.rs | 28 ++++++++++++++++++++++++++++ src/indexers/esplora.rs | 2 +- 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/indexers/any.rs b/src/indexers/any.rs index c3a2dec..01d1bac 100644 --- a/src/indexers/any.rs +++ b/src/indexers/any.rs @@ -22,6 +22,7 @@ use bpstd::Tx; use descriptors::Descriptor; +#[cfg(feature = "electrum")] use super::electrum::ElectrumClient; use crate::{Indexer, Layer2, MayError, WalletCache, WalletDescr}; diff --git a/src/indexers/cache.rs b/src/indexers/cache.rs index 94908d6..762a07b 100644 --- a/src/indexers/cache.rs +++ b/src/indexers/cache.rs @@ -1,8 +1,30 @@ +// Modern, minimalistic & standard-compliant cold wallet library. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Written in 2024 by +// Nicola Busanello +// +// Copyright (C) 2024 LNP/BP Standards Association. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::collections::HashMap; use std::num::NonZeroUsize; use std::sync::{Arc, Mutex}; use bpstd::{BlockHash, DerivedAddr, Tx, Txid}; +#[cfg(feature = "electrum")] use electrum::GetHistoryRes; use lru::LruCache; @@ -23,10 +45,13 @@ pub struct IndexerCache { // Make sure to get the updated cache from immutable references // In the create/update processing logic of &Indexer // addr_transactions: for esplora + #[cfg(feature = "esplora")] pub(crate) addr_transactions: Arc>>>, // script_history: for electrum + #[cfg(feature = "electrum")] pub(crate) script_history: Arc>>>, // tx_details: for electrum + #[cfg(feature = "electrum")] pub(crate) tx_details: Arc>>, } @@ -40,9 +65,12 @@ impl IndexerCache { /// `script_history` cache. pub fn new(cache_factor: NonZeroUsize) -> Self { Self { + #[cfg(feature = "esplora")] addr_transactions: Arc::new(Mutex::new(LruCache::new(cache_factor))), + #[cfg(feature = "electrum")] script_history: Arc::new(Mutex::new(LruCache::new(cache_factor))), // size of tx_details is 20 times the size of script_history for electrum + #[cfg(feature = "electrum")] tx_details: Arc::new(Mutex::new(LruCache::new( cache_factor.saturating_mul(NonZeroUsize::new(20).expect("20 is not zero")), ))), diff --git a/src/indexers/esplora.rs b/src/indexers/esplora.rs index 748abc7..555976c 100644 --- a/src/indexers/esplora.rs +++ b/src/indexers/esplora.rs @@ -460,7 +460,7 @@ impl Indexer for Client { let mut errors = vec![]; let mut address_index = - self.process_wallet_descriptor::(descriptor, cache, &mut errors, false); + self.process_wallet_descriptor::(descriptor, cache, &mut errors, true); self.process_transactions::(descriptor, cache, &mut address_index); if errors.is_empty() { From c5d0fdb8e0f67bae444c39a2968af2685fec23d8 Mon Sep 17 00:00:00 2001 From: well-bitlight Date: Mon, 2 Sep 2024 23:49:06 +0800 Subject: [PATCH 09/13] fix: data anomalies during testing --- src/indexers/electrum.rs | 14 +++++++++++++- src/indexers/esplora.rs | 18 +++++++++++++++--- src/wallet.rs | 17 +++++++++-------- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/src/indexers/electrum.rs b/src/indexers/electrum.rs index 1c00f4d..3023bfa 100644 --- a/src/indexers/electrum.rs +++ b/src/indexers/electrum.rs @@ -128,12 +128,24 @@ impl ElectrumClient { address_index: &mut BTreeMap, Vec)>, ) { for (script, (wallet_addr, txids)) in address_index { - for txid in txids { + // UTXOs and inputs must be processed separately due to the unordered nature and + // dependencies of transaction IDs. Handling them in a single loop can cause + // data inconsistencies. For example, if spending transactions are processed + // first, new change UTXOs are added and spent UTXOs are removed. However, + // in the subsequent loop, these already spent UTXOs are treated as new + // transactions and reinserted into the UTXO set. + for txid in txids.iter() { let mut tx = cache.tx.remove(txid).expect("broken logic"); self.process_outputs::<_, _, L2>(descriptor, script, wallet_addr, &mut tx, cache); + cache.tx.insert(tx.txid, tx); + } + + for txid in txids.iter() { + let mut tx = cache.tx.remove(txid).expect("broken logic"); self.process_inputs::<_, _, L2>(descriptor, script, wallet_addr, &mut tx, cache); cache.tx.insert(tx.txid, tx); } + cache .addr .entry(wallet_addr.terminal.keychain) diff --git a/src/indexers/esplora.rs b/src/indexers/esplora.rs index 555976c..e112672 100644 --- a/src/indexers/esplora.rs +++ b/src/indexers/esplora.rs @@ -69,7 +69,7 @@ pub enum ClientKind { #[cfg_attr( feature = "serde", derive(serde::Serialize, serde::Deserialize), - serde(crate = "serde_crate", rename_all = "camelCase") + serde(crate = "serde_crate") )] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] pub struct FullAddrStats { @@ -81,7 +81,7 @@ pub struct FullAddrStats { #[cfg_attr( feature = "serde", derive(serde::Serialize, serde::Deserialize), - serde(crate = "serde_crate", rename_all = "camelCase") + serde(crate = "serde_crate") )] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] pub struct AddrTxStats { @@ -198,6 +198,7 @@ impl Client { let url = self.url(); let url = format!("{}/address/{}", url, address); + let resp: FullAddrStats = agent.get(&url).call()?.into_json()?; Ok(resp) } @@ -353,9 +354,20 @@ impl Client { address_index: &mut BTreeMap, Vec)>, ) { for (script, (wallet_addr, txids)) in address_index.iter_mut() { - for txid in txids { + // UTXOs and inputs must be processed separately due to the unordered nature and + // dependencies of transaction IDs. Handling them in a single loop can cause + // data inconsistencies. For example, if spending transactions are processed + // first, new change UTXOs are added and spent UTXOs are removed. However, + // in the subsequent loop, these already spent UTXOs are treated as new + // transactions and reinserted into the UTXO set. + for txid in txids.iter() { let mut tx = cache.tx.remove(txid).expect("broken logic"); self.process_outputs::<_, _, L2>(descriptor, script, wallet_addr, &mut tx, cache); + cache.tx.insert(tx.txid, tx); + } + + for txid in txids.iter() { + let mut tx = cache.tx.remove(txid).expect("broken logic"); self.process_inputs::<_, _, L2>(descriptor, script, wallet_addr, &mut tx, cache); cache.tx.insert(tx.txid, tx); } diff --git a/src/wallet.rs b/src/wallet.rs index 19905fe..4373e7f 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -378,14 +378,15 @@ where Self: Save res } - pub fn update(&mut self, indexer: &I) -> MayError> { - self.cache.update::(&self.descr, indexer) - - // Original implementation - // WalletCache::with::<_, K, _, L2>(&self.descr, indexer).map(|cache| { - // self.cache = cache; - // self.set_dirty(); - // }) + pub fn update(&mut self, indexer: &I) -> MayError<(), Vec> { + if self.cache.tx.is_empty() { + return WalletCache::with::<_, K, _, L2>(&self.descr, indexer).map(|cache| { + self.cache = cache; + self.set_dirty(); + }); + } + + self.cache.update::(&self.descr, indexer).map(|_| self.set_dirty()) } pub fn to_deriver(&self) -> D From a153bc39fa93a36f1d78cb729bc8cb34ca79b016 Mon Sep 17 00:00:00 2001 From: well-bitlight Date: Wed, 4 Sep 2024 01:41:55 +0800 Subject: [PATCH 10/13] fix: (WIP) issues in complex test cases --- src/indexers/esplora.rs | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/indexers/esplora.rs b/src/indexers/esplora.rs index e112672..0fcf7ee 100644 --- a/src/indexers/esplora.rs +++ b/src/indexers/esplora.rs @@ -20,7 +20,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::num::NonZeroU32; use std::ops::{Deref, DerefMut}; @@ -353,6 +353,8 @@ impl Client { cache: &mut WalletCache, address_index: &mut BTreeMap, Vec)>, ) { + let wallet_self_script_set: BTreeSet = + address_index.keys().cloned().collect::>(); for (script, (wallet_addr, txids)) in address_index.iter_mut() { // UTXOs and inputs must be processed separately due to the unordered nature and // dependencies of transaction IDs. Handling them in a single loop can cause @@ -362,7 +364,14 @@ impl Client { // transactions and reinserted into the UTXO set. for txid in txids.iter() { let mut tx = cache.tx.remove(txid).expect("broken logic"); - self.process_outputs::<_, _, L2>(descriptor, script, wallet_addr, &mut tx, cache); + self.process_outputs::<_, _, L2>( + descriptor, + script, + wallet_addr, + &mut tx, + cache, + &wallet_self_script_set, + ); cache.tx.insert(tx.txid, tx); } @@ -386,11 +395,17 @@ impl Client { wallet_addr: &mut WalletAddr, tx: &mut WalletTx, cache: &mut WalletCache, + wallet_self_script_set: &BTreeSet, ) { for debit in &mut tx.outputs { let Some(s) = debit.beneficiary.script_pubkey() else { continue; }; + // Needs to be handled here. When iterating over keychain 0, + // it is possible that a UTXO is generated as change and is associated with keychain 1. + // However, the `script` of this UTXO belongs to keychain 0. + // This mismatch between the UTXO's script_pubkey and its associated keychain can cause + // errors. It should be handled using wallet_self_script_set. if &s == script { cache.utxo.insert(debit.outpoint); debit.beneficiary = Party::from_wallet_addr(wallet_addr); @@ -400,6 +415,10 @@ impl Client { .balance .saturating_add(debit.value.sats().try_into().expect("sats overflow")); } else if debit.beneficiary.is_unknown() { + if wallet_self_script_set.contains(&s) { + debit.beneficiary = Party::from_wallet_addr(wallet_addr); + continue; + } Address::with(&s, descriptor.network()) .map(|addr| { debit.beneficiary = Party::Counterparty(addr); @@ -423,6 +442,8 @@ impl Client { }; if &s == script { credit.payer = Party::from_wallet_addr(wallet_addr); + // FIXME: balance calculation + // panicked at bp-wallet/src/data.rs:419:55 wallet_addr.balance = wallet_addr .balance .saturating_sub(credit.value.sats().try_into().expect("sats overflow")); From 614c98379e5893e03ef205c4b169b80decae9515 Mon Sep 17 00:00:00 2001 From: well-bitlight Date: Wed, 4 Sep 2024 15:40:40 +0800 Subject: [PATCH 11/13] fix: data consistency in the esplora-indexer update scenario --- src/data.rs | 14 ++++++++- src/indexers/esplora.rs | 70 ++++++++++++++++++++++++++++++++--------- 2 files changed, 69 insertions(+), 15 deletions(-) diff --git a/src/data.rs b/src/data.rs index 75086aa..d27a2be 100644 --- a/src/data.rs +++ b/src/data.rs @@ -265,7 +265,7 @@ impl Party { Party::Subsidy => None, Party::Counterparty(addr) => Some(addr.script_pubkey()), Party::Unknown(script) => Some(script.clone()), - Party::Wallet(_) => None, + Party::Wallet(derive) => Some(derive.addr.script_pubkey()), } } } @@ -419,3 +419,15 @@ impl WalletAddr { } } } + +impl WalletAddr { + pub fn expect_transmute(self) -> WalletAddr { + WalletAddr { + terminal: self.terminal, + addr: self.addr, + used: self.used, + volume: self.volume, + balance: self.balance.sats_i64(), + } + } +} diff --git a/src/indexers/esplora.rs b/src/indexers/esplora.rs index 0fcf7ee..e0a027f 100644 --- a/src/indexers/esplora.rs +++ b/src/indexers/esplora.rs @@ -20,7 +20,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::BTreeMap; use std::num::NonZeroU32; use std::ops::{Deref, DerefMut}; @@ -323,6 +323,20 @@ impl Client { .map_err(|err| errors.push(err)) .unwrap_or_default(); if tx_stats_by_client.address.is_empty() || tx_stats_by_cache == tx_stats_by_client { + let wallet_addr_key = WalletAddr::from(derive); + let keychain = wallet_addr_key.terminal.keychain; + + if let Some(keychain_addr_set) = cache.addr.get(&keychain) { + // If `wallet_addr` has been cached before, it must be set in `address_index` + // to ensure the subsequent state updates correctly. + // Also, return (empty = false); + // This ensures that every cached `wallet_addr` is checked for updates. + if let Some(cached_wallet_addr) = keychain_addr_set.get(&wallet_addr_key) { + address_index + .insert(script, ((*cached_wallet_addr).expect_transmute(), txids)); + return false; + } + } return true; } } @@ -353,8 +367,13 @@ impl Client { cache: &mut WalletCache, address_index: &mut BTreeMap, Vec)>, ) { - let wallet_self_script_set: BTreeSet = - address_index.keys().cloned().collect::>(); + // Keep the completed WalletAddr set + // Ensure that the subsequent status is handled correctly + let wallet_self_script_map: BTreeMap> = + address_index.iter().map(|(s, (addr, _))| (s.clone(), addr.clone())).collect(); + // Remove items with empty `txids` + address_index.retain(|_, (_, txids)| !txids.is_empty()); + for (script, (wallet_addr, txids)) in address_index.iter_mut() { // UTXOs and inputs must be processed separately due to the unordered nature and // dependencies of transaction IDs. Handling them in a single loop can cause @@ -370,14 +389,21 @@ impl Client { wallet_addr, &mut tx, cache, - &wallet_self_script_set, + &wallet_self_script_map, ); cache.tx.insert(tx.txid, tx); } for txid in txids.iter() { let mut tx = cache.tx.remove(txid).expect("broken logic"); - self.process_inputs::<_, _, L2>(descriptor, script, wallet_addr, &mut tx, cache); + self.process_inputs::<_, _, L2>( + descriptor, + script, + wallet_addr, + &mut tx, + cache, + &wallet_self_script_map, + ); cache.tx.insert(tx.txid, tx); } cache @@ -395,17 +421,28 @@ impl Client { wallet_addr: &mut WalletAddr, tx: &mut WalletTx, cache: &mut WalletCache, - wallet_self_script_set: &BTreeSet, + wallet_self_script_map: &BTreeMap>, ) { for debit in &mut tx.outputs { let Some(s) = debit.beneficiary.script_pubkey() else { continue; }; + // Needs to be handled here. When iterating over keychain 0, - // it is possible that a UTXO is generated as change and is associated with keychain 1. - // However, the `script` of this UTXO belongs to keychain 0. - // This mismatch between the UTXO's script_pubkey and its associated keychain can cause - // errors. It should be handled using wallet_self_script_set. + // it is possible that a UTXO corresponds to the change `script-public-key` `s` and is + // associated with keychain 1. However, the `script` corresponds to keychain 0. + // This discrepancy can cause issues because the outer loop uses `address_index: + // BTreeMap, Vec)>`, which is unordered + // by keychain. + // + // If transactions related to keychain-1-ScriptPubkey are processed first, the change + // UTXOs are correctly handled. However, when subsequently processing + // transactions for keychain-0-ScriptPubkey, the previously set data for keychain-1 + // can be incorrectly modified (to `Counterparty`). This specific condition needs to be + // handled. + // + // It should be handled using `wallet_self_script_map` to correctly process the + // beneficiary of the transaction output. if &s == script { cache.utxo.insert(debit.outpoint); debit.beneficiary = Party::from_wallet_addr(wallet_addr); @@ -415,10 +452,11 @@ impl Client { .balance .saturating_add(debit.value.sats().try_into().expect("sats overflow")); } else if debit.beneficiary.is_unknown() { - if wallet_self_script_set.contains(&s) { - debit.beneficiary = Party::from_wallet_addr(wallet_addr); + if let Some(real_addr) = wallet_self_script_map.get(&s) { + debit.beneficiary = Party::from_wallet_addr(real_addr); continue; } + Address::with(&s, descriptor.network()) .map(|addr| { debit.beneficiary = Party::Counterparty(addr); @@ -435,6 +473,7 @@ impl Client { wallet_addr: &mut WalletAddr, tx: &mut WalletTx, cache: &mut WalletCache, + wallet_self_script_map: &BTreeMap>, ) { for credit in &mut tx.inputs { let Some(s) = credit.payer.script_pubkey() else { @@ -442,12 +481,15 @@ impl Client { }; if &s == script { credit.payer = Party::from_wallet_addr(wallet_addr); - // FIXME: balance calculation - // panicked at bp-wallet/src/data.rs:419:55 wallet_addr.balance = wallet_addr .balance .saturating_sub(credit.value.sats().try_into().expect("sats overflow")); } else if credit.payer.is_unknown() { + if let Some(real_addr) = wallet_self_script_map.get(&s) { + credit.payer = Party::from_wallet_addr(real_addr); + continue; + } + Address::with(&s, descriptor.network()) .map(|addr| { credit.payer = Party::Counterparty(addr); From ecd2c41564e131ba232c186a514503a2c0e30785 Mon Sep 17 00:00:00 2001 From: well-bitlight Date: Wed, 4 Sep 2024 17:02:01 +0800 Subject: [PATCH 12/13] chore: remove useless code --- src/wallet.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/wallet.rs b/src/wallet.rs index 4373e7f..b088c3f 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -54,7 +54,6 @@ pub struct AddrIter<'descr, K, D: Descriptor> { generator: &'descr D, network: AddressNetwork, keychain: Keychain, - // TODO: get index index: NormalIndex, _phantom: PhantomData, } @@ -105,11 +104,6 @@ impl> WalletDescr { _phantom: PhantomData, } } - - // Use this unique identifier as an id for important cache of the wallet - pub fn unique_id(&self) -> String { - format!("{:?}-{:?}", self.generator.xpubs().collect::>(), self.network) - } } impl, L2: Layer2Descriptor> WalletDescr { From bcfc6c708ef2d27874aecdb177b3e1e31bbc1556 Mon Sep 17 00:00:00 2001 From: well-bitlight Date: Wed, 4 Sep 2024 17:46:16 +0800 Subject: [PATCH 13/13] fix: compile error of ci --- Cargo.toml | 4 ++-- src/indexers/cache.rs | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 52cea13..a1ed87b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,7 +88,7 @@ hot = ["bp-std/signers", "bip39", "rand", "aes-gcm", "rpassword"] cli = ["base64", "env_logger", "clap", "shellexpand", "fs", "serde", "electrum", "esplora", "mempool", "log", "colored"] log = ["env_logger"] electrum = ["bp-electrum", "serde", "serde_json"] -esplora = ["bp-esplora"] -mempool = ["esplora"] +esplora = ["bp-esplora", "serde", "serde_json"] +mempool = ["esplora", "serde", "serde_json"] fs = ["serde"] serde = ["serde_crate", "serde_yaml", "toml", "bp-std/serde"] diff --git a/src/indexers/cache.rs b/src/indexers/cache.rs index 762a07b..7ed8cbb 100644 --- a/src/indexers/cache.rs +++ b/src/indexers/cache.rs @@ -19,11 +19,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[cfg(feature = "electrum")] use std::collections::HashMap; use std::num::NonZeroUsize; use std::sync::{Arc, Mutex}; -use bpstd::{BlockHash, DerivedAddr, Tx, Txid}; +#[cfg(feature = "electrum")] +use bpstd::Txid; +use bpstd::{BlockHash, DerivedAddr, Tx}; #[cfg(feature = "electrum")] use electrum::GetHistoryRes; use lru::LruCache;