From 19328d4999c19557778b7b108f88f42b1258957e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 13 Jun 2024 18:22:43 +0800 Subject: [PATCH] feat(wallet)!: change persist API to use `StageExt` and `StageExtAsync` --- crates/wallet/Cargo.toml | 1 + crates/wallet/src/wallet/mod.rs | 82 ++++++++++++------- crates/wallet/tests/wallet.rs | 11 ++- example-crates/wallet_electrum/src/main.rs | 4 +- .../wallet_esplora_async/src/main.rs | 4 +- .../wallet_esplora_blocking/src/main.rs | 4 +- example-crates/wallet_rpc/src/main.rs | 4 +- 7 files changed, 66 insertions(+), 44 deletions(-) diff --git a/crates/wallet/Cargo.toml b/crates/wallet/Cargo.toml index d7f3290ac..d5b52be06 100644 --- a/crates/wallet/Cargo.toml +++ b/crates/wallet/Cargo.toml @@ -30,6 +30,7 @@ js-sys = "0.3" [features] default = ["std"] std = ["bitcoin/std", "miniscript/std", "bdk_chain/std"] +async = ["bdk_chain/async"] compiler = ["miniscript/compiler"] all-keys = ["keys-bip39"] keys-bip39 = ["bip39"] diff --git a/crates/wallet/src/wallet/mod.rs b/crates/wallet/src/wallet/mod.rs index 7917be156..5a21c1518 100644 --- a/crates/wallet/src/wallet/mod.rs +++ b/crates/wallet/src/wallet/mod.rs @@ -26,6 +26,7 @@ use bdk_chain::{ local_chain::{ self, ApplyHeaderError, CannotConnectError, CheckPoint, CheckPointIter, LocalChain, }, + persist::{PersistBackend, StageExt}, spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}, tx_graph::{CanonicalTx, TxGraph}, Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeHeightAnchor, FullTxOut, @@ -40,7 +41,6 @@ use bitcoin::{ use bitcoin::{consensus::encode::serialize, transaction, BlockHash, Psbt}; use bitcoin::{constants::genesis_block, Amount}; use core::fmt; -use core::mem; use core::ops::Deref; use descriptor::error::Error as DescriptorError; use miniscript::psbt::{PsbtExt, PsbtInputExt, PsbtInputSatisfier}; @@ -393,18 +393,6 @@ impl Wallet { }) } - /// Stage a ['ChangeSet'] to be persisted later. - /// - /// [`commit`]: Self::commit - fn stage(&mut self, changeset: ChangeSet) { - self.stage.append(changeset) - } - - /// Take the staged [`ChangeSet`] to be persisted now. - pub fn take_staged(&mut self) -> ChangeSet { - mem::take(&mut self.stage) - } - /// Load [`Wallet`] from the given previously persisted [`ChangeSet`]. /// /// Note that the descriptor secret keys are not persisted to the db; this means that after @@ -687,7 +675,7 @@ impl Wallet { /// # let changeset = ChangeSet::default(); /// # let mut wallet = Wallet::load_from_changeset(changeset).expect("load wallet"); /// let next_address = wallet.reveal_next_address(KeychainKind::External); - /// db.write_changes(&wallet.take_staged())?; + /// wallet.commit_to(&mut db)?; /// /// // Now it's safe to show the user their next address! /// println!("Next address: {}", next_address.address); @@ -731,7 +719,7 @@ impl Wallet { .reveal_to_target(&keychain, index) .expect("keychain must exist"); - self.stage(indexed_tx_graph::ChangeSet::from(index_changeset).into()); + self.stage.append(index_changeset.into()); spks.into_iter().map(move |(index, spk)| AddressInfo { index, @@ -915,7 +903,7 @@ impl Wallet { /// [`list_output`]: Self::list_output pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) { let additions = self.indexed_graph.insert_txout(outpoint, txout); - self.stage(ChangeSet::from(additions)); + self.stage.append(additions.into()); } /// Calculates the fee of a given transaction. Returns [`Amount::ZERO`] if `tx` is a coinbase transaction. @@ -1084,7 +1072,7 @@ impl Wallet { ) -> Result { let changeset = self.chain.insert_block(block_id)?; let changed = !changeset.is_empty(); - self.stage(changeset.into()); + self.stage.append(changeset.into()); Ok(changed) } @@ -1146,7 +1134,7 @@ impl Wallet { } let changed = !changeset.is_empty(); - self.stage(changeset); + self.stage.append(changeset); Ok(changed) } @@ -1470,9 +1458,7 @@ impl Wallet { .next_unused_spk(&change_keychain) .expect("keychain must exist"); self.indexed_graph.index.mark_used(change_keychain, index); - self.stage(ChangeSet::from(indexed_tx_graph::ChangeSet::from( - index_changeset, - ))); + self.stage.append(index_changeset.into()); spk } }; @@ -2291,16 +2277,52 @@ impl Wallet { .indexed_graph .index .reveal_to_target_multi(&update.last_active_indices); - changeset.append(ChangeSet::from(indexed_tx_graph::ChangeSet::from( - index_changeset, - ))); - changeset.append(ChangeSet::from( - self.indexed_graph.apply_update(update.graph), - )); - self.stage(changeset); + changeset.append(index_changeset.into()); + changeset.append(self.indexed_graph.apply_update(update.graph).into()); + self.stage.append(changeset); Ok(()) } + /// Commits all currently [`staged`](Wallet::staged) changes to the `persist_backend`. + /// + /// This returns whether anything was persisted. + /// + /// # Error + /// + /// Returns a backend-defined error if this fails. + pub fn commit_to(&mut self, persist_backend: &mut B) -> Result + where + B: PersistBackend, + { + let committed = StageExt::commit_to(&mut self.stage, persist_backend)?; + Ok(committed.is_some()) + } + + /// Commits all currently [`staged`](Wallet::staged) changes to the async `persist_backend`. + /// + /// This returns whether anything was persisted. + /// + /// # Error + /// + /// Returns a backend-defined error if this fails. + #[cfg(feature = "async")] + pub async fn commit_to_async( + &mut self, + persist_backend: &mut B, + ) -> Result + where + B: bdk_chain::persist::PersistBackendAsync + Send + Sync, + { + let committed = + bdk_chain::persist::StageExtAsync::commit_to(&mut self.stage, persist_backend).await?; + Ok(committed.is_some()) + } + + /// Get the staged [`ChangeSet`] that is yet to be committed. + pub fn staged(&self) -> &ChangeSet { + &self.stage + } + /// Get a reference to the inner [`TxGraph`]. pub fn tx_graph(&self) -> &TxGraph { self.indexed_graph.graph() @@ -2370,7 +2392,7 @@ impl Wallet { .apply_block_relevant(block, height) .into(), ); - self.stage(changeset); + self.stage.append(changeset); Ok(()) } @@ -2393,7 +2415,7 @@ impl Wallet { let indexed_graph_changeset = self .indexed_graph .batch_insert_relevant_unconfirmed(unconfirmed_txs); - self.stage(ChangeSet::from(indexed_graph_changeset)); + self.stage.append(indexed_graph_changeset.into()); } } diff --git a/crates/wallet/tests/wallet.rs b/crates/wallet/tests/wallet.rs index beddacd6e..3ca18b760 100644 --- a/crates/wallet/tests/wallet.rs +++ b/crates/wallet/tests/wallet.rs @@ -90,11 +90,10 @@ fn load_recovers_wallet() -> anyhow::Result<()> { wallet.reveal_next_address(KeychainKind::External); // persist new wallet changes - let staged_changeset = wallet.take_staged(); - let db = &mut create_new(&file_path).expect("must create db"); - db.write_changes(&staged_changeset) + let mut db = create_new(&file_path).expect("must create db"); + wallet + .commit_to(&mut db) .map_err(|e| anyhow!("write changes error: {}", e))?; - wallet.spk_index().clone() }; @@ -158,9 +157,9 @@ fn new_or_load() -> anyhow::Result<()> { let wallet_keychains: BTreeMap<_, _> = { let wallet = &mut Wallet::new_or_load(desc, change_desc, None, Network::Testnet) .expect("must init wallet"); - let staged_changeset = wallet.take_staged(); let mut db = new_or_load(&file_path).expect("must create db"); - db.write_changes(&staged_changeset) + wallet + .commit_to(&mut db) .map_err(|e| anyhow!("write changes error: {}", e))?; wallet.keychains().map(|(k, v)| (*k, v.clone())).collect() }; diff --git a/example-crates/wallet_electrum/src/main.rs b/example-crates/wallet_electrum/src/main.rs index f634466fa..c7c563c15 100644 --- a/example-crates/wallet_electrum/src/main.rs +++ b/example-crates/wallet_electrum/src/main.rs @@ -33,7 +33,7 @@ fn main() -> Result<(), anyhow::Error> { )?; let address = wallet.next_unused_address(KeychainKind::External); - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; println!("Generated Address: {}", address); let balance = wallet.balance(); @@ -72,7 +72,7 @@ fn main() -> Result<(), anyhow::Error> { println!(); wallet.apply_update(update)?; - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; let balance = wallet.balance(); println!("Wallet balance after syncing: {} sats", balance.total()); diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index c3e4244cd..6a666d49b 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -30,7 +30,7 @@ async fn main() -> Result<(), anyhow::Error> { )?; let address = wallet.next_unused_address(KeychainKind::External); - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; println!("Generated Address: {}", address); let balance = wallet.balance(); @@ -78,7 +78,7 @@ async fn main() -> Result<(), anyhow::Error> { let _ = update.graph_update.update_last_seen_unconfirmed(now); wallet.apply_update(update)?; - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; println!(); let balance = wallet.balance(); diff --git a/example-crates/wallet_esplora_blocking/src/main.rs b/example-crates/wallet_esplora_blocking/src/main.rs index 3b431465a..21e9fe688 100644 --- a/example-crates/wallet_esplora_blocking/src/main.rs +++ b/example-crates/wallet_esplora_blocking/src/main.rs @@ -29,7 +29,7 @@ fn main() -> Result<(), anyhow::Error> { )?; let address = wallet.next_unused_address(KeychainKind::External); - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; println!("Generated Address: {}", address); let balance = wallet.balance(); @@ -55,7 +55,7 @@ fn main() -> Result<(), anyhow::Error> { let _ = update.graph_update.update_last_seen_unconfirmed(now); wallet.apply_update(update)?; - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; println!(); let balance = wallet.balance(); diff --git a/example-crates/wallet_rpc/src/main.rs b/example-crates/wallet_rpc/src/main.rs index f5d12f8a6..e0d617493 100644 --- a/example-crates/wallet_rpc/src/main.rs +++ b/example-crates/wallet_rpc/src/main.rs @@ -147,7 +147,7 @@ fn main() -> anyhow::Result<()> { let connected_to = block_emission.connected_to(); let start_apply_block = Instant::now(); wallet.apply_block_connected_to(&block_emission.block, height, connected_to)?; - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; let elapsed = start_apply_block.elapsed().as_secs_f32(); println!( "Applied block {} at height {} in {}s", @@ -157,7 +157,7 @@ fn main() -> anyhow::Result<()> { Emission::Mempool(mempool_emission) => { let start_apply_mempool = Instant::now(); wallet.apply_unconfirmed_txs(mempool_emission.iter().map(|(tx, time)| (tx, *time))); - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; println!( "Applied unconfirmed transactions in {}s", start_apply_mempool.elapsed().as_secs_f32()