Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wallet / address group endpoints #82

Merged
merged 10 commits into from
Oct 18, 2024
80 changes: 80 additions & 0 deletions src/new_index/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,67 @@ impl<'a> Iterator for ReverseScanIterator<'a> {
}
}

/// Merges several [`ReverseScanIterator`]s into one stream of rows.
///
/// One row is buffered per underlying iterator. Each call to `next` yields the
/// buffered row whose key, compared from byte `value_offset` onwards, is the
/// greatest, and then refills that slot from the iterator it came from.
pub struct ReverseScanGroupIterator<'a> {
// The underlying iterators, one per scanned prefix.
iters: Vec<ReverseScanIterator<'a>>,
// Buffered head row of each iterator (same indexing as `iters`);
// `None` once that iterator has nothing left.
next_rows: Vec<Option<DBRow>>,
// Byte offset into each row key from which keys are compared.
value_offset: usize,
// Set once every slot in `next_rows` is `None`.
done: bool,
}

impl<'a> ReverseScanGroupIterator<'a> {
    /// Builds a group iterator over `iters`.
    ///
    /// Every iterator is advanced once up front so that its first row is
    /// buffered. `value_offset` is the byte offset into the row key from which
    /// rows of different iterators are compared against each other.
    pub fn new(
        mut iters: Vec<ReverseScanIterator<'a>>,
        value_offset: usize,
    ) -> ReverseScanGroupIterator {
        // Pull the head row of each iterator, keeping the same ordering as `iters`.
        let next_rows: Vec<Option<DBRow>> = iters.iter_mut().map(|iter| iter.next()).collect();
        // Nothing to yield if no iterator produced a row (including the empty group).
        let done = !next_rows.iter().any(Option::is_some);

        ReverseScanGroupIterator {
            iters,
            next_rows,
            value_offset,
            done,
        }
    }
}

impl<'a> Iterator for ReverseScanGroupIterator<'a> {
    type Item = DBRow;

    /// Yields the buffered row with the greatest key suffix (the key bytes from
    /// `value_offset` onwards), then refills that slot from its source iterator.
    ///
    /// When several buffered rows compare equal, the one with the highest
    /// index wins, because `Iterator::max_by` returns the last maximum.
    fn next(&mut self) -> Option<DBRow> {
        use std::cmp::Ordering;

        if self.done {
            return None;
        }

        let offset = self.value_offset;
        let winner = self
            .next_rows
            .iter()
            .enumerate()
            .max_by(|(lhs_idx, lhs), (rhs_idx, rhs)| match (lhs, rhs) {
                // Two live rows: order by the key bytes after the offset.
                (Some(l), Some(r)) => l.key[offset..].cmp(&r.key[offset..]),
                // A live row always beats an exhausted slot.
                (Some(_), None) => Ordering::Greater,
                (None, Some(_)) => Ordering::Less,
                // Two exhausted slots: fall back to their position.
                (None, None) => lhs_idx.cmp(rhs_idx),
            })
            .map_or(0, |(idx, _)| idx);

        let row = self.next_rows[winner].take();
        self.next_rows[winner] = self.iters.get_mut(winner)?.next();
        // `done` is known to be false here, so a plain assignment is enough.
        self.done = self.next_rows.iter().all(Option::is_none);

        row
    }
}

#[derive(Debug)]
pub struct DB {
db: rocksdb::DB,
Expand Down Expand Up @@ -140,6 +201,25 @@ impl DB {
}
}

