Skip to content

Commit

Permalink
feat(esplora): add SpkWithHistory support
Browse files Browse the repository at this point in the history
  • Loading branch information
LagginTimes committed Jan 24, 2025
1 parent d394362 commit 7f09b70
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 24 deletions.
44 changes: 32 additions & 12 deletions crates/esplora/src/async_ext.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use async_trait::async_trait;
use bdk_core::collections::{BTreeMap, BTreeSet, HashSet};
use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse};
use bdk_core::spk_client::{
FullScanRequest, FullScanResponse, SpkWithHistory, SyncRequest, SyncResponse,
};
use bdk_core::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
bitcoin::{BlockHash, OutPoint, Txid},
BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate,
};
use esplora_client::Sleeper;
Expand Down Expand Up @@ -77,10 +79,19 @@ where
let mut last_active_indices = BTreeMap::<K, u32>::new();
for keychain in keychains {
let keychain_spks = request.iter_spks(keychain.clone());
let spks_with_history = keychain_spks.into_iter().map(|(i, spk)| {
(
i,
SpkWithHistory {
spk,
txids: HashSet::new(),
},
)
});
let (update, last_active_index) = fetch_txs_with_keychain_spks(
self,
&mut inserted_txs,
keychain_spks,
spks_with_history,
stop_gap,
parallel_requests,
)
Expand Down Expand Up @@ -125,7 +136,7 @@ where
fetch_txs_with_spks(
self,
&mut inserted_txs,
request.iter_spks(),
request.iter_spks_with_history(),
parallel_requests,
)
.await?,
Expand Down Expand Up @@ -279,31 +290,35 @@ async fn chain_update<S: Sleeper>(
async fn fetch_txs_with_keychain_spks<I, S>(
client: &esplora_client::AsyncClient<S>,
inserted_txs: &mut HashSet<Txid>,
mut keychain_spks: I,
mut spks_with_history: I,
stop_gap: usize,
parallel_requests: usize,
) -> Result<(TxUpdate<ConfirmationBlockTime>, Option<u32>), Error>
where
I: Iterator<Item = Indexed<ScriptBuf>> + Send,
I: Iterator<Item = Indexed<SpkWithHistory>> + Send,
S: Sleeper + Clone + Send + Sync,
{
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);

let mut update = TxUpdate::<ConfirmationBlockTime>::default();
let mut last_index = Option::<u32>::None;
let mut last_active_index = Option::<u32>::None;
let mut spk_txids: HashSet<Txid> = HashSet::new();

loop {
let handles = keychain_spks
let handles = spks_with_history
.by_ref()
.take(parallel_requests)
.map(|(spk_index, spk)| {
.map(|(spk_index, spk_with_history)| {
spk_txids.extend(&spk_with_history.txids);
let client = client.clone();
async move {
let mut last_seen = None;
let mut spk_txs = Vec::new();
loop {
let txs = client.scripthash_txs(&spk, last_seen).await?;
let txs = client
.scripthash_txs(&spk_with_history.spk, last_seen)
.await?;
let tx_count = txs.len();
last_seen = txs.last().map(|tx| tx.txid);
spk_txs.extend(txs);
Expand Down Expand Up @@ -344,6 +359,8 @@ where
}
}

update.missing = spk_txids.difference(&inserted_txs).cloned().collect();

Ok((update, last_active_index))
}

Expand All @@ -358,18 +375,21 @@ where
async fn fetch_txs_with_spks<I, S>(
client: &esplora_client::AsyncClient<S>,
inserted_txs: &mut HashSet<Txid>,
spks: I,
spks_with_history: I,
parallel_requests: usize,
) -> Result<TxUpdate<ConfirmationBlockTime>, Error>
where
I: IntoIterator<Item = ScriptBuf> + Send,
I: IntoIterator<Item = SpkWithHistory> + Send,
I::IntoIter: Send,
S: Sleeper + Clone + Send + Sync,
{
fetch_txs_with_keychain_spks(
client,
inserted_txs,
spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
spks_with_history
.into_iter()
.enumerate()
.map(|(i, spk)| (i as u32, spk)),
usize::MAX,
parallel_requests,
)
Expand Down
42 changes: 30 additions & 12 deletions crates/esplora/src/blocking_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use bdk_core::collections::{BTreeMap, BTreeSet, HashSet};
use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse};
use bdk_core::spk_client::{
FullScanRequest, FullScanResponse, SpkWithHistory, SyncRequest, SyncResponse,
};
use bdk_core::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
bitcoin::{BlockHash, OutPoint, Txid},
BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate,
};
use esplora_client::{OutputStatus, Tx};
Expand Down Expand Up @@ -67,10 +69,19 @@ impl EsploraExt for esplora_client::BlockingClient {
let mut last_active_indices = BTreeMap::<K, u32>::new();
for keychain in request.keychains() {
let keychain_spks = request.iter_spks(keychain.clone());
let spks_with_history = keychain_spks.into_iter().map(|(i, spk)| {
(
i,
SpkWithHistory {
spk,
txids: HashSet::new(),
},
)
});
let (update, last_active_index) = fetch_txs_with_keychain_spks(
self,
&mut inserted_txs,
keychain_spks,
spks_with_history,
stop_gap,
parallel_requests,
)?;
Expand Down Expand Up @@ -116,7 +127,7 @@ impl EsploraExt for esplora_client::BlockingClient {
tx_update.extend(fetch_txs_with_spks(
self,
&mut inserted_txs,
request.iter_spks(),
request.iter_spks_with_history(),
parallel_requests,
)?);
tx_update.extend(fetch_txs_with_txids(
Expand Down Expand Up @@ -248,10 +259,10 @@ fn chain_update(
Ok(tip)
}

fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<SpkWithHistory>>>(
client: &esplora_client::BlockingClient,
inserted_txs: &mut HashSet<Txid>,
mut keychain_spks: I,
mut spks_with_history: I,
stop_gap: usize,
parallel_requests: usize,
) -> Result<(TxUpdate<ConfirmationBlockTime>, Option<u32>), Error> {
Expand All @@ -260,19 +271,21 @@ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
let mut update = TxUpdate::<ConfirmationBlockTime>::default();
let mut last_index = Option::<u32>::None;
let mut last_active_index = Option::<u32>::None;
let mut spk_txids: HashSet<Txid> = HashSet::new();

loop {
let handles = keychain_spks
let handles = spks_with_history
.by_ref()
.take(parallel_requests)
.map(|(spk_index, spk)| {
.map(|(spk_index, spk_with_history)| {
spk_txids.extend(&spk_with_history.txids);
std::thread::spawn({
let client = client.clone();
move || -> Result<TxsOfSpkIndex, Error> {
let mut last_seen = None;
let mut spk_txs = Vec::new();
loop {
let txs = client.scripthash_txs(&spk, last_seen)?;
let txs = client.scripthash_txs(&spk_with_history.spk, last_seen)?;
let tx_count = txs.len();
last_seen = txs.last().map(|tx| tx.txid);
spk_txs.extend(txs);
Expand Down Expand Up @@ -315,6 +328,8 @@ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
}
}

update.missing = spk_txids.difference(&inserted_txs).cloned().collect();

Ok((update, last_active_index))
}

Expand All @@ -326,16 +341,19 @@ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
/// requests to make in parallel.
///
/// Refer to [crate-level docs](crate) for more.
fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
fn fetch_txs_with_spks<I: IntoIterator<Item = SpkWithHistory>>(
client: &esplora_client::BlockingClient,
inserted_txs: &mut HashSet<Txid>,
spks: I,
spks_with_history: I,
parallel_requests: usize,
) -> Result<TxUpdate<ConfirmationBlockTime>, Error> {
fetch_txs_with_keychain_spks(
client,
inserted_txs,
spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
spks_with_history
.into_iter()
.enumerate()
.map(|(i, spk_with_history)| (i as u32, spk_with_history)),
usize::MAX,
parallel_requests,
)
Expand Down

0 comments on commit 7f09b70

Please sign in to comment.