diff --git a/Cargo.lock b/Cargo.lock index 24de82d..a077c10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -43,6 +43,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -52,6 +64,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "amplify" version = "4.7.0" @@ -408,6 +426,7 @@ dependencies = [ "descriptors", "env_logger", "log", + "lru", "psbt", "rand", "rpassword", @@ -868,6 +887,10 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "heck" @@ -1064,6 +1087,15 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "lru" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37ee39891760e7d94734f6f63fedc29a2e4a152f836120753a72503f09fcf904" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "memchr" version = "2.7.4" diff --git a/Cargo.toml b/Cargo.toml index d632e42..a1ed87b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ rand = { version = "0.8.5", optional = true } rpassword = { version = "7.3.1", optional = true } aes-gcm = { version = "0.10.3", optional = true } bip39 = { version = "2.0.0", optional = true } +lru = "0.12.4" serde_crate = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } @@ -87,7 +88,7 @@ hot = ["bp-std/signers", "bip39", "rand", "aes-gcm", "rpassword"] cli = ["base64", "env_logger", "clap", "shellexpand", "fs", "serde", "electrum", "esplora", "mempool", "log", "colored"] log = ["env_logger"] electrum = ["bp-electrum", "serde", "serde_json"] -esplora = ["bp-esplora"] -mempool = ["esplora"] +esplora = ["bp-esplora", "serde", "serde_json"] +mempool = ["esplora", "serde", "serde_json"] fs = ["serde"] serde = ["serde_crate", "serde_yaml", "toml", "bp-std/serde"] diff --git a/src/cli/args.rs b/src/cli/args.rs index dae8539..3a9ca09 100644 --- a/src/cli/args.rs +++ b/src/cli/args.rs @@ -23,6 +23,7 @@ use std::fmt::Debug; use std::path::PathBuf; use std::process::exit; +use std::sync::OnceLock; use bpstd::XpubDerivable; use clap::Subcommand; @@ -32,6 +33,7 @@ use strict_encoding::Ident; use crate::cli::{ Config, DescrStdOpts, DescriptorOpts, ExecError, GeneralOpts, ResolverOpt, WalletOpts, }; +use crate::indexers::electrum::ElectrumClient; use crate::indexers::esplora; use crate::{AnyIndexer, MayError, Wallet}; @@ -95,14 +97,36 @@ impl Args { } pub fn indexer(&self) -> Result { + use crate::indexers::IndexerCache; + + // Define a static variable to store IndexerCache, + // Ensuring that all cached data is centralized + // Initialize only once + static INDEXER_CACHE: OnceLock = OnceLock::new(); + + let indexer_cache = INDEXER_CACHE + .get_or_init(|| { + IndexerCache::new( + self.general + .indexer_cache_factor + .try_into() + 
.expect("Error: indexer cache size is invalid"), + ) + }) + .clone(); + let network = self.general.network.to_string(); Ok(match (&self.resolver.esplora, &self.resolver.electrum, &self.resolver.mempool) { - (None, Some(url), None) => AnyIndexer::Electrum(Box::new(electrum::Client::new(url)?)), + (None, Some(url), None) => { + AnyIndexer::Electrum(Box::new(ElectrumClient::new(url, indexer_cache)?)) + } (Some(url), None, None) => AnyIndexer::Esplora(Box::new(esplora::Client::new_esplora( &url.replace("{network}", &network), + indexer_cache, )?)), (None, None, Some(url)) => AnyIndexer::Mempool(Box::new(esplora::Client::new_mempool( &url.replace("{network}", &network), + indexer_cache, )?)), _ => { eprintln!( diff --git a/src/cli/opts.rs b/src/cli/opts.rs index 224931e..206165c 100644 --- a/src/cli/opts.rs +++ b/src/cli/opts.rs @@ -158,6 +158,12 @@ pub struct GeneralOpts { /// Network to use. #[arg(short, long, global = true, default_value = "testnet3", env = "LNPBP_NETWORK")] pub network: Network, + + /// Indexer cache factor. + /// `indexer_cache_factor`: A non-zero size value that determines the capacity of the LRU + /// caches for indexer. + #[arg(long, global = true, default_value = "1000")] + pub indexer_cache_factor: usize, } impl GeneralOpts { diff --git a/src/data.rs b/src/data.rs index 75086aa..d27a2be 100644 --- a/src/data.rs +++ b/src/data.rs @@ -265,7 +265,7 @@ impl Party { Party::Subsidy => None, Party::Counterparty(addr) => Some(addr.script_pubkey()), Party::Unknown(script) => Some(script.clone()), - Party::Wallet(_) => None, + Party::Wallet(derive) => Some(derive.addr.script_pubkey()), } } } @@ -419,3 +419,15 @@ impl WalletAddr { } } } + +impl WalletAddr { + pub fn expect_transmute(self) -> WalletAddr { + WalletAddr { + terminal: self.terminal, + addr: self.addr, + used: self.used, + volume: self.volume, + balance: self.balance.sats_i64(), + } + } +} diff --git a/src/indexers/any.rs b/src/indexers/any.rs index 950def4..01d1bac 100644 --- a/src/indexers/any.rs +++ b/src/indexers/any.rs @@ -22,6 +22,8 @@ use bpstd::Tx; use descriptors::Descriptor; +#[cfg(feature = "electrum")] +use super::electrum::ElectrumClient; use crate::{Indexer, Layer2, MayError, WalletCache, WalletDescr}; /// Type that contains any of the client types implementing the Indexer trait @@ -31,7 +33,7 @@ pub enum AnyIndexer { #[cfg(feature = "electrum")] #[from] /// Electrum indexer - Electrum(Box), + Electrum(Box), #[cfg(feature = "esplora")] #[from] /// Esplora indexer diff --git a/src/indexers/cache.rs b/src/indexers/cache.rs new file mode 100644 index 0000000..7ed8cbb --- /dev/null +++ b/src/indexers/cache.rs @@ -0,0 +1,82 @@ +// Modern, minimalistic & standard-compliant cold wallet library. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Written in 2024 by +// Nicola Busanello +// +// Copyright (C) 2024 LNP/BP Standards Association. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
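The `OnceLock` in `Args::indexer()` above guarantees that every indexer built during one CLI invocation shares a single `IndexerCache`. A minimal, std-only sketch of the same lazy-singleton idiom (the `Cache` type and the factor value are placeholders, not the real `IndexerCache`):

use std::num::NonZeroUsize;
use std::sync::OnceLock;

#[derive(Clone, Debug)]
struct Cache {
    capacity: NonZeroUsize,
}

fn shared_cache(cache_factor: usize) -> Cache {
    // Initialized exactly once, even if called from several code paths.
    static CACHE: OnceLock<Cache> = OnceLock::new();
    CACHE
        .get_or_init(|| Cache {
            capacity: NonZeroUsize::new(cache_factor).expect("cache factor must be non-zero"),
        })
        .clone()
}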
+ +#[cfg(feature = "electrum")] +use std::collections::HashMap; +use std::num::NonZeroUsize; +use std::sync::{Arc, Mutex}; + +#[cfg(feature = "electrum")] +use bpstd::Txid; +use bpstd::{BlockHash, DerivedAddr, Tx}; +#[cfg(feature = "electrum")] +use electrum::GetHistoryRes; +use lru::LruCache; + +#[derive(Clone, Eq, PartialEq, Hash, Debug)] +pub struct TxDetail { + pub(crate) inner: Tx, + pub(crate) blockhash: Option, + pub(crate) blocktime: Option, +} + +#[derive(Debug, Clone)] +pub struct IndexerCache { + // The cache is designed as Arc to ensure that + // all Indexers globally use the same cache + // (to maintain data consistency). + // + // The Mutex is used to ensure that the cache is thread-safe + // Make sure to get the updated cache from immutable references + // In the create/update processing logic of &Indexer + // addr_transactions: for esplora + #[cfg(feature = "esplora")] + pub(crate) addr_transactions: Arc>>>, + // script_history: for electrum + #[cfg(feature = "electrum")] + pub(crate) script_history: Arc>>>, + // tx_details: for electrum + #[cfg(feature = "electrum")] + pub(crate) tx_details: Arc>>, +} + +impl IndexerCache { + /// Creates a new `IndexerCache` with the specified cache factor. + /// + /// # Parameters + /// - `cache_factor`: A non-zero size value that determines the capacity of the LRU caches. This + /// factor is used to initialize the `addr_transactions` and `script_history` caches. The + /// `tx_details` cache is initialized with a capacity that is 20 times the size of the + /// `script_history` cache. + pub fn new(cache_factor: NonZeroUsize) -> Self { + Self { + #[cfg(feature = "esplora")] + addr_transactions: Arc::new(Mutex::new(LruCache::new(cache_factor))), + #[cfg(feature = "electrum")] + script_history: Arc::new(Mutex::new(LruCache::new(cache_factor))), + // size of tx_details is 20 times the size of script_history for electrum + #[cfg(feature = "electrum")] + tx_details: Arc::new(Mutex::new(LruCache::new( + cache_factor.saturating_mul(NonZeroUsize::new(20).expect("20 is not zero")), + ))), + } + } +} diff --git a/src/indexers/electrum.rs b/src/indexers/electrum.rs index 8d2767c..3023bfa 100644 --- a/src/indexers/electrum.rs +++ b/src/indexers/electrum.rs @@ -19,16 +19,19 @@ // See the License for the specific language governing permissions and // limitations under the License. 
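`IndexerCache::new` above sizes the electrum `tx_details` map at twenty times the history cache, presumably because resolving one history entry fetches the transaction itself plus one previous transaction per input. A reduced sketch of the same `Arc<Mutex<LruCache>>` construction and access pattern (key and value types are simplified stand-ins):

use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

use lru::LruCache;

fn build_caches(cache_factor: NonZeroUsize) {
    // One LRU per kind of lookup, shared between clones via Arc<Mutex<_>>.
    let script_history: Arc<Mutex<LruCache<String, Vec<u64>>>> =
        Arc::new(Mutex::new(LruCache::new(cache_factor)));
    let tx_details: Arc<Mutex<LruCache<String, String>>> = Arc::new(Mutex::new(LruCache::new(
        cache_factor.saturating_mul(NonZeroUsize::new(20).expect("20 is not zero")),
    )));

    // Writers and readers both go through the mutex; the LRU evicts the
    // least recently used entry once capacity is reached.
    script_history.lock().expect("poisoned lock").put("address".to_string(), vec![1, 2]);
    let _hit = tx_details.lock().expect("poisoned lock").get("txid").cloned();
}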
-use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::num::NonZeroU32; use std::str::FromStr; -use bpstd::{Address, BlockHash, ConsensusEncode, Outpoint, Sats, Tx, TxIn, Txid, Weight}; +use bpstd::{ + Address, BlockHash, ConsensusEncode, DerivedAddr, Outpoint, Sats, Tx, TxIn, Txid, Weight, +}; use descriptors::Descriptor; use electrum::{Client, ElectrumApi, Error, GetHistoryRes, Param}; use serde_json::Value; -use super::BATCH_SIZE; +use super::cache::TxDetail; +use super::{IndexerCache, BATCH_SIZE}; use crate::{ Indexer, Layer2, MayError, MiningInfo, Party, TxCredit, TxDebit, TxStatus, WalletAddr, WalletCache, WalletDescr, WalletTx, @@ -59,30 +62,34 @@ pub enum ElectrumError { Client(Error), } -impl Indexer for Client { - type Error = ElectrumError; +pub struct ElectrumClient { + client: Client, + cache: IndexerCache, +} - fn create, L2: Layer2>( +impl ElectrumClient { + fn process_wallet_descriptor, L2: Layer2>( &self, descriptor: &WalletDescr, - ) -> MayError, Vec> { - let mut cache = WalletCache::new(); - let mut errors = Vec::::new(); - + cache: &mut WalletCache, + errors: &mut Vec, + is_update: bool, + ) -> BTreeMap, Vec)> { let mut address_index = BTreeMap::new(); for keychain in descriptor.keychains() { let mut empty_count = 0usize; eprint!(" keychain {keychain} "); for derive in descriptor.addresses(keychain) { - let script = derive.addr.script_pubkey(); - eprint!("."); + let script = derive.addr.script_pubkey(); let mut txids = Vec::new(); - let Ok(hres) = - self.script_get_history(&script).map_err(|err| errors.push(err.into())) - else { - break; + + let hres = if is_update { + self.update_script_history(&derive, errors) + } else { + self.get_script_history(&derive, errors) }; + if hres.is_empty() { empty_count += 1; if empty_count >= BATCH_SIZE { @@ -93,197 +100,365 @@ impl Indexer for Client { empty_count = 0; - let mut process_history_entry = - |hr: GetHistoryRes| -> Result { - let txid = hr.tx_hash; - txids.push(txid); - - // get the tx details (requires electrum verbose support) - let tx_details = self.raw_call("blockchain.transaction.get", vec![ - Param::String(hr.tx_hash.to_string()), - Param::Bool(true), - ])?; - - let tx = tx_details - .get("hex") - .and_then(Value::as_str) - .and_then(|s| Tx::from_str(s).ok()) - .ok_or(ElectrumApiError::InvalidTx(txid))?; - - // build TxStatus - let status = if hr.height < 1 { - TxStatus::Mempool - } else { - let block_hash = tx_details - .get("blockhash") - .and_then(Value::as_str) - .and_then(|s| BlockHash::from_str(s).ok()) - .ok_or(ElectrumApiError::InvalidBlockHash(txid))?; - let blocktime = tx_details - .get("blocktime") - .and_then(Value::as_u64) - .ok_or(ElectrumApiError::InvalidBlockTime(txid))?; - let height = NonZeroU32::try_from(hr.height as u32) - .map_err(|_| ElectrumApiError::InvalidBlockHeight(txid))?; - TxStatus::Mined(MiningInfo { - height, - time: blocktime, - block_hash, - }) - }; - let tx_size = tx.consensus_serialize().len(); - let weight = tx.weight_units().to_u32(); - - // get inputs to build TxCredit's and total amount, - // collecting indexer errors - let mut input_total = Sats::ZERO; - let mut inputs = Vec::with_capacity(tx.inputs.len()); - for input in tx.inputs { - // get value from previous output tx - let prev_tx = self.transaction_get(&input.prev_output.txid)?; - let prev_out = prev_tx - .outputs - .get(input.prev_output.vout.into_usize()) - .ok_or_else(|| { - ElectrumApiError::PrevOutTxMismatch(txid, input.clone()) - })?; - let value = prev_out.value; - input_total += 
value; - inputs.push(TxCredit { - outpoint: input.prev_output, - payer: Party::Unknown(prev_out.script_pubkey.clone()), - sequence: input.sequence, - coinbase: false, - script_sig: input.sig_script, - witness: input.witness, - value, - }) - } - - // get outputs and total amount, build TxDebit's - let mut output_total = Sats::ZERO; - let mut outputs = Vec::with_capacity(tx.outputs.len()); - for (no, txout) in tx.outputs.into_iter().enumerate() { - output_total += txout.value; - outputs.push(TxDebit { - outpoint: Outpoint::new(txid, no as u32), - beneficiary: Party::Unknown(txout.script_pubkey), - value: txout.value, - spent: None, - }) - } - - // build the WalletTx - return Ok(WalletTx { - txid, - status, - inputs, - outputs, - fee: input_total - output_total, - size: tx_size as u32, - weight, - version: tx.version, - locktime: tx.lock_time, - }); - }; - - // build wallet transactions from script tx history, collecting indexer errors - for hr in hres { - match process_history_entry(hr) { + for (_, hr) in hres { + match self.process_history_entry(hr, &mut txids) { Ok(tx) => { cache.tx.insert(tx.txid, tx); } - Err(e) => errors.push(e.into()), + Err(e) => errors.push(e), } } - let wallet_addr = WalletAddr::::from(derive); + let wallet_addr = if is_update { + self.get_or_create_wallet_addr::(cache, &derive) + } else { + WalletAddr::::from(derive) + }; + address_index.insert(script, (wallet_addr, txids)); } } + address_index + } - // TODO: Update headers & tip - - for (script, (wallet_addr, txids)) in &mut address_index { - for txid in txids { + fn process_transactions, L2: Layer2>( + &self, + descriptor: &WalletDescr, + cache: &mut WalletCache, + address_index: &mut BTreeMap, Vec)>, + ) { + for (script, (wallet_addr, txids)) in address_index { + // UTXOs and inputs must be processed separately due to the unordered nature and + // dependencies of transaction IDs. Handling them in a single loop can cause + // data inconsistencies. For example, if spending transactions are processed + // first, new change UTXOs are added and spent UTXOs are removed. However, + // in the subsequent loop, these already spent UTXOs are treated as new + // transactions and reinserted into the UTXO set. 
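The ordering constraint described in the comment above can be made concrete with a small, self-contained model (hypothetical types; outpoints reduced to integers): all wallet outputs are registered in a first pass, and only then are spends applied, so an already-spent outpoint can never be reinserted by a later-processed creating transaction.

use std::collections::HashSet;

// Each entry: (outpoint this tx creates for us, outpoint it spends, if any).
fn rebuild_utxo_set(txs: &[(u32, Option<u32>)]) -> HashSet<u32> {
    let mut utxo = HashSet::new();
    // Pass 1: credit every output we control, regardless of transaction order.
    for (created, _) in txs {
        utxo.insert(*created);
    }
    // Pass 2: only now remove the outpoints that some transaction spent.
    for (_, spent) in txs {
        if let Some(outpoint) = spent {
            utxo.remove(outpoint);
        }
    }
    utxo
}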
+ for txid in txids.iter() { let mut tx = cache.tx.remove(txid).expect("broken logic"); - for debit in &mut tx.outputs { - let Some(s) = debit.beneficiary.script_pubkey() else { - continue; - }; - if &s == script { - cache.utxo.insert(debit.outpoint); - debit.beneficiary = Party::from_wallet_addr(wallet_addr); - wallet_addr.used = wallet_addr.used.saturating_add(1); - wallet_addr.volume.saturating_add_assign(debit.value); - wallet_addr.balance = wallet_addr - .balance - .saturating_add(debit.value.sats().try_into().expect("sats overflow")); - } else if debit.beneficiary.is_unknown() { - Address::with(&s, descriptor.network()) - .map(|addr| { - debit.beneficiary = Party::Counterparty(addr); - }) - .ok(); - } - } + self.process_outputs::<_, _, L2>(descriptor, script, wallet_addr, &mut tx, cache); cache.tx.insert(tx.txid, tx); } - } - for (script, (wallet_addr, txids)) in &mut address_index { - for txid in txids { + for txid in txids.iter() { let mut tx = cache.tx.remove(txid).expect("broken logic"); - for credit in &mut tx.inputs { - let Some(s) = credit.payer.script_pubkey() else { - continue; - }; - if &s == script { - credit.payer = Party::from_wallet_addr(wallet_addr); - wallet_addr.balance = wallet_addr - .balance - .saturating_sub(credit.value.sats().try_into().expect("sats overflow")); - } else if credit.payer.is_unknown() { - Address::with(&s, descriptor.network()) - .map(|addr| { - credit.payer = Party::Counterparty(addr); - }) - .ok(); - } - if let Some(prev_tx) = cache.tx.get_mut(&credit.outpoint.txid) { - if let Some(txout) = - prev_tx.outputs.get_mut(credit.outpoint.vout_u32() as usize) - { - let outpoint = txout.outpoint; - if tx.status.is_mined() { - cache.utxo.remove(&outpoint); - } - txout.spent = Some(credit.outpoint.into()) - }; - } - } + self.process_inputs::<_, _, L2>(descriptor, script, wallet_addr, &mut tx, cache); cache.tx.insert(tx.txid, tx); } + cache .addr .entry(wallet_addr.terminal.keychain) .or_default() .insert(wallet_addr.expect_transmute()); } + } + + fn process_outputs, L2: Layer2>( + &self, + descriptor: &WalletDescr, + script: &bpstd::ScriptPubkey, + wallet_addr: &mut WalletAddr, + tx: &mut WalletTx, + cache: &mut WalletCache, + ) { + for debit in &mut tx.outputs { + if let Some(s) = debit.beneficiary.script_pubkey() { + if &s == script { + cache.utxo.insert(debit.outpoint); + debit.beneficiary = Party::from_wallet_addr(wallet_addr); + wallet_addr.used = wallet_addr.used.saturating_add(1); + wallet_addr.volume.saturating_add_assign(debit.value); + wallet_addr.balance = wallet_addr + .balance + .saturating_add(debit.value.sats().try_into().expect("sats overflow")); + } else if debit.beneficiary.is_unknown() { + Address::with(&s, descriptor.network()) + .map(|addr| debit.beneficiary = Party::Counterparty(addr)) + .ok(); + } + } + } + } + + fn process_inputs, L2: Layer2>( + &self, + descriptor: &WalletDescr, + script: &bpstd::ScriptPubkey, + wallet_addr: &mut WalletAddr, + tx: &mut WalletTx, + cache: &mut WalletCache, + ) { + for credit in &mut tx.inputs { + if let Some(s) = credit.payer.script_pubkey() { + if &s == script { + credit.payer = Party::from_wallet_addr(wallet_addr); + wallet_addr.balance = wallet_addr + .balance + .saturating_sub(credit.value.sats().try_into().expect("sats overflow")); + } else if credit.payer.is_unknown() { + Address::with(&s, descriptor.network()) + .map(|addr| credit.payer = Party::Counterparty(addr)) + .ok(); + } + } + if let Some(prev_tx) = cache.tx.get_mut(&credit.outpoint.txid) { + if let Some(txout) = 
prev_tx.outputs.get_mut(credit.outpoint.vout_u32() as usize) { + let outpoint = txout.outpoint; + if tx.status.is_mined() { + cache.utxo.remove(&outpoint); + } + txout.spent = Some(credit.outpoint.into()) + }; + } + } + } + + fn get_or_create_wallet_addr( + &self, + cache: &mut WalletCache, + derive: &DerivedAddr, + ) -> WalletAddr { + let wallet_addr_key = WalletAddr::from(*derive); + cache + .addr + .entry(wallet_addr_key.terminal.keychain) + .or_default() + .get(&wallet_addr_key) + .cloned() + .map(|wallet_addr| WalletAddr { + terminal: wallet_addr.terminal, + addr: derive.addr, + used: wallet_addr.used, + volume: wallet_addr.volume, + balance: wallet_addr.balance.sats_i64(), + }) + .unwrap_or_else(|| WalletAddr::from(*derive)) + } +} +impl ElectrumClient { + pub fn new(url: &str, cache: IndexerCache) -> Result { + let client = Client::new(url).map_err(ElectrumError::Client)?; + Ok(Self { client, cache }) + } + + fn get_script_history( + &self, + derived_addr: &DerivedAddr, + errors: &mut Vec, + ) -> HashMap { + let mut cache = self.cache.script_history.lock().expect("poisoned lock"); + if let Some(history) = cache.get(derived_addr) { + return history.clone(); + } + + let script = derived_addr.addr.script_pubkey(); + let hres = self + .client + .script_get_history(&script) + .map_err(|err| errors.push(err.into())) + .unwrap_or_default(); + let hres: HashMap = + hres.into_iter().map(|hr| (hr.tx_hash, hr)).collect(); + + cache.put(derived_addr.clone(), hres.clone()); + hres + } + + fn update_script_history( + &self, + derived_addr: &DerivedAddr, + errors: &mut Vec, + ) -> HashMap { + let mut upsert = HashMap::new(); + + let mut cache = self.cache.script_history.lock().expect("poisoned lock"); + + let old_history = cache.get(derived_addr).cloned().unwrap_or_default(); + + let script = derived_addr.addr.script_pubkey(); + let new_history = self + .client + .script_get_history(&script) + .map_err(|err| { + errors.push(err.into()); + }) + .unwrap_or_default(); + if new_history.is_empty() { + return upsert; + } + + let new_history: HashMap = + new_history.into_iter().map(|hr| (hr.tx_hash, hr)).collect(); + + for (txid, hr) in new_history.iter() { + if let Some(old_hr) = old_history.get(txid) { + if old_hr.height != hr.height { + upsert.insert(*txid, hr.clone()); + } + continue; + } + + upsert.insert(*txid, hr.clone()); + } + + cache.put(derived_addr.clone(), new_history.clone()); + upsert + } + + fn get_transaction_details(&self, txid: &Txid) -> Result { + let mut cache = self.cache.tx_details.lock().expect("poisoned lock"); + if let Some(details) = cache.get(txid) { + // if blockhash exists, the transaction has been confirmed and will not change + // Otherwise, we need to get the latest information + if details.blockhash.is_some() { + return Ok(details.clone()); + } + } + + let tx_details = self.client.raw_call("blockchain.transaction.get", vec![ + Param::String(txid.to_string()), + Param::Bool(true), + ])?; + + let inner: Tx = tx_details + .get("hex") + .and_then(Value::as_str) + .and_then(|s| Tx::from_str(s).ok()) + .ok_or(ElectrumApiError::InvalidTx(txid.clone()))?; + + let blockhash = tx_details + .get("blockhash") + .and_then(Value::as_str) + .and_then(|s| BlockHash::from_str(s).ok()); + let blocktime = tx_details.get("blocktime").and_then(Value::as_u64); + + let tx_detail = TxDetail { + inner, + blockhash, + blocktime, + }; + + cache.put(*txid, tx_detail.clone()); + Ok(tx_detail) + } + + fn process_history_entry( + &self, + hr: GetHistoryRes, + txids: &mut Vec, + ) -> Result { + let 
txid = hr.tx_hash; + txids.push(txid); + + let TxDetail { + inner: tx, + blockhash, + blocktime, + } = self.get_transaction_details(&txid)?; + + // build TxStatus + let status = if hr.height < 1 { + TxStatus::Mempool + } else { + let height = NonZeroU32::try_from(hr.height as u32) + .map_err(|_| ElectrumApiError::InvalidBlockHeight(txid))?; + TxStatus::Mined(MiningInfo { + height, + time: blocktime.expect("blocktime is missing"), + block_hash: blockhash.expect("blockhash is missing"), + }) + }; + let tx_size = tx.consensus_serialize().len(); + let weight = tx.weight_units().to_u32(); + + // get inputs to build TxCredit's and total amount, + // collecting indexer errors + let mut input_total = Sats::ZERO; + let mut inputs = Vec::with_capacity(tx.inputs.len()); + for input in tx.inputs { + // get value from previous output tx + let prev_tx = self.get_transaction_details(&input.prev_output.txid)?.inner; + let prev_out = prev_tx + .outputs + .get(input.prev_output.vout.into_usize()) + .ok_or_else(|| ElectrumApiError::PrevOutTxMismatch(txid, input.clone()))?; + let value = prev_out.value; + input_total += value; + inputs.push(TxCredit { + outpoint: input.prev_output, + payer: Party::Unknown(prev_out.script_pubkey.clone()), + sequence: input.sequence, + coinbase: false, + script_sig: input.sig_script, + witness: input.witness, + value, + }) + } + + // get outputs and total amount, build TxDebit's + let mut output_total = Sats::ZERO; + let mut outputs = Vec::with_capacity(tx.outputs.len()); + for (no, txout) in tx.outputs.into_iter().enumerate() { + output_total += txout.value; + outputs.push(TxDebit { + outpoint: Outpoint::new(txid, no as u32), + beneficiary: Party::Unknown(txout.script_pubkey), + value: txout.value, + spent: None, + }) + } + + // build the WalletTx + return Ok(WalletTx { + txid, + status, + inputs, + outputs, + fee: input_total - output_total, + size: tx_size as u32, + weight, + version: tx.version, + locktime: tx.lock_time, + }); + } +} + +impl Indexer for ElectrumClient { + type Error = ElectrumError; + + fn create, L2: Layer2>( + &self, + descriptor: &WalletDescr, + ) -> MayError, Vec> { + let mut cache = WalletCache::new(); + let mut errors = Vec::new(); + let mut address_index = + self.process_wallet_descriptor::<_, _, L2>(descriptor, &mut cache, &mut errors, false); + self.process_transactions::<_, _, L2>(descriptor, &mut cache, &mut address_index); if errors.is_empty() { MayError::ok(cache) } else { MayError::err(cache, errors) } } fn update, L2: Layer2>( &self, - _descr: &WalletDescr, - _cache: &mut WalletCache, + descriptor: &WalletDescr, + cache: &mut WalletCache, ) -> MayError> { - todo!() + let mut errors = Vec::new(); + let mut address_index = + self.process_wallet_descriptor::<_, _, L2>(descriptor, cache, &mut errors, true); + + self.process_transactions::<_, _, L2>(descriptor, cache, &mut address_index); + if errors.is_empty() { + MayError::ok(address_index.len()) + } else { + MayError::err(address_index.len(), errors) + } } fn publish(&self, tx: &Tx) -> Result<(), Self::Error> { - self.transaction_broadcast(tx)?; + self.client.transaction_broadcast(tx)?; Ok(()) } } diff --git a/src/indexers/esplora.rs b/src/indexers/esplora.rs index b0cb46d..e0a027f 100644 --- a/src/indexers/esplora.rs +++ b/src/indexers/esplora.rs @@ -24,10 +24,13 @@ use std::collections::BTreeMap; use std::num::NonZeroU32; use std::ops::{Deref, DerefMut}; -use bpstd::{Address, DerivedAddr, LockTime, Outpoint, SeqNo, Tx, TxVer, Witness}; +use bpstd::{ + Address, DerivedAddr, LockTime, Outpoint, 
ScriptPubkey, SeqNo, Tx, TxVer, Txid, Witness, +}; use descriptors::Descriptor; use esplora::{BlockingClient, Error}; +use super::cache::IndexerCache; #[cfg(feature = "mempool")] use super::mempool::Mempool; use super::BATCH_SIZE; @@ -41,6 +44,7 @@ use crate::{ pub struct Client { pub(crate) inner: BlockingClient, pub(crate) kind: ClientKind, + pub(crate) cache: IndexerCache, } impl Deref for Client { @@ -62,6 +66,28 @@ pub enum ClientKind { Mempool, } +#[cfg_attr( + feature = "serde", + derive(serde::Serialize, serde::Deserialize), + serde(crate = "serde_crate") +)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +pub struct FullAddrStats { + pub address: String, + pub chain_stats: AddrTxStats, + pub mempool_stats: AddrTxStats, +} + +#[cfg_attr( + feature = "serde", + derive(serde::Serialize, serde::Deserialize), + serde(crate = "serde_crate") +)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +pub struct AddrTxStats { + pub tx_count: u64, +} + impl Client { /// Creates a new Esplora client with the specified URL. /// @@ -73,14 +99,109 @@ impl Client { /// /// Returns an error if the client fails to connect to the Esplora server. #[allow(clippy::result_large_err)] - pub fn new_esplora(url: &str) -> Result { + pub fn new_esplora(url: &str, cache: IndexerCache) -> Result { let inner = esplora::Builder::new(url).build_blocking()?; let client = Self { inner, kind: ClientKind::Esplora, + cache, }; Ok(client) } + + /// Retrieves all transactions associated with a given script hash. + /// + /// # Arguments + /// + /// * `client` - The Esplora client. + /// * `derive` - The derived address. + /// * `force_update` - A flag indicating whether to force an update of the transactions. + /// + /// # Errors + /// + /// Returns an error if there was a problem retrieving the transactions. + #[allow(clippy::result_large_err)] + fn get_scripthash_txs_all( + &self, + derive: &DerivedAddr, + force_update: bool, + ) -> Result, Error> { + // Check the cache first + if !force_update { + let mut addr_transactions_cache = + self.cache.addr_transactions.lock().expect("poisoned lock"); + if let Some(cached_txs) = addr_transactions_cache.get(derive) { + return Ok(cached_txs.clone()); + } + } + + const PAGE_SIZE: usize = 25; + let mut res = Vec::new(); + let mut last_seen = None; + let script = derive.addr.script_pubkey(); + #[cfg(feature = "mempool")] + let address = derive.addr.to_string(); + + loop { + let r = match self.kind { + ClientKind::Esplora => self.inner.scripthash_txs(&script, last_seen)?, + #[cfg(feature = "mempool")] + ClientKind::Mempool => self.inner.address_txs(&address, last_seen)?, + }; + match &r[..] { + [a @ .., esplora::Tx { txid, .. 
}] if a.len() >= PAGE_SIZE - 1 => { + last_seen = Some(*txid); + res.extend(r); + } + _ => { + res.extend(r); + break; + } + } + } + + // Cache the results + { + let mut addr_transactions_cache = + self.cache.addr_transactions.lock().expect("poisoned lock"); + addr_transactions_cache.put(derive.clone(), res.clone()); + } + + Ok(res) + } + + fn get_addr_tx_stats_by_cache(&self, derive: &DerivedAddr) -> FullAddrStats { + let mut addr_transactions_cache = + self.cache.addr_transactions.lock().expect("poisoned lock"); + let address = derive.addr.to_string(); + + if let Some(cached_txs) = addr_transactions_cache.get(derive) { + let chain_stats_tx_count = cached_txs.iter().filter(|tx| tx.status.confirmed).count(); + let mempool_stats_tx_count = + cached_txs.iter().filter(|tx| !tx.status.confirmed).count(); + return FullAddrStats { + address, + chain_stats: AddrTxStats { + tx_count: chain_stats_tx_count as u64, + }, + mempool_stats: AddrTxStats { + tx_count: mempool_stats_tx_count as u64, + }, + }; + } + FullAddrStats::default() + } + + fn get_addr_tx_stats_by_client(&self, derive: &DerivedAddr) -> Result { + let address = derive.addr.to_string(); + let agent = self.agent(); + let url = self.url(); + + let url = format!("{}/address/{}", url, address); + + let resp: FullAddrStats = agent.get(&url).call()?.into_json()?; + Ok(resp) + } } impl From for TxStatus { @@ -147,152 +268,142 @@ impl From for WalletTx { } } -/// Retrieves all transactions associated with a given script hash. -/// -/// # Arguments -/// -/// * `client` - The Esplora client. -/// * `derive` - The derived address. -/// -/// # Errors -/// -/// Returns an error if there was a problem retrieving the transactions. -#[allow(clippy::result_large_err)] -fn get_scripthash_txs_all( - client: &Client, - derive: &DerivedAddr, -) -> Result, Error> { - const PAGE_SIZE: usize = 25; - let mut res = Vec::new(); - let mut last_seen = None; - let script = derive.addr.script_pubkey(); - #[cfg(feature = "mempool")] - let address = derive.addr.to_string(); - - loop { - let r = match client.kind { - ClientKind::Esplora => client.inner.scripthash_txs(&script, last_seen)?, - #[cfg(feature = "mempool")] - ClientKind::Mempool => client.inner.address_txs(&address, last_seen)?, - }; - match &r[..] { - [a @ .., esplora::Tx { txid, .. 
}] if a.len() >= PAGE_SIZE - 1 => { - last_seen = Some(*txid); - res.extend(r); - } - _ => { - res.extend(r); - break; - } - } - } - Ok(res) -} - -impl Indexer for Client { - type Error = Error; - - fn create, L2: Layer2>( +impl Client { + fn process_wallet_descriptor, L2: Layer2>( &self, descriptor: &WalletDescr, - ) -> MayError, Vec> { - let mut cache = WalletCache::new(); - let mut errors = vec![]; - + cache: &mut WalletCache, + errors: &mut Vec, + update_mode: bool, + ) -> BTreeMap, Vec)> { let mut address_index = BTreeMap::new(); + for keychain in descriptor.keychains() { let mut empty_count = 0usize; eprint!(" keychain {keychain} "); for derive in descriptor.addresses(keychain) { - let script = derive.addr.script_pubkey(); - eprint!("."); - let mut txids = Vec::new(); - match get_scripthash_txs_all(self, &derive) { - Err(err) => { - errors.push(err); + let empty = self.process_address::( + derive, + cache, + &mut address_index, + errors, + update_mode, + ); + if empty { + empty_count += 1; + if empty_count >= BATCH_SIZE { break; } - Ok(txes) if txes.is_empty() => { - empty_count += 1; - if empty_count >= BATCH_SIZE { - break; - } - } - Ok(txes) => { - empty_count = 0; - txids = txes.iter().map(|tx| tx.txid).collect(); - cache - .tx - .extend(txes.into_iter().map(WalletTx::from).map(|tx| (tx.txid, tx))); + } else { + empty_count = 0; + } + } + } + + address_index + } + + fn process_address, L2: Layer2>( + &self, + derive: DerivedAddr, + cache: &mut WalletCache, + address_index: &mut BTreeMap, Vec)>, + errors: &mut Vec, + update_mode: bool, + ) -> bool { + let script = derive.addr.script_pubkey(); + let mut txids = Vec::new(); + let mut empty = false; + + if update_mode { + let tx_stats_by_cache = self.get_addr_tx_stats_by_cache(&derive); + let tx_stats_by_client = self + .get_addr_tx_stats_by_client(&derive) + .map_err(|err| errors.push(err)) + .unwrap_or_default(); + if tx_stats_by_client.address.is_empty() || tx_stats_by_cache == tx_stats_by_client { + let wallet_addr_key = WalletAddr::from(derive); + let keychain = wallet_addr_key.terminal.keychain; + + if let Some(keychain_addr_set) = cache.addr.get(&keychain) { + // If `wallet_addr` has been cached before, it must be set in `address_index` + // to ensure the subsequent state updates correctly. + // Also, return (empty = false); + // This ensures that every cached `wallet_addr` is checked for updates. 
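The update path above avoids re-downloading an address's history when nothing changed: it compares transaction counts derived from the local cache against the indexer's /address/{address} statistics. A condensed sketch of that decision, reusing the `FullAddrStats` type introduced in this diff (the helper name is hypothetical):

// True when the per-script transaction download can be skipped.
fn cache_is_fresh(by_cache: &FullAddrStats, by_client: &FullAddrStats) -> bool {
    // An empty address string means the remote stats lookup failed;
    // in that case the cached view is kept rather than refetched.
    by_client.address.is_empty() || by_cache == by_client
}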
+ if let Some(cached_wallet_addr) = keychain_addr_set.get(&wallet_addr_key) { + address_index + .insert(script, ((*cached_wallet_addr).expect_transmute(), txids)); + return false; } } + return true; + } + } - let wallet_addr = WalletAddr::::from(derive); - address_index.insert(script, (wallet_addr, txids)); + match self.get_scripthash_txs_all(&derive, update_mode) { + Err(err) => { + errors.push(err); + empty = true; + } + Ok(txes) if txes.is_empty() => { + empty = true; + } + Ok(txes) => { + txids = txes.iter().map(|tx| tx.txid).collect(); + cache.tx.extend(txes.into_iter().map(WalletTx::from).map(|tx| (tx.txid, tx))); } } - // TODO: Update headers & tip + let wallet_addr = WalletAddr::::from(derive); + address_index.insert(script, (wallet_addr, txids)); + + empty + } + + fn process_transactions, L2: Layer2>( + &self, + descriptor: &WalletDescr, + cache: &mut WalletCache, + address_index: &mut BTreeMap, Vec)>, + ) { + // Keep the completed WalletAddr set + // Ensure that the subsequent status is handled correctly + let wallet_self_script_map: BTreeMap> = + address_index.iter().map(|(s, (addr, _))| (s.clone(), addr.clone())).collect(); + // Remove items with empty `txids` + address_index.retain(|_, (_, txids)| !txids.is_empty()); - for (script, (wallet_addr, txids)) in &mut address_index { - for txid in txids { + for (script, (wallet_addr, txids)) in address_index.iter_mut() { + // UTXOs and inputs must be processed separately due to the unordered nature and + // dependencies of transaction IDs. Handling them in a single loop can cause + // data inconsistencies. For example, if spending transactions are processed + // first, new change UTXOs are added and spent UTXOs are removed. However, + // in the subsequent loop, these already spent UTXOs are treated as new + // transactions and reinserted into the UTXO set. 
+ for txid in txids.iter() { let mut tx = cache.tx.remove(txid).expect("broken logic"); - for debit in &mut tx.outputs { - let Some(s) = debit.beneficiary.script_pubkey() else { - continue; - }; - if &s == script { - cache.utxo.insert(debit.outpoint); - debit.beneficiary = Party::from_wallet_addr(wallet_addr); - wallet_addr.used = wallet_addr.used.saturating_add(1); - wallet_addr.volume.saturating_add_assign(debit.value); - wallet_addr.balance = wallet_addr - .balance - .saturating_add(debit.value.sats().try_into().expect("sats overflow")); - } else if debit.beneficiary.is_unknown() { - Address::with(&s, descriptor.network()) - .map(|addr| { - debit.beneficiary = Party::Counterparty(addr); - }) - .ok(); - } - } + self.process_outputs::<_, _, L2>( + descriptor, + script, + wallet_addr, + &mut tx, + cache, + &wallet_self_script_map, + ); cache.tx.insert(tx.txid, tx); } - } - for (script, (wallet_addr, txids)) in &mut address_index { - for txid in txids { + for txid in txids.iter() { let mut tx = cache.tx.remove(txid).expect("broken logic"); - for credit in &mut tx.inputs { - let Some(s) = credit.payer.script_pubkey() else { - continue; - }; - if &s == script { - credit.payer = Party::from_wallet_addr(wallet_addr); - wallet_addr.balance = wallet_addr - .balance - .saturating_sub(credit.value.sats().try_into().expect("sats overflow")); - } else if credit.payer.is_unknown() { - Address::with(&s, descriptor.network()) - .map(|addr| { - credit.payer = Party::Counterparty(addr); - }) - .ok(); - } - if let Some(prev_tx) = cache.tx.get_mut(&credit.outpoint.txid) { - if let Some(txout) = - prev_tx.outputs.get_mut(credit.outpoint.vout_u32() as usize) - { - let outpoint = txout.outpoint; - if tx.status.is_mined() { - cache.utxo.remove(&outpoint); - } - txout.spent = Some(credit.outpoint.into()) - }; - } - } + self.process_inputs::<_, _, L2>( + descriptor, + script, + wallet_addr, + &mut tx, + cache, + &wallet_self_script_map, + ); cache.tx.insert(tx.txid, tx); } cache @@ -301,16 +412,137 @@ impl Indexer for Client { .or_default() .insert(wallet_addr.expect_transmute()); } + } + + fn process_outputs, L2: Layer2>( + &self, + descriptor: &WalletDescr, + script: &ScriptPubkey, + wallet_addr: &mut WalletAddr, + tx: &mut WalletTx, + cache: &mut WalletCache, + wallet_self_script_map: &BTreeMap>, + ) { + for debit in &mut tx.outputs { + let Some(s) = debit.beneficiary.script_pubkey() else { + continue; + }; + + // Needs to be handled here. When iterating over keychain 0, + // it is possible that a UTXO corresponds to the change `script-public-key` `s` and is + // associated with keychain 1. However, the `script` corresponds to keychain 0. + // This discrepancy can cause issues because the outer loop uses `address_index: + // BTreeMap, Vec)>`, which is unordered + // by keychain. + // + // If transactions related to keychain-1-ScriptPubkey are processed first, the change + // UTXOs are correctly handled. However, when subsequently processing + // transactions for keychain-0-ScriptPubkey, the previously set data for keychain-1 + // can be incorrectly modified (to `Counterparty`). This specific condition needs to be + // handled. + // + // It should be handled using `wallet_self_script_map` to correctly process the + // beneficiary of the transaction output. 
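The long comment above boils down to a three-way classification of every output script; a simplified, self-contained sketch (string keys stand in for ScriptPubkey and WalletAddr):

use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
enum Beneficiary<'a> {
    Wallet(&'a str),
    Counterparty,
}

fn classify<'a>(
    script: &str,
    current_script: &'a str,
    wallet_scripts: &'a BTreeMap<String, String>,
) -> Beneficiary<'a> {
    if script == current_script {
        // 1. The address whose history is being processed right now.
        Beneficiary::Wallet(current_script)
    } else if let Some(addr) = wallet_scripts.get(script) {
        // 2. Another wallet-owned script, typically a change address on the
        //    other keychain: it must not be downgraded to a counterparty.
        Beneficiary::Wallet(addr.as_str())
    } else {
        // 3. Genuinely external script: treat as a counterparty.
        Beneficiary::Counterparty
    }
}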
+ if &s == script { + cache.utxo.insert(debit.outpoint); + debit.beneficiary = Party::from_wallet_addr(wallet_addr); + wallet_addr.used = wallet_addr.used.saturating_add(1); + wallet_addr.volume.saturating_add_assign(debit.value); + wallet_addr.balance = wallet_addr + .balance + .saturating_add(debit.value.sats().try_into().expect("sats overflow")); + } else if debit.beneficiary.is_unknown() { + if let Some(real_addr) = wallet_self_script_map.get(&s) { + debit.beneficiary = Party::from_wallet_addr(real_addr); + continue; + } + + Address::with(&s, descriptor.network()) + .map(|addr| { + debit.beneficiary = Party::Counterparty(addr); + }) + .ok(); + } + } + } + + fn process_inputs, L2: Layer2>( + &self, + descriptor: &WalletDescr, + script: &ScriptPubkey, + wallet_addr: &mut WalletAddr, + tx: &mut WalletTx, + cache: &mut WalletCache, + wallet_self_script_map: &BTreeMap>, + ) { + for credit in &mut tx.inputs { + let Some(s) = credit.payer.script_pubkey() else { + continue; + }; + if &s == script { + credit.payer = Party::from_wallet_addr(wallet_addr); + wallet_addr.balance = wallet_addr + .balance + .saturating_sub(credit.value.sats().try_into().expect("sats overflow")); + } else if credit.payer.is_unknown() { + if let Some(real_addr) = wallet_self_script_map.get(&s) { + credit.payer = Party::from_wallet_addr(real_addr); + continue; + } + + Address::with(&s, descriptor.network()) + .map(|addr| { + credit.payer = Party::Counterparty(addr); + }) + .ok(); + } + if let Some(prev_tx) = cache.tx.get_mut(&credit.outpoint.txid) { + if let Some(txout) = prev_tx.outputs.get_mut(credit.outpoint.vout_u32() as usize) { + let outpoint = txout.outpoint; + if tx.status.is_mined() { + cache.utxo.remove(&outpoint); + } + txout.spent = Some(credit.outpoint.into()) + }; + } + } + } +} + +impl Indexer for Client { + type Error = Error; + + fn create, L2: Layer2>( + &self, + descriptor: &WalletDescr, + ) -> MayError, Vec> { + let mut cache = WalletCache::new(); + let mut errors = vec![]; + + let mut address_index = + self.process_wallet_descriptor::(descriptor, &mut cache, &mut errors, false); + + self.process_transactions::(descriptor, &mut cache, &mut address_index); if errors.is_empty() { MayError::ok(cache) } else { MayError::err(cache, errors) } } fn update, L2: Layer2>( &self, - _descr: &WalletDescr, - _cache: &mut WalletCache, + descriptor: &WalletDescr, + cache: &mut WalletCache, ) -> MayError> { - todo!() + let mut errors = vec![]; + + let mut address_index = + self.process_wallet_descriptor::(descriptor, cache, &mut errors, true); + self.process_transactions::(descriptor, cache, &mut address_index); + + if errors.is_empty() { + MayError::ok(address_index.len()) + } else { + MayError::err(address_index.len(), errors) + } } fn publish(&self, tx: &Tx) -> Result<(), Self::Error> { self.inner.broadcast(tx) } diff --git a/src/indexers/mempool.rs b/src/indexers/mempool.rs index 19292ad..4b0e441 100644 --- a/src/indexers/mempool.rs +++ b/src/indexers/mempool.rs @@ -23,6 +23,8 @@ use bpstd::Txid; use esplora::BlockingClient; +use super::IndexerCache; + impl super::esplora::Client { /// Creates a new mempool client with the specified URL. /// @@ -35,11 +37,12 @@ impl super::esplora::Client { /// A `Result` containing the new mempool client if successful, or an `esplora::Error` if an /// error occurred. 
#[allow(clippy::result_large_err)] - pub fn new_mempool(url: &str) -> Result { + pub fn new_mempool(url: &str, cache: IndexerCache) -> Result { let inner = esplora::Builder::new(url).build_blocking()?; let client = Self { inner, kind: super::esplora::ClientKind::Mempool, + cache, }; Ok(client) } diff --git a/src/indexers/mod.rs b/src/indexers/mod.rs index a120ec6..898c3bc 100644 --- a/src/indexers/mod.rs +++ b/src/indexers/mod.rs @@ -28,10 +28,14 @@ pub mod esplora; pub mod mempool; #[cfg(any(feature = "electrum", feature = "esplora", feature = "mempool"))] mod any; +#[cfg(any(feature = "electrum", feature = "esplora", feature = "mempool"))] +mod cache; #[cfg(any(feature = "electrum", feature = "esplora", feature = "mempool"))] pub use any::{AnyIndexer, AnyIndexerError}; use bpstd::Tx; +#[cfg(any(feature = "electrum", feature = "esplora", feature = "mempool"))] +pub use cache::IndexerCache; use descriptors::Descriptor; use crate::{Layer2, MayError, WalletCache, WalletDescr}; diff --git a/src/wallet.rs b/src/wallet.rs index a4a1f89..b088c3f 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -81,7 +81,7 @@ impl<'descr, K, D: Descriptor> Iterator for AddrIter<'descr, K, D> { ) ) )] -#[derive(Getters, Clone, Eq, PartialEq, Debug)] +#[derive(Getters, Clone, Eq, PartialEq, Debug, Hash)] pub struct WalletDescr where D: Descriptor, @@ -373,13 +373,14 @@ where Self: Save } pub fn update(&mut self, indexer: &I) -> MayError<(), Vec> { - // Not yet implemented: - // self.cache.update::(&self.descr, &self.indexer) + if self.cache.tx.is_empty() { + return WalletCache::with::<_, K, _, L2>(&self.descr, indexer).map(|cache| { + self.cache = cache; + self.set_dirty(); + }); + } - WalletCache::with::<_, K, _, L2>(&self.descr, indexer).map(|cache| { - self.cache = cache; - self.set_dirty(); - }) + self.cache.update::(&self.descr, indexer).map(|_| self.set_dirty()) } pub fn to_deriver(&self) -> D
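Putting the pieces together, indexer construction now mirrors `Args::indexer()`: build one `IndexerCache`, hand a clone to whichever client is selected, and let `Wallet::update` decide between a full scan (empty cache) and the new incremental refresh. A hedged sketch using only constructors that appear in this diff (URL, network and cache factor are placeholders):

use std::num::NonZeroUsize;

use crate::indexers::{esplora, AnyIndexer, IndexerCache};

fn make_esplora_indexer(url: &str, network: &str) -> AnyIndexer {
    let cache = IndexerCache::new(NonZeroUsize::new(1000).expect("cache factor must be non-zero"));
    let client = esplora::Client::new_esplora(&url.replace("{network}", network), cache)
        .expect("unable to reach the esplora server");
    AnyIndexer::Esplora(Box::new(client))
}

With an indexer in hand, the first wallet.update(&indexer) call populates the cache via a full create-style scan, and subsequent calls run the cache-assisted update implementations added above for both electrum and esplora.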