/// Creates a [`ReverseScanGroupIterator`] that scans several prefixes at once.
///
/// `prefixes` yields `(prefix, prefix_max)` pairs. For each pair a raw
/// iterator is positioned with `seek_for_prev(prefix_max)` and wrapped in a
/// [`ReverseScanIterator`] carrying `prefix`.
/// `value_offset` is forwarded to [`ReverseScanGroupIterator::new`] and is
/// the key offset from which rows of different prefixes are compared.
pub fn iter_scan_group_reverse(
    &self,
    prefixes: impl Iterator<Item = (Vec<u8>, Vec<u8>)>,
    value_offset: usize,
) -> ReverseScanGroupIterator {
    let iters = prefixes
        .map(|(prefix, prefix_max)| {
            let mut iter = self.db.raw_iterator();
            iter.seek_for_prev(prefix_max);
            ReverseScanIterator {
                // `prefix` is already an owned `Vec<u8>`: move it rather than
                // allocating a second copy with `to_vec()`.
                prefix,
                iter,
                done: false,
            }
        })
        .collect();
    ReverseScanGroupIterator::new(iters, value_offset)
}

pub fn write(&self, mut rows: Vec<DBRow>, flush: DBFlush) {
debug!(
"writing {} rows to {:?}, flush={:?}",
Expand Down
43 changes: 43 additions & 0 deletions src/new_index/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,49 @@ impl Mempool {
.collect()
}

/// Returns up to `limit` mempool transactions that appear in the history of
/// any of `scripthashes`, each transaction at most once.
///
/// When `last_seen_txid` is given, every txid up to and including it is
/// skipped, so the result continues right after that transaction.
///
/// # Panics
///
/// Panics if a txid found in the history is missing from the tx store.
pub fn history_group(
    &self,
    scripthashes: &[[u8; 32]],
    last_seen_txid: Option<&Txid>,
    limit: usize,
) -> Vec<Transaction> {
    let _timer = self
        .latency
        .with_label_values(&["history_group"])
        .start_timer();

    // When resuming, one extra entry (the last_seen_txid itself) is dropped.
    let resume_skip = usize::from(last_seen_txid.is_some());

    scripthashes
        .iter()
        .filter_map(|scripthash| self.history.get(&scripthash[..]))
        .flat_map(|entries| entries.iter())
        .map(|entry| entry.get_txid())
        .unique()
        // TODO seek directly to last seen tx without reading earlier rows
        // Skip until we reach the last_seen_txid.
        .skip_while(|txid| matches!(last_seen_txid, Some(last_seen) if last_seen != txid))
        .skip(resume_skip)
        .take(limit)
        .map(|txid| {
            self.txstore
                .get(&txid)
                .cloned()
                .expect("missing mempool tx")
        })
        .collect()
}

/// Lazily iterates over the txids found in the mempool history of any of
/// `scripthashes`, yielding each txid only once.
///
/// Scripthashes without a history entry are silently skipped.
pub fn history_txids_iter_group<'a>(
    &'a self,
    scripthashes: &'a [[u8; 32]],
) -> impl Iterator<Item = Txid> + 'a {
    let histories = scripthashes
        .iter()
        .filter_map(move |scripthash| self.history.get(&scripthash[..]));

    histories
        .flat_map(|history| history.iter())
        .map(|info| info.get_txid())
        .unique()
}

pub fn history_txids(&self, scripthash: &[u8], limit: usize) -> Vec<Txid> {
let _timer = self
.latency
Expand Down
145 changes: 127 additions & 18 deletions src/new_index/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use crate::new_index::fetch::{start_fetcher, BlockEntry, FetchFrom};
#[cfg(feature = "liquid")]
use crate::elements::{asset, peg};

use super::db::ReverseScanGroupIterator;

const MIN_HISTORY_ITEMS_TO_CACHE: usize = 100;

pub struct Store {
Expand Down Expand Up @@ -511,28 +513,34 @@ impl ChainQuery {
&TxHistoryRow::prefix_end(code, hash),
)
}

pub fn summary(
fn history_iter_scan_group_reverse(
&self,
scripthash: &[u8],
last_seen_txid: Option<&Txid>,
limit: usize,
) -> Vec<TxHistorySummary> {
// scripthash lookup
self._summary(b'H', scripthash, last_seen_txid, limit)
code: u8,
hashes: &[[u8; 32]],
start_height: Option<u32>,
) -> ReverseScanGroupIterator {
self.store.history_db.iter_scan_group_reverse(
hashes.iter().map(|hash| {
let prefix = TxHistoryRow::filter(code, &hash[..]);
let prefix_max = start_height
.map_or(TxHistoryRow::prefix_end(code, &hash[..]), |start_height| {
TxHistoryRow::prefix_height_end(code, &hash[..], start_height)
});
(prefix, prefix_max)
}),
33,
)
}

fn _summary(
fn collate_summaries(
&self,
code: u8,
hash: &[u8],
iter: impl Iterator<Item = TxHistoryRow>,
last_seen_txid: Option<&Txid>,
limit: usize,
) -> Vec<TxHistorySummary> {
let _timer_scan = self.start_timer("address_summary");
let rows = self
.history_iter_scan_reverse(code, hash)
.map(TxHistoryRow::from_row)
// collate utxo funding/spending events by transaction

let rows = iter
.map(|row| (row.get_txid(), row.key.txinfo, row.key.tx_position))
.skip_while(|(txid, _, _)| {
// skip until we reach the last_seen_txid
Expand All @@ -546,8 +554,6 @@ impl ChainQuery {
self.tx_confirming_block(&txid)
.map(|b| (txid, info, b.height, b.time, tx_position))
});

// collate utxo funding/spending events by transaction
let mut map: HashMap<Txid, TxHistorySummary> = HashMap::new();
for (txid, info, height, time, tx_position) in rows {
if !map.contains_key(&txid) && map.len() == limit {
Expand Down Expand Up @@ -606,7 +612,6 @@ impl ChainQuery {
_ => {}
}
}

let mut tx_summaries = map.into_values().collect::<Vec<TxHistorySummary>>();
tx_summaries.sort_by(|a, b| {
if a.height == b.height {
Expand All @@ -622,6 +627,47 @@ impl ChainQuery {
tx_summaries
}

/// Returns the transaction history summaries for a single scripthash.
///
/// Delegates to `_summary` with the `b'H'` row code; `last_seen_txid` and
/// `limit` are passed through unchanged.
pub fn summary(
&self,
scripthash: &[u8],
last_seen_txid: Option<&Txid>,
limit: usize,
) -> Vec<TxHistorySummary> {
// scripthash lookup
self._summary(b'H', scripthash, last_seen_txid, limit)
}

/// Summarizes the history rows stored under (`code`, `hash`).
///
/// Rows are read with a reverse scan, decoded into `TxHistoryRow`s and
/// handed to `collate_summaries` together with the pagination arguments.
/// The whole call is timed under the "address_summary" label.
fn _summary(
    &self,
    code: u8,
    hash: &[u8],
    last_seen_txid: Option<&Txid>,
    limit: usize,
) -> Vec<TxHistorySummary> {
    let _timer_scan = self.start_timer("address_summary");

    self.collate_summaries(
        self.history_iter_scan_reverse(code, hash)
            .map(TxHistoryRow::from_row),
        last_seen_txid,
        limit,
    )
}

/// Returns the combined transaction history summaries for a group of
/// scripthashes.
///
/// Rows of all `scripthashes` are read through one grouped reverse scan
/// (`b'H'` row code, optionally starting at `start_height`), decoded into
/// `TxHistoryRow`s and handed to `collate_summaries` with the pagination
/// arguments. The call is timed under the "address_group_summary" label.
pub fn summary_group(
    &self,
    scripthashes: &[[u8; 32]],
    last_seen_txid: Option<&Txid>,
    start_height: Option<u32>,
    limit: usize,
) -> Vec<TxHistorySummary> {
    let _timer_scan = self.start_timer("address_group_summary");

    // scripthash lookup
    let group_scan = self.history_iter_scan_group_reverse(b'H', scripthashes, start_height);

    self.collate_summaries(
        group_scan.map(TxHistoryRow::from_row),
        last_seen_txid,
        limit,
    )
}

pub fn history(
&self,
scripthash: &[u8],
Expand Down Expand Up @@ -687,6 +733,63 @@ impl ChainQuery {
.collect()
}

/// Returns confirmed transactions, each paired with its confirming block,
/// for a group of scripthashes.
///
/// Delegates to `_history_group` with the `b'H'` row code; all other
/// arguments are passed through unchanged.
pub fn history_group(
&self,
scripthashes: &[[u8; 32]],
last_seen_txid: Option<&Txid>,
start_height: Option<u32>,
limit: usize,
) -> Vec<(Transaction, BlockId)> {
// scripthash lookup
self._history_group(b'H', scripthashes, last_seen_txid, start_height, limit)
}

/// Lazily iterates over the txids in the indexed history of any of
/// `scripthashes`, yielding each txid only once.
///
/// Rows come from a grouped reverse scan with the `b'H'` row code,
/// optionally starting at `start_height`.
pub fn history_txids_iter_group(
    &self,
    scripthashes: &[[u8; 32]],
    start_height: Option<u32>,
) -> impl Iterator<Item = Txid> + '_ {
    let group_scan = self.history_iter_scan_group_reverse(b'H', scripthashes, start_height);

    group_scan
        .map(TxHistoryRow::from_row)
        .map(|history_row| history_row.get_txid())
        .unique()
}

/// Collects up to `limit` confirmed transactions, each paired with its
/// confirming block, from the history rows of a group of hashes.
///
/// Rows for all `hashes` are read through one grouped reverse scan
/// (optionally starting at `start_height`) and reduced to unique txids.
/// When `last_seen_txid` is given, every txid up to and including it is
/// skipped, so the result continues right after that transaction. Txids
/// for which `tx_confirming_block` returns `None` are dropped before the
/// limit is applied.
///
/// # Panics
///
/// Panics if the transactions cannot be looked up in the history index.
fn _history_group(
    &self,
    code: u8,
    hashes: &[[u8; 32]],
    last_seen_txid: Option<&Txid>,
    start_height: Option<u32>,
    limit: usize,
) -> Vec<(Transaction, BlockId)> {
    // NOTE: a leftover `print!` of `limit` / `last_seen_txid` was removed here;
    // it wrote an unterminated line to stdout on every request.
    let _timer_scan = self.start_timer("history_group");
    let txs_conf = self
        .history_iter_scan_group_reverse(code, hashes, start_height)
        .map(|row| TxHistoryRow::from_row(row).get_txid())
        // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that?
        .unique()
        .skip_while(|txid| {
            // the scan only seeks to `start_height` (if any), so rows are
            // discarded one by one until last_seen_txid is reached
            last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid)
        })
        .skip(match last_seen_txid {
            Some(_) => 1, // skip the last_seen_txid itself
            None => 0,
        })
        .filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b)))
        .take(limit)
        .collect::<Vec<(Txid, BlockId)>>();

    self.lookup_txns(&txs_conf)
        .expect("failed looking up txs in history index")
        .into_iter()
        .zip(txs_conf)
        .map(|(tx, (_, blockid))| (tx, blockid))
        .collect()
}

// TODO: avoid duplication with stats/stats_delta?
pub fn utxo(&self, scripthash: &[u8], limit: usize, flush: DBFlush) -> Result<Vec<Utxo>> {
let _timer = self.start_timer("utxo");
Expand Down Expand Up @@ -1661,6 +1764,12 @@ impl TxHistoryRow {
bincode_util::serialize_big(&(code, full_hash(hash), height)).unwrap()
}

// prefix representing the end of a given block (used for reverse scans)
fn prefix_height_end(code: u8, hash: &[u8], height: u32) -> Bytes {
// u16::MAX for the tx_position ensures we get all transactions at this height
bincode_util::serialize_big(&(code, full_hash(hash), height, u16::MAX)).unwrap()
junderw marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn into_row(self) -> DBRow {
DBRow {
key: bincode_util::serialize_big(&self.key).unwrap(),
Expand Down
Loading
Loading