From 74444677f2e3c9ee4bdc1a412812a6ac501cf54a Mon Sep 17 00:00:00 2001
From: Mononaut
Date: Thu, 28 Mar 2024 08:47:36 +0000
Subject: [PATCH 1/9] Add wallet / address group endpoints

---
 src/new_index/db.rs      |  83 ++++++++++++++++++++++++
 src/new_index/mempool.rs |  43 +++++++++++++
 src/new_index/schema.rs  | 134 +++++++++++++++++++++++++++++++++------
 src/rest.rs              | 120 +++++++++++++++++++++++++++++++++++
 4 files changed, 362 insertions(+), 18 deletions(-)

diff --git a/src/new_index/db.rs b/src/new_index/db.rs
index 5e4b37a1..5229f8e4 100644
--- a/src/new_index/db.rs
+++ b/src/new_index/db.rs
@@ -69,6 +69,69 @@ impl<'a> Iterator for ReverseScanIterator<'a> {
     }
 }
 
+pub struct ReverseScanGroupIterator<'a> {
+    iters: Vec<ReverseScanIterator<'a>>,
+    next_rows: Vec<Option<DBRow>>,
+    value_offset: usize,
+    done: bool,
+}
+
+impl<'a> ReverseScanGroupIterator<'a> {
+    pub fn new(
+        iters: Vec<ReverseScanIterator<'a>>,
+        value_offset: usize,
+    ) -> ReverseScanGroupIterator {
+        let mut next_rows: Vec<Option<DBRow>> = Vec::new();
+        let mut new_iters: Vec<ReverseScanIterator<'a>> = Vec::new();
+        for mut iter in iters {
+            let next = iter.next();
+            next_rows.push(next);
+            new_iters.push(iter);
+        }
+        let done = next_rows.iter().all(|row| row.is_none());
+        ReverseScanGroupIterator {
+            iters: new_iters,
+            next_rows,
+            value_offset,
+            done,
+        }
+    }
+}
+
+impl<'a> Iterator for ReverseScanGroupIterator<'a> {
+    type Item = DBRow;
+
+    fn next(&mut self) -> Option<DBRow> {
+        if self.done {
+            return None;
+        }
+
+        let best_index = self
+            .next_rows
+            .iter()
+            .enumerate()
+            .max_by(|(a_index, a_opt), (b_index, b_opt)| match (a_opt, b_opt) {
+                (None, None) => a_index.cmp(b_index),
+
+                (Some(_), None) => std::cmp::Ordering::Greater,
+
+                (None, Some(_)) => std::cmp::Ordering::Less,
+
+                (Some(a), Some(b)) => a.key[self.value_offset..].cmp(&(b.key[self.value_offset..])),
+            })
+            .map(|(index, _)| index)
+            .unwrap_or(0);
+
+        let best = self.next_rows[best_index].take();
+        self.next_rows[best_index] = self.iters.get_mut(best_index)?.next();
+        if self.next_rows.iter().all(|row| row.is_none()) {
+            self.done = true;
+        }
+
+        best
+    }
+}
+
 #[derive(Debug)]
 pub struct DB {
     db: rocksdb::DB,
@@ -136,6 +199,26 @@ impl DB {
         }
     }
 
+    pub fn iter_scan_group_reverse(
+        &self,
+        prefixes: Vec<(Vec<u8>, Vec<u8>)>,
+        value_offset: usize,
+    ) -> ReverseScanGroupIterator {
+        let iters = prefixes
+            .iter()
+            .map(|(prefix, prefix_max)| {
+                let mut iter = self.db.raw_iterator();
+                iter.seek_for_prev(prefix_max);
+                ReverseScanIterator {
+                    prefix: prefix.to_vec(),
+                    iter,
+                    done: false,
+                }
+            })
+            .collect();
+        ReverseScanGroupIterator::new(iters, value_offset)
+    }
+
     pub fn write(&self, mut rows: Vec<DBRow>, flush: DBFlush) {
         debug!(
             "writing {} rows to {:?}, flush={:?}",
diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs
index c3841d52..64adbd9b 100644
--- a/src/new_index/mempool.rs
+++ b/src/new_index/mempool.rs
@@ -177,6 +177,49 @@ impl Mempool {
         .collect()
     }
 
+    pub fn history_group(
+        &self,
+        scripthashes: Vec<[u8; 32]>,
+        last_seen_txid: Option<&Txid>,
+        limit: usize,
+    ) -> Vec<Transaction> {
+        let _timer = self
+            .latency
+            .with_label_values(&["history_group"])
+            .start_timer();
+        scripthashes
+            .into_iter()
+            .filter_map(|scripthash| self.history.get(&scripthash[..]))
+            .flat_map(|entries| entries.iter())
+            .map(|e| e.get_txid())
+            .unique()
+            // TODO seek directly to last seen tx without reading earlier rows
+            .skip_while(|txid| {
+                // skip until we reach the last_seen_txid
+                last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid)
+            })
+            .skip(match last_seen_txid {
+                Some(_) => 1, // skip the last_seen_txid itself
+                None => 0,
+            })
+            .take(limit)
+ .map(|txid| self.txstore.get(&txid).expect("missing mempool tx")) + .cloned() + .collect() + } + + pub fn history_txids_iter_group( + &self, + scripthashes: Vec<[u8; 32]>, + ) -> impl Iterator + '_ { + scripthashes + .into_iter() + .filter_map(move |scripthash| self.history.get(&scripthash[..])) + .flat_map(|entries| entries.iter()) + .map(|entry| entry.get_txid()) + .unique() + } + pub fn history_txids(&self, scripthash: &[u8], limit: usize) -> Vec { let _timer = self .latency diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 00ee3e89..97da52df 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -37,6 +37,8 @@ use crate::new_index::fetch::{start_fetcher, BlockEntry, FetchFrom}; #[cfg(feature = "liquid")] use crate::elements::{asset, peg}; +use super::db::ReverseScanGroupIterator; + const MIN_HISTORY_ITEMS_TO_CACHE: usize = 100; pub struct Store { @@ -511,28 +513,33 @@ impl ChainQuery { &TxHistoryRow::prefix_end(code, hash), ) } - - pub fn summary( + fn history_iter_scan_group_reverse( &self, - scripthash: &[u8], - last_seen_txid: Option<&Txid>, - limit: usize, - ) -> Vec { - // scripthash lookup - self._summary(b'H', scripthash, last_seen_txid, limit) + code: u8, + hashes: Vec<[u8; 32]>, + ) -> ReverseScanGroupIterator { + self.store.history_db.iter_scan_group_reverse( + hashes + .into_iter() + .map(|hash| { + let prefix = TxHistoryRow::filter(code, &hash[..]); + let prefix_max = TxHistoryRow::prefix_end(code, &hash[..]); + (prefix, prefix_max) + }) + .collect(), + 33, + ) } - fn _summary( + fn collate_summaries( &self, - code: u8, - hash: &[u8], + iter: impl Iterator, last_seen_txid: Option<&Txid>, limit: usize, ) -> Vec { - let _timer_scan = self.start_timer("address_summary"); - let rows = self - .history_iter_scan_reverse(code, hash) - .map(TxHistoryRow::from_row) + // collate utxo funding/spending events by transaction + + let rows = iter .map(|row| (row.get_txid(), row.key.txinfo)) .skip_while(|(txid, _)| { // skip until we reach the last_seen_txid @@ -546,8 +553,6 @@ impl ChainQuery { self.tx_confirming_block(&txid) .map(|b| (txid, info, b.height, b.time)) }); - - // collate utxo funding/spending events by transaction let mut map: HashMap = HashMap::new(); for (txid, info, height, time) in rows { if !map.contains_key(&txid) && map.len() == limit { @@ -602,7 +607,6 @@ impl ChainQuery { _ => {} } } - let mut tx_summaries = map.into_values().collect::>(); tx_summaries.sort_by(|a, b| { if a.height == b.height { @@ -614,6 +618,46 @@ impl ChainQuery { tx_summaries } + pub fn summary( + &self, + scripthash: &[u8], + last_seen_txid: Option<&Txid>, + limit: usize, + ) -> Vec { + // scripthash lookup + self._summary(b'H', scripthash, last_seen_txid, limit) + } + + fn _summary( + &self, + code: u8, + hash: &[u8], + last_seen_txid: Option<&Txid>, + limit: usize, + ) -> Vec { + let _timer_scan = self.start_timer("address_summary"); + let rows = self + .history_iter_scan_reverse(code, hash) + .map(TxHistoryRow::from_row); + + self.collate_summaries(rows, last_seen_txid, limit) + } + + pub fn summary_group( + &self, + scripthashes: Vec<[u8; 32]>, + last_seen_txid: Option<&Txid>, + limit: usize, + ) -> Vec { + // scripthash lookup + let _timer_scan = self.start_timer("address_group_summary"); + let rows = self + .history_iter_scan_group_reverse(b'H', scripthashes) + .map(TxHistoryRow::from_row); + + self.collate_summaries(rows, last_seen_txid, limit) + } + pub fn history( &self, scripthash: &[u8], @@ -679,6 +723,60 @@ impl ChainQuery { .collect() } + pub fn 
history_group( + &self, + scripthashes: Vec<[u8; 32]>, + last_seen_txid: Option<&Txid>, + limit: usize, + ) -> Vec<(Transaction, BlockId)> { + // scripthash lookup + self._history_group(b'H', scripthashes, last_seen_txid, limit) + } + + pub fn history_txids_iter_group( + &self, + scripthashes: Vec<[u8; 32]>, + ) -> impl Iterator + '_ { + self.history_iter_scan_group_reverse(b'H', scripthashes) + .map(|row| TxHistoryRow::from_row(row).get_txid()) + .unique() + } + + fn _history_group( + &self, + code: u8, + hashes: Vec<[u8; 32]>, + last_seen_txid: Option<&Txid>, + limit: usize, + ) -> Vec<(Transaction, BlockId)> { + print!("limit {} | last_seen {:?}", limit, last_seen_txid); + let _timer_scan = self.start_timer("history_group"); + let txs_conf = self + .history_iter_scan_group_reverse(code, hashes) + .map(|row| TxHistoryRow::from_row(row).get_txid()) + // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that? + .unique() + // TODO seek directly to last seen tx without reading earlier rows + .skip_while(|txid| { + // skip until we reach the last_seen_txid + last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid) + }) + .skip(match last_seen_txid { + Some(_) => 1, // skip the last_seen_txid itself + None => 0, + }) + .filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b))) + .take(limit) + .collect::>(); + + self.lookup_txns(&txs_conf) + .expect("failed looking up txs in history index") + .into_iter() + .zip(txs_conf) + .map(|(tx, (_, blockid))| (tx, blockid)) + .collect() + } + // TODO: avoid duplication with stats/stats_delta? pub fn utxo(&self, scripthash: &[u8], limit: usize, flush: DBFlush) -> Result> { let _timer = self.start_timer("utxo"); diff --git a/src/rest.rs b/src/rest.rs index df085441..49900b36 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -925,6 +925,80 @@ fn handle_request( json_response(prepare_txs(txs, query, config), TTL_SHORT) } + (&Method::GET, Some(script_types @ &"addresses"), Some(&"txs"), None, None, None) + | (&Method::GET, Some(script_types @ &"scripthashes"), Some(&"txs"), None, None, None) => { + let script_type = match *script_types { + "addresses" => "address", + "scripthashes" => "scripthash", + _ => "", + }; + let script_hashes: Vec<[u8; 32]> = query_params + .get(&script_types.to_string()) + .ok_or(HttpError::from(format!("No {} specified", script_types)))? 
+ .as_str() + .split(',') + .map(|script_str| to_scripthash(script_type, script_str, config.network_type)) + .filter_map(|s| s.ok()) + .collect(); + + let max_txs = query_params + .get("max_txs") + .and_then(|s| s.parse::().ok()) + .unwrap_or(config.rest_default_max_mempool_txs); + let after_txid = query_params + .get("after_txid") + .and_then(|s| s.parse::().ok()); + + let mut txs = vec![]; + + if let Some(given_txid) = &after_txid { + let is_mempool = query + .mempool() + .history_txids_iter_group(script_hashes.clone()) + .any(|txid| given_txid == &txid); + let is_confirmed = if is_mempool { + false + } else { + query + .chain() + .history_txids_iter_group(script_hashes.clone()) + .any(|txid| given_txid == &txid) + }; + if !is_mempool && !is_confirmed { + return Err(HttpError( + StatusCode::UNPROCESSABLE_ENTITY, + String::from("after_txid not found"), + )); + } + } + txs.extend( + query + .mempool() + .history_group(script_hashes.clone(), after_txid.as_ref(), max_txs) + .into_iter() + .map(|tx| (tx, None)), + ); + + if txs.len() < max_txs { + let after_txid_ref = if !txs.is_empty() { + // If there are any txs, we know mempool found the + // after_txid IF it exists... so always return None. + None + } else { + after_txid.as_ref() + }; + txs.extend( + query + .chain() + .history_group(script_hashes, after_txid_ref, max_txs - txs.len()) + .into_iter() + .map(|(tx, blockid)| (tx, Some(blockid))), + ); + } + + json_response(prepare_txs(txs, query, config), TTL_SHORT) + } + ( &Method::GET, Some(script_type @ &"address"), @@ -989,6 +1063,52 @@ fn handle_request( json_response(summary, TTL_SHORT) } + ( + &Method::GET, + Some(script_types @ &"addresses"), + Some(&"txs"), + Some(&"summary"), + last_seen_txid, + None, + ) + | ( + &Method::GET, + Some(script_types @ &"scripthashes"), + Some(&"txs"), + Some(&"summary"), + last_seen_txid, + None, + ) => { + let script_type = match *script_types { + "addresses" => "address", + "scripthashes" => "scripthash", + _ => "", + }; + let script_hashes: Vec<[u8; 32]> = query_params + .get(&script_types.to_string()) + .ok_or(HttpError::from(format!("No {} specified", script_types)))? 
+ .as_str() + .split(',') + .map(|script_str| to_scripthash(script_type, script_str, config.network_type)) + .filter_map(|s| s.ok()) + .collect(); + + let last_seen_txid = last_seen_txid.and_then(|txid| Txid::from_hex(txid).ok()); + let max_txs = cmp::min( + config.rest_default_max_address_summary_txs, + query_params + .get("max_txs") + .and_then(|s| s.parse::().ok()) + .unwrap_or(config.rest_default_max_address_summary_txs), + ); + + let summary = + query + .chain() + .summary_group(script_hashes, last_seen_txid.as_ref(), max_txs); + + json_response(summary, TTL_SHORT) + } ( &Method::GET, Some(script_type @ &"address"), From 99538c2bb4ad4d51bc8af69bd14fa1b9928ed811 Mon Sep 17 00:00:00 2001 From: junderw Date: Sat, 28 Sep 2024 18:11:47 +0900 Subject: [PATCH 2/9] Remove some extra allocations --- src/new_index/db.rs | 13 +++++-------- src/new_index/schema.rs | 21 +++++++++------------ 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 94a00137..98de2d30 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -82,19 +82,17 @@ pub struct ReverseScanGroupIterator<'a> { impl<'a> ReverseScanGroupIterator<'a> { pub fn new( - iters: Vec>, + mut iters: Vec>, value_offset: usize, ) -> ReverseScanGroupIterator { - let mut next_rows: Vec> = Vec::new(); - let mut new_iters: Vec> = Vec::new(); - for mut iter in iters { + let mut next_rows: Vec> = Vec::with_capacity(iters.len()); + for iter in &mut iters { let next = iter.next(); next_rows.push(next); - new_iters.push(iter); } let done = next_rows.iter().all(|row| row.is_none()); ReverseScanGroupIterator { - iters: new_iters, + iters, next_rows, value_offset, done, @@ -205,11 +203,10 @@ impl DB { pub fn iter_scan_group_reverse( &self, - prefixes: Vec<(Vec, Vec)>, + prefixes: impl Iterator, Vec)>, value_offset: usize, ) -> ReverseScanGroupIterator { let iters = prefixes - .iter() .map(|(prefix, prefix_max)| { let mut iter = self.db.raw_iterator(); iter.seek_for_prev(prefix_max); diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 91131a2a..fed2c1e1 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -516,17 +516,14 @@ impl ChainQuery { fn history_iter_scan_group_reverse( &self, code: u8, - hashes: Vec<[u8; 32]>, + hashes: &[[u8; 32]], ) -> ReverseScanGroupIterator { self.store.history_db.iter_scan_group_reverse( - hashes - .into_iter() - .map(|hash| { - let prefix = TxHistoryRow::filter(code, &hash[..]); - let prefix_max = TxHistoryRow::prefix_end(code, &hash[..]); - (prefix, prefix_max) - }) - .collect(), + hashes.iter().map(|hash| { + let prefix = TxHistoryRow::filter(code, &hash[..]); + let prefix_max = TxHistoryRow::prefix_end(code, &hash[..]); + (prefix, prefix_max) + }), 33, ) } @@ -660,7 +657,7 @@ impl ChainQuery { // scripthash lookup let _timer_scan = self.start_timer("address_group_summary"); let rows = self - .history_iter_scan_group_reverse(b'H', scripthashes) + .history_iter_scan_group_reverse(b'H', &scripthashes) .map(TxHistoryRow::from_row); self.collate_summaries(rows, last_seen_txid, limit) @@ -745,7 +742,7 @@ impl ChainQuery { &self, scripthashes: Vec<[u8; 32]>, ) -> impl Iterator + '_ { - self.history_iter_scan_group_reverse(b'H', scripthashes) + self.history_iter_scan_group_reverse(b'H', &scripthashes) .map(|row| TxHistoryRow::from_row(row).get_txid()) .unique() } @@ -760,7 +757,7 @@ impl ChainQuery { print!("limit {} | last_seen {:?}", limit, last_seen_txid); let _timer_scan = self.start_timer("history_group"); let txs_conf = self - 
.history_iter_scan_group_reverse(code, hashes) + .history_iter_scan_group_reverse(code, &hashes) .map(|row| TxHistoryRow::from_row(row).get_txid()) // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that? .unique() From 12599e7bf1fb7fa0c90895897c7db3bc019e7bda Mon Sep 17 00:00:00 2001 From: Mononaut Date: Sun, 6 Oct 2024 22:39:20 +0000 Subject: [PATCH 3/9] use POST for multi-address APIs --- src/rest.rs | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/src/rest.rs b/src/rest.rs index f6e790c4..a7666754 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -936,20 +936,20 @@ fn handle_request( json_response(prepare_txs(txs, query, config), TTL_SHORT) } - (&Method::GET, Some(script_types @ &"addresses"), Some(&"txs"), None, None, None) - | (&Method::GET, Some(script_types @ &"scripthashes"), Some(&"txs"), None, None, None) => { + (&Method::POST, Some(script_types @ &"addresses"), Some(&"txs"), None, None, None) + | (&Method::POST, Some(script_types @ &"scripthashes"), Some(&"txs"), None, None, None) => { let script_type = match *script_types { "addresses" => "address", "scripthashes" => "scripthash", _ => "", }; - let script_hashes: Vec<[u8; 32]> = query_params - .get(*script_types) - .ok_or(HttpError::from(format!("No {} specified", script_types)))? - .as_str() - .split(',') - .map(|script_str| to_scripthash(script_type, script_str, config.network_type)) - .filter_map(|s| s.ok()) + + let script_hashes: Vec<[u8; 32]> = serde_json::from_slice::>(&body) + .map_err(|err| HttpError::from(err.to_string()))? + .iter() + .filter_map(|script_str| { + to_scripthash(script_type, script_str, config.network_type).ok() + }) .collect(); let max_txs = query_params @@ -1083,7 +1083,7 @@ fn handle_request( None, ) | ( - &Method::GET, + &Method::POST, Some(script_types @ &"scripthashes"), Some(&"txs"), Some(&"summary"), @@ -1095,13 +1095,12 @@ fn handle_request( "scripthashes" => "scripthash", _ => "", }; - let script_hashes: Vec<[u8; 32]> = query_params - .get(*script_types) - .ok_or(HttpError::from(format!("No {} specified", script_types)))? - .as_str() - .split(',') - .map(|script_str| to_scripthash(script_type, script_str, config.network_type)) - .filter_map(|s| s.ok()) + let script_hashes: Vec<[u8; 32]> = serde_json::from_slice::>(&body) + .map_err(|err| HttpError::from(err.to_string()))? 
+ .iter() + .filter_map(|script_str| { + to_scripthash(script_type, script_str, config.network_type).ok() + }) .collect(); let last_seen_txid = last_seen_txid.and_then(|txid| Txid::from_hex(txid).ok()); From 04957067d2151952ea94d24641941fac0445295b Mon Sep 17 00:00:00 2001 From: junderw Date: Mon, 7 Oct 2024 19:02:41 +0900 Subject: [PATCH 4/9] Remove clones and limit max address count --- src/new_index/mempool.rs | 14 ++++----- src/new_index/schema.rs | 14 ++++----- src/rest.rs | 62 ++++++++++++++++++++++++++++++++++------ 3 files changed, 67 insertions(+), 23 deletions(-) diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index c60e4894..32d27c1f 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -179,7 +179,7 @@ impl Mempool { pub fn history_group( &self, - scripthashes: Vec<[u8; 32]>, + scripthashes: &[[u8; 32]], last_seen_txid: Option<&Txid>, limit: usize, ) -> Vec { @@ -188,7 +188,7 @@ impl Mempool { .with_label_values(&["history_group"]) .start_timer(); scripthashes - .into_iter() + .iter() .filter_map(|scripthash| self.history.get(&scripthash[..])) .flat_map(|entries| entries.iter()) .map(|e| e.get_txid()) @@ -208,12 +208,12 @@ impl Mempool { .collect() } - pub fn history_txids_iter_group( - &self, - scripthashes: Vec<[u8; 32]>, - ) -> impl Iterator + '_ { + pub fn history_txids_iter_group<'a>( + &'a self, + scripthashes: &'a [[u8; 32]], + ) -> impl Iterator + 'a { scripthashes - .into_iter() + .iter() .filter_map(move |scripthash| self.history.get(&scripthash[..])) .flat_map(|entries| entries.iter()) .map(|entry| entry.get_txid()) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index fed2c1e1..566bc6b8 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -650,14 +650,14 @@ impl ChainQuery { pub fn summary_group( &self, - scripthashes: Vec<[u8; 32]>, + scripthashes: &[[u8; 32]], last_seen_txid: Option<&Txid>, limit: usize, ) -> Vec { // scripthash lookup let _timer_scan = self.start_timer("address_group_summary"); let rows = self - .history_iter_scan_group_reverse(b'H', &scripthashes) + .history_iter_scan_group_reverse(b'H', scripthashes) .map(TxHistoryRow::from_row); self.collate_summaries(rows, last_seen_txid, limit) @@ -730,7 +730,7 @@ impl ChainQuery { pub fn history_group( &self, - scripthashes: Vec<[u8; 32]>, + scripthashes: &[[u8; 32]], last_seen_txid: Option<&Txid>, limit: usize, ) -> Vec<(Transaction, BlockId)> { @@ -740,9 +740,9 @@ impl ChainQuery { pub fn history_txids_iter_group( &self, - scripthashes: Vec<[u8; 32]>, + scripthashes: &[[u8; 32]], ) -> impl Iterator + '_ { - self.history_iter_scan_group_reverse(b'H', &scripthashes) + self.history_iter_scan_group_reverse(b'H', scripthashes) .map(|row| TxHistoryRow::from_row(row).get_txid()) .unique() } @@ -750,14 +750,14 @@ impl ChainQuery { fn _history_group( &self, code: u8, - hashes: Vec<[u8; 32]>, + hashes: &[[u8; 32]], last_seen_txid: Option<&Txid>, limit: usize, ) -> Vec<(Transaction, BlockId)> { print!("limit {} | last_seen {:?}", limit, last_seen_txid); let _timer_scan = self.start_timer("history_group"); let txs_conf = self - .history_iter_scan_group_reverse(code, &hashes) + .history_iter_scan_group_reverse(code, hashes) .map(|row| TxHistoryRow::from_row(row).get_txid()) // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that? 
.unique() diff --git a/src/rest.rs b/src/rest.rs index a7666754..cc2cca01 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -42,6 +42,8 @@ use std::thread; use url::form_urlencoded; const ADDRESS_SEARCH_LIMIT: usize = 10; +// Limit to 300 addresses +const MULTI_ADDRESS_LIMIT: usize = 300; #[cfg(feature = "liquid")] const ASSETS_PER_PAGE: usize = 25; @@ -944,8 +946,24 @@ fn handle_request( _ => "", }; - let script_hashes: Vec<[u8; 32]> = serde_json::from_slice::>(&body) - .map_err(|err| HttpError::from(err.to_string()))? + if multi_address_too_long(&body) { + return Err(HttpError( + StatusCode::UNPROCESSABLE_ENTITY, + String::from("body too long"), + )); + } + + let script_hashes: Vec = + serde_json::from_slice(&body).map_err(|err| HttpError::from(err.to_string()))?; + + if script_hashes.len() > MULTI_ADDRESS_LIMIT { + return Err(HttpError( + StatusCode::UNPROCESSABLE_ENTITY, + String::from("body too long"), + )); + } + + let script_hashes: Vec<[u8; 32]> = script_hashes .iter() .filter_map(|script_str| { to_scripthash(script_type, script_str, config.network_type).ok() @@ -965,14 +983,14 @@ fn handle_request( if let Some(given_txid) = &after_txid { let is_mempool = query .mempool() - .history_txids_iter_group(script_hashes.clone()) + .history_txids_iter_group(&script_hashes) .any(|txid| given_txid == &txid); let is_confirmed = if is_mempool { false } else { query .chain() - .history_txids_iter_group(script_hashes.clone()) + .history_txids_iter_group(&script_hashes) .any(|txid| given_txid == &txid) }; if !is_mempool && !is_confirmed { @@ -985,7 +1003,7 @@ fn handle_request( txs.extend( query .mempool() - .history_group(script_hashes.clone(), after_txid.as_ref(), max_txs) + .history_group(&script_hashes, after_txid.as_ref(), max_txs) .into_iter() .map(|tx| (tx, None)), ); @@ -1001,7 +1019,7 @@ fn handle_request( txs.extend( query .chain() - .history_group(script_hashes, after_txid_ref, max_txs - txs.len()) + .history_group(&script_hashes, after_txid_ref, max_txs - txs.len()) .into_iter() .map(|(tx, blockid)| (tx, Some(blockid))), ); @@ -1095,8 +1113,25 @@ fn handle_request( "scripthashes" => "scripthash", _ => "", }; - let script_hashes: Vec<[u8; 32]> = serde_json::from_slice::>(&body) - .map_err(|err| HttpError::from(err.to_string()))? + + if multi_address_too_long(&body) { + return Err(HttpError( + StatusCode::UNPROCESSABLE_ENTITY, + String::from("body too long"), + )); + } + + let script_hashes: Vec = + serde_json::from_slice(&body).map_err(|err| HttpError::from(err.to_string()))?; + + if script_hashes.len() > MULTI_ADDRESS_LIMIT { + return Err(HttpError( + StatusCode::UNPROCESSABLE_ENTITY, + String::from("body too long"), + )); + } + + let script_hashes: Vec<[u8; 32]> = script_hashes .iter() .filter_map(|script_str| { to_scripthash(script_type, script_str, config.network_type).ok() @@ -1115,7 +1150,7 @@ fn handle_request( let summary = query .chain() - .summary_group(script_hashes, last_seen_txid.as_ref(), max_txs); + .summary_group(&script_hashes, last_seen_txid.as_ref(), max_txs); json_response(summary, TTL_SHORT) } @@ -1823,6 +1858,15 @@ fn parse_scripthash(scripthash: &str) -> Result { } } +#[inline] +fn multi_address_too_long(body: &hyper::body::Bytes) -> bool { + // ("",) (3) (quotes and comma between each entry) + // (\n ) (5) (allows for pretty printed JSON with 4 space indent) + // The opening [] and whatnot don't need to be accounted for, we give more than enough leeway + // p2tr and p2wsh are 55 length, scripthashes are 64. 
+ body.len() > (8 + 64) * MULTI_ADDRESS_LIMIT +} + #[derive(Debug)] struct HttpError(StatusCode, String); From 55ac4ecd1e6776d851d55c97d18e35c11f659fe1 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Mon, 7 Oct 2024 21:50:53 +0000 Subject: [PATCH 5/9] optimize multi-address tx history pagination --- src/new_index/schema.rs | 28 ++++++++--- src/rest.rs | 100 +++++++++++++++++++++++++++++----------- 2 files changed, 95 insertions(+), 33 deletions(-) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 566bc6b8..10ac88c4 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -517,11 +517,15 @@ impl ChainQuery { &self, code: u8, hashes: &[[u8; 32]], + start_height: Option, ) -> ReverseScanGroupIterator { self.store.history_db.iter_scan_group_reverse( hashes.iter().map(|hash| { let prefix = TxHistoryRow::filter(code, &hash[..]); - let prefix_max = TxHistoryRow::prefix_end(code, &hash[..]); + let prefix_max = start_height + .map_or(TxHistoryRow::prefix_end(code, &hash[..]), |start_height| { + TxHistoryRow::prefix_height_end(code, &hash[..], start_height) + }); (prefix, prefix_max) }), 33, @@ -652,12 +656,13 @@ impl ChainQuery { &self, scripthashes: &[[u8; 32]], last_seen_txid: Option<&Txid>, + start_height: Option, limit: usize, ) -> Vec { // scripthash lookup let _timer_scan = self.start_timer("address_group_summary"); let rows = self - .history_iter_scan_group_reverse(b'H', scripthashes) + .history_iter_scan_group_reverse(b'H', scripthashes, start_height) .map(TxHistoryRow::from_row); self.collate_summaries(rows, last_seen_txid, limit) @@ -732,17 +737,19 @@ impl ChainQuery { &self, scripthashes: &[[u8; 32]], last_seen_txid: Option<&Txid>, + start_height: Option, limit: usize, ) -> Vec<(Transaction, BlockId)> { // scripthash lookup - self._history_group(b'H', scripthashes, last_seen_txid, limit) + self._history_group(b'H', scripthashes, last_seen_txid, start_height, limit) } pub fn history_txids_iter_group( &self, scripthashes: &[[u8; 32]], + start_height: Option, ) -> impl Iterator + '_ { - self.history_iter_scan_group_reverse(b'H', scripthashes) + self.history_iter_scan_group_reverse(b'H', scripthashes, start_height) .map(|row| TxHistoryRow::from_row(row).get_txid()) .unique() } @@ -752,18 +759,19 @@ impl ChainQuery { code: u8, hashes: &[[u8; 32]], last_seen_txid: Option<&Txid>, + start_height: Option, limit: usize, ) -> Vec<(Transaction, BlockId)> { print!("limit {} | last_seen {:?}", limit, last_seen_txid); let _timer_scan = self.start_timer("history_group"); let txs_conf = self - .history_iter_scan_group_reverse(code, hashes) + .history_iter_scan_group_reverse(code, hashes, start_height) .map(|row| TxHistoryRow::from_row(row).get_txid()) // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that? 
.unique() - // TODO seek directly to last seen tx without reading earlier rows .skip_while(|txid| { - // skip until we reach the last_seen_txid + // we already seeked to the last txid at this height + // now skip just past the last_seen_txid itself last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid) }) .skip(match last_seen_txid { @@ -1756,6 +1764,12 @@ impl TxHistoryRow { bincode_util::serialize_big(&(code, full_hash(hash), height)).unwrap() } + // prefix representing the end of a given block (used for reverse scans) + fn prefix_height_end(code: u8, hash: &[u8], height: u32) -> Bytes { + // u16::MAX for the tx_position ensures we get all transactions at this height + bincode_util::serialize_big(&(code, full_hash(hash), height, u16::MAX)).unwrap() + } + pub fn into_row(self) -> DBRow { DBRow { key: bincode_util::serialize_big(&self.key).unwrap(), diff --git a/src/rest.rs b/src/rest.rs index cc2cca01..25e8861d 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -541,6 +541,27 @@ fn ttl_by_depth(height: Option, query: &Query) -> u32 { }) } +enum TxidLocation { + Mempool, + Chain(u32), // contains height + None, +} + +#[inline] +fn find_txid( + txid: &Txid, + mempool: &crate::new_index::Mempool, + chain: &crate::new_index::ChainQuery, +) -> TxidLocation { + if mempool.lookup_txn(txid).is_some() { + TxidLocation::Mempool + } else if let Some(block) = chain.tx_confirming_block(txid) { + TxidLocation::Chain(block.height as u32) + } else { + TxidLocation::None + } +} + /// Prepare transactions to be serialized in a JSON response /// /// Any transactions with missing prevouts will be filtered out of the response, rather than returned with incorrect data. @@ -980,33 +1001,33 @@ fn handle_request( let mut txs = vec![]; - if let Some(given_txid) = &after_txid { - let is_mempool = query - .mempool() - .history_txids_iter_group(&script_hashes) - .any(|txid| given_txid == &txid); - let is_confirmed = if is_mempool { - false - } else { - query - .chain() - .history_txids_iter_group(&script_hashes) - .any(|txid| given_txid == &txid) - }; - if !is_mempool && !is_confirmed { + let after_txid_location = if let Some(txid) = &after_txid { + find_txid(txid, &query.mempool(), query.chain()) + } else { + TxidLocation::Mempool + }; + + let mut confirmed_block_height = None; + match after_txid_location { + TxidLocation::Mempool => { + txs.extend( + query + .mempool() + .history_group(&script_hashes, after_txid.as_ref(), max_txs) + .into_iter() + .map(|tx| (tx, None)), + ); + } + TxidLocation::None => { return Err(HttpError( StatusCode::UNPROCESSABLE_ENTITY, String::from("after_txid not found"), )); } + TxidLocation::Chain(height) => { + confirmed_block_height = Some(height); + } } - txs.extend( - query - .mempool() - .history_group(&script_hashes, after_txid.as_ref(), max_txs) - .into_iter() - .map(|tx| (tx, None)), - ); if txs.len() < max_txs { let after_txid_ref = if !txs.is_empty() { @@ -1019,7 +1040,12 @@ fn handle_request( txs.extend( query .chain() - .history_group(&script_hashes, after_txid_ref, max_txs - txs.len()) + .history_group( + &script_hashes, + after_txid_ref, + confirmed_block_height, + max_txs - txs.len(), + ) .into_iter() .map(|(tx, blockid)| (tx, Some(blockid))), ); @@ -1147,10 +1173,32 @@ fn handle_request( .unwrap_or(config.rest_default_max_address_summary_txs), ); - let summary = - query - .chain() - .summary_group(&script_hashes, last_seen_txid.as_ref(), max_txs); + let last_seen_txid_location = if let Some(txid) = &last_seen_txid { + find_txid(txid, &query.mempool(), 
query.chain()) + } else { + TxidLocation::Mempool + }; + + let mut confirmed_block_height = None; + match last_seen_txid_location { + TxidLocation::Mempool => {} + TxidLocation::None => { + return Err(HttpError( + StatusCode::UNPROCESSABLE_ENTITY, + String::from("after_txid not found"), + )); + } + TxidLocation::Chain(height) => { + confirmed_block_height = Some(height); + } + } + + let summary = query.chain().summary_group( + &script_hashes, + last_seen_txid.as_ref(), + confirmed_block_height, + max_txs, + ); json_response(summary, TTL_SHORT) } From 2fbb0172f7cf69dc3637e75268fbe6c823089060 Mon Sep 17 00:00:00 2001 From: junderw Date: Tue, 8 Oct 2024 18:56:32 +0900 Subject: [PATCH 6/9] Remove more collect() calls for parallel iterator --- src/new_index/schema.rs | 147 +++++++++++++++++++--------------------- src/rest.rs | 23 +++---- 2 files changed, 81 insertions(+), 89 deletions(-) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 10ac88c4..98f9d8d9 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -668,12 +668,12 @@ impl ChainQuery { self.collate_summaries(rows, last_seen_txid, limit) } - pub fn history( - &self, + pub fn history<'a>( + &'a self, scripthash: &[u8], - last_seen_txid: Option<&Txid>, + last_seen_txid: Option<&'a Txid>, limit: usize, - ) -> Vec<(Transaction, BlockId)> { + ) -> impl rayon::iter::ParallelIterator> + 'a { // scripthash lookup self._history(b'H', scripthash, last_seen_txid, limit) } @@ -684,38 +684,32 @@ impl ChainQuery { .unique() } - fn _history( - &self, + fn _history<'a>( + &'a self, code: u8, hash: &[u8], - last_seen_txid: Option<&Txid>, + last_seen_txid: Option<&'a Txid>, limit: usize, - ) -> Vec<(Transaction, BlockId)> { + ) -> impl rayon::iter::ParallelIterator> + 'a { let _timer_scan = self.start_timer("history"); - let txs_conf = self - .history_iter_scan_reverse(code, hash) - .map(|row| TxHistoryRow::from_row(row).get_txid()) - // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that? - .unique() - // TODO seek directly to last seen tx without reading earlier rows - .skip_while(|txid| { - // skip until we reach the last_seen_txid - last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid) - }) - .skip(match last_seen_txid { - Some(_) => 1, // skip the last_seen_txid itself - None => 0, - }) - .filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b))) - .take(limit) - .collect::>(); - self.lookup_txns(&txs_conf) - .expect("failed looking up txs in history index") - .into_iter() - .zip(txs_conf) - .map(|(tx, (_, blockid))| (tx, blockid)) - .collect() + self.lookup_txns( + self.history_iter_scan_reverse(code, hash) + .map(|row| TxHistoryRow::from_row(row).get_txid()) + // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that? 
+ .unique() + // TODO seek directly to last seen tx without reading earlier rows + .skip_while(move |txid| { + // skip until we reach the last_seen_txid + last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid) + }) + .skip(match last_seen_txid { + Some(_) => 1, // skip the last_seen_txid itself + None => 0, + }) + .filter_map(move |txid| self.tx_confirming_block(&txid).map(|b| (txid, b))) + .take(limit), + ) } pub fn history_txids(&self, scripthash: &[u8], limit: usize) -> Vec<(Txid, BlockId)> { @@ -733,13 +727,13 @@ impl ChainQuery { .collect() } - pub fn history_group( - &self, + pub fn history_group<'a>( + &'a self, scripthashes: &[[u8; 32]], - last_seen_txid: Option<&Txid>, + last_seen_txid: Option<&'a Txid>, start_height: Option, limit: usize, - ) -> Vec<(Transaction, BlockId)> { + ) -> impl rayon::iter::ParallelIterator> + 'a { // scripthash lookup self._history_group(b'H', scripthashes, last_seen_txid, start_height, limit) } @@ -754,40 +748,34 @@ impl ChainQuery { .unique() } - fn _history_group( - &self, + fn _history_group<'a>( + &'a self, code: u8, hashes: &[[u8; 32]], - last_seen_txid: Option<&Txid>, + last_seen_txid: Option<&'a Txid>, start_height: Option, limit: usize, - ) -> Vec<(Transaction, BlockId)> { - print!("limit {} | last_seen {:?}", limit, last_seen_txid); + ) -> impl rayon::iter::ParallelIterator> + 'a { + debug!("limit {} | last_seen {:?}", limit, last_seen_txid); let _timer_scan = self.start_timer("history_group"); - let txs_conf = self - .history_iter_scan_group_reverse(code, hashes, start_height) - .map(|row| TxHistoryRow::from_row(row).get_txid()) - // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that? - .unique() - .skip_while(|txid| { - // we already seeked to the last txid at this height - // now skip just past the last_seen_txid itself - last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid) - }) - .skip(match last_seen_txid { - Some(_) => 1, // skip the last_seen_txid itself - None => 0, - }) - .filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b))) - .take(limit) - .collect::>(); - self.lookup_txns(&txs_conf) - .expect("failed looking up txs in history index") - .into_iter() - .zip(txs_conf) - .map(|(tx, (_, blockid))| (tx, blockid)) - .collect() + self.lookup_txns( + self.history_iter_scan_group_reverse(code, hashes, start_height) + .map(|row| TxHistoryRow::from_row(row).get_txid()) + // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that? + .unique() + .skip_while(move |txid| { + // we already seeked to the last txid at this height + // now skip just past the last_seen_txid itself + last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid) + }) + .skip(match last_seen_txid { + Some(_) => 1, // skip the last_seen_txid itself + None => 0, + }) + .filter_map(move |txid| self.tx_confirming_block(&txid).map(|b| (txid, b))) + .take(limit), + ) } // TODO: avoid duplication with stats/stats_delta? @@ -1086,15 +1074,20 @@ impl ChainQuery { // TODO: can we pass txids as a "generic iterable"? // TODO: should also use a custom ThreadPoolBuilder? 
- pub fn lookup_txns(&self, txids: &[(Txid, BlockId)]) -> Result> { - let _timer = self.start_timer("lookup_txns"); - txids - .par_iter() - .map(|(txid, blockid)| { - self.lookup_txn(txid, Some(&blockid.hash)) - .chain_err(|| "missing tx") - }) - .collect::>>() + pub fn lookup_txns<'a, I>( + &'a self, + txids: I, + ) -> impl rayon::iter::ParallelIterator> + 'a + where + I: Iterator + Send + rayon::iter::ParallelBridge + 'a, + { + txids.par_bridge().map(move |(txid, blockid)| -> Result<_> { + Ok(( + self.lookup_txn(&txid, Some(&blockid.hash)) + .chain_err(|| "missing tx")?, + blockid, + )) + }) } pub fn lookup_txn(&self, txid: &Txid, blockhash: Option<&BlockHash>) -> Option { @@ -1205,12 +1198,12 @@ impl ChainQuery { } #[cfg(feature = "liquid")] - pub fn asset_history( - &self, - asset_id: &AssetId, - last_seen_txid: Option<&Txid>, + pub fn asset_history<'a>( + &'a self, + asset_id: &'a AssetId, + last_seen_txid: Option<&'a Txid>, limit: usize, - ) -> Vec<(Transaction, BlockId)> { + ) -> impl rayon::iter::ParallelIterator> + 'a { self._history(b'I', &asset_id.into_inner()[..], last_seen_txid, limit) } diff --git a/src/rest.rs b/src/rest.rs index 25e8861d..d144a77d 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -19,6 +19,7 @@ use hex::{self, FromHexError}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Response, Server, StatusCode}; use prometheus::{HistogramOpts, HistogramVec}; +use rayon::iter::ParallelIterator; use tokio::sync::oneshot; use hyperlocal::UnixServerExt; @@ -951,8 +952,8 @@ fn handle_request( query .chain() .history(&script_hash[..], after_txid_ref, max_txs - txs.len()) - .into_iter() - .map(|(tx, blockid)| (tx, Some(blockid))), + .map(|res| res.map(|(tx, blockid)| (tx, Some(blockid)))) + .collect::, _>>()?, ); } @@ -1046,8 +1047,8 @@ fn handle_request( confirmed_block_height, max_txs - txs.len(), ) - .into_iter() - .map(|(tx, blockid)| (tx, Some(blockid))), + .map(|res| res.map(|(tx, blockid)| (tx, Some(blockid)))) + .collect::, _>>()?, ); } @@ -1080,9 +1081,8 @@ fn handle_request( let txs = query .chain() .history(&script_hash[..], last_seen_txid.as_ref(), max_txs) - .into_iter() - .map(|(tx, blockid)| (tx, Some(blockid))) - .collect(); + .map(|res| res.map(|(tx, blockid)| (tx, Some(blockid)))) + .collect::, _>>()?; json_response(prepare_txs(txs, query, config), TTL_SHORT) } @@ -1697,8 +1697,8 @@ fn handle_request( query .chain() .asset_history(&asset_id, None, config.rest_default_chain_txs_per_page) - .into_iter() - .map(|(tx, blockid)| (tx, Some(blockid))), + .map(|res| res.map(|(tx, blockid)| (tx, Some(blockid)))) + .collect::, _>>()?, ); json_response(prepare_txs(txs, query, config), TTL_SHORT) @@ -1723,9 +1723,8 @@ fn handle_request( last_seen_txid.as_ref(), config.rest_default_chain_txs_per_page, ) - .into_iter() - .map(|(tx, blockid)| (tx, Some(blockid))) - .collect(); + .map(|res| res.map(|(tx, blockid)| (tx, Some(blockid)))) + .collect::, _>>()?; json_response(prepare_txs(txs, query, config), TTL_SHORT) } From 0661cac13792bc4416289f7ee5d0ac9b837df3dc Mon Sep 17 00:00:00 2001 From: junderw Date: Tue, 8 Oct 2024 19:02:53 +0900 Subject: [PATCH 7/9] Move the take into the function to discourage unbounded parallel transaction getting --- src/new_index/schema.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 98f9d8d9..233aeb57 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -707,8 +707,8 @@ impl ChainQuery { Some(_) => 
1, // skip the last_seen_txid itself None => 0, }) - .filter_map(move |txid| self.tx_confirming_block(&txid).map(|b| (txid, b))) - .take(limit), + .filter_map(move |txid| self.tx_confirming_block(&txid).map(|b| (txid, b))), + limit, ) } @@ -773,8 +773,8 @@ impl ChainQuery { Some(_) => 1, // skip the last_seen_txid itself None => 0, }) - .filter_map(move |txid| self.tx_confirming_block(&txid).map(|b| (txid, b))) - .take(limit), + .filter_map(move |txid| self.tx_confirming_block(&txid).map(|b| (txid, b))), + limit, ) } @@ -1077,17 +1077,21 @@ impl ChainQuery { pub fn lookup_txns<'a, I>( &'a self, txids: I, + take: usize, ) -> impl rayon::iter::ParallelIterator> + 'a where I: Iterator + Send + rayon::iter::ParallelBridge + 'a, { - txids.par_bridge().map(move |(txid, blockid)| -> Result<_> { - Ok(( - self.lookup_txn(&txid, Some(&blockid.hash)) - .chain_err(|| "missing tx")?, - blockid, - )) - }) + txids + .take(take) + .par_bridge() + .map(move |(txid, blockid)| -> Result<_> { + Ok(( + self.lookup_txn(&txid, Some(&blockid.hash)) + .chain_err(|| "missing tx")?, + blockid, + )) + }) } pub fn lookup_txn(&self, txid: &Txid, blockhash: Option<&BlockHash>) -> Option { From 0e39040b3d0e975ca4fbe01522a08ee55cf27c9f Mon Sep 17 00:00:00 2001 From: Mononaut Date: Tue, 8 Oct 2024 16:39:39 +0000 Subject: [PATCH 8/9] fix wrong method type on /addresses/txs/summary --- src/rest.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rest.rs b/src/rest.rs index d144a77d..7286c502 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -1119,7 +1119,7 @@ fn handle_request( json_response(summary, TTL_SHORT) } ( - &Method::GET, + &Method::POST, Some(script_types @ &"addresses"), Some(&"txs"), Some(&"summary"), From b7d9b3a9dbcfbaa63e7fd30ede43175553c5c12d Mon Sep 17 00:00:00 2001 From: junderw Date: Wed, 9 Oct 2024 19:02:44 +0900 Subject: [PATCH 9/9] Add performance improvements to single address endpoints --- src/new_index/schema.rs | 33 +++++++++++---- src/rest.rs | 91 ++++++++++++++++++++++++++--------------- 2 files changed, 83 insertions(+), 41 deletions(-) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 233aeb57..b2323cee 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -507,10 +507,17 @@ impl ChainQuery { &TxHistoryRow::prefix_height(code, hash, start_height as u32), ) } - fn history_iter_scan_reverse(&self, code: u8, hash: &[u8]) -> ReverseScanIterator { + fn history_iter_scan_reverse( + &self, + code: u8, + hash: &[u8], + start_height: Option, + ) -> ReverseScanIterator { self.store.history_db.iter_scan_reverse( &TxHistoryRow::filter(code, hash), - &TxHistoryRow::prefix_end(code, hash), + &start_height.map_or(TxHistoryRow::prefix_end(code, hash), |start_height| { + TxHistoryRow::prefix_height_end(code, hash, start_height) + }), ) } fn history_iter_scan_group_reverse( @@ -631,10 +638,11 @@ impl ChainQuery { &self, scripthash: &[u8], last_seen_txid: Option<&Txid>, + start_height: Option, limit: usize, ) -> Vec { // scripthash lookup - self._summary(b'H', scripthash, last_seen_txid, limit) + self._summary(b'H', scripthash, last_seen_txid, start_height, limit) } fn _summary( @@ -642,11 +650,12 @@ impl ChainQuery { code: u8, hash: &[u8], last_seen_txid: Option<&Txid>, + start_height: Option, limit: usize, ) -> Vec { let _timer_scan = self.start_timer("address_summary"); let rows = self - .history_iter_scan_reverse(code, hash) + .history_iter_scan_reverse(code, hash, start_height) .map(TxHistoryRow::from_row); self.collate_summaries(rows, last_seen_txid, limit) 
@@ -672,14 +681,15 @@ impl ChainQuery { &'a self, scripthash: &[u8], last_seen_txid: Option<&'a Txid>, + start_height: Option, limit: usize, ) -> impl rayon::iter::ParallelIterator> + 'a { // scripthash lookup - self._history(b'H', scripthash, last_seen_txid, limit) + self._history(b'H', scripthash, last_seen_txid, start_height, limit) } pub fn history_txids_iter<'a>(&'a self, scripthash: &[u8]) -> impl Iterator + 'a { - self.history_iter_scan_reverse(b'H', scripthash) + self.history_iter_scan_reverse(b'H', scripthash, None) .map(|row| TxHistoryRow::from_row(row).get_txid()) .unique() } @@ -689,12 +699,13 @@ impl ChainQuery { code: u8, hash: &[u8], last_seen_txid: Option<&'a Txid>, + start_height: Option, limit: usize, ) -> impl rayon::iter::ParallelIterator> + 'a { let _timer_scan = self.start_timer("history"); self.lookup_txns( - self.history_iter_scan_reverse(code, hash) + self.history_iter_scan_reverse(code, hash, start_height) .map(|row| TxHistoryRow::from_row(row).get_txid()) // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that? .unique() @@ -1208,7 +1219,13 @@ impl ChainQuery { last_seen_txid: Option<&'a Txid>, limit: usize, ) -> impl rayon::iter::ParallelIterator> + 'a { - self._history(b'I', &asset_id.into_inner()[..], last_seen_txid, limit) + self._history( + b'I', + &asset_id.into_inner()[..], + last_seen_txid, + None, + limit, + ) } #[cfg(feature = "liquid")] diff --git a/src/rest.rs b/src/rest.rs index 7286c502..e503191e 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -911,26 +911,31 @@ fn handle_request( let mut txs = vec![]; - if let Some(given_txid) = &after_txid { - let is_mempool = query - .mempool() - .history_txids_iter(&script_hash[..]) - .any(|txid| given_txid == &txid); - let is_confirmed = if is_mempool { - false - } else { - query - .chain() - .history_txids_iter(&script_hash[..]) - .any(|txid| given_txid == &txid) - }; - if !is_mempool && !is_confirmed { + let after_txid_location = if let Some(txid) = &after_txid { + find_txid(txid, &query.mempool(), query.chain()) + } else { + TxidLocation::Mempool + }; + + let confirmed_block_height = match after_txid_location { + TxidLocation::Mempool => { + txs.extend( + query + .mempool() + .history(&script_hash[..], after_txid.as_ref(), max_txs) + .into_iter() + .map(|tx| (tx, None)), + ); + None + } + TxidLocation::None => { return Err(HttpError( StatusCode::UNPROCESSABLE_ENTITY, String::from("after_txid not found"), )); } - } + TxidLocation::Chain(height) => Some(height), + }; txs.extend( query @@ -951,7 +956,12 @@ fn handle_request( txs.extend( query .chain() - .history(&script_hash[..], after_txid_ref, max_txs - txs.len()) + .history( + &script_hash[..], + after_txid_ref, + confirmed_block_height, + max_txs - txs.len(), + ) .map(|res| res.map(|(tx, blockid)| (tx, Some(blockid)))) .collect::, _>>()?, ); @@ -1008,8 +1018,7 @@ fn handle_request( TxidLocation::Mempool }; - let mut confirmed_block_height = None; - match after_txid_location { + let confirmed_block_height = match after_txid_location { TxidLocation::Mempool => { txs.extend( query @@ -1018,6 +1027,7 @@ fn handle_request( .into_iter() .map(|tx| (tx, None)), ); + None } TxidLocation::None => { return Err(HttpError( @@ -1025,10 +1035,8 @@ fn handle_request( String::from("after_txid not found"), )); } - TxidLocation::Chain(height) => { - confirmed_block_height = Some(height); - } - } + TxidLocation::Chain(height) => Some(height), + }; if txs.len() < max_txs { let after_txid_ref = if !txs.is_empty() { @@ -1080,7 +1088,7 @@ fn 
handle_request( let txs = query .chain() - .history(&script_hash[..], last_seen_txid.as_ref(), max_txs) + .history(&script_hash[..], last_seen_txid.as_ref(), None, max_txs) .map(|res| res.map(|(tx, blockid)| (tx, Some(blockid)))) .collect::, _>>()?; @@ -1112,9 +1120,29 @@ fn handle_request( .unwrap_or(config.rest_default_max_address_summary_txs), ); - let summary = query - .chain() - .summary(&script_hash[..], last_seen_txid.as_ref(), max_txs); + let last_seen_txid_location = if let Some(txid) = &last_seen_txid { + find_txid(txid, &query.mempool(), query.chain()) + } else { + TxidLocation::Mempool + }; + + let confirmed_block_height = match last_seen_txid_location { + TxidLocation::Mempool => None, + TxidLocation::None => { + return Err(HttpError( + StatusCode::UNPROCESSABLE_ENTITY, + String::from("after_txid not found"), + )); + } + TxidLocation::Chain(height) => Some(height), + }; + + let summary = query.chain().summary( + &script_hash[..], + last_seen_txid.as_ref(), + confirmed_block_height, + max_txs, + ); json_response(summary, TTL_SHORT) } @@ -1179,19 +1207,16 @@ fn handle_request( TxidLocation::Mempool }; - let mut confirmed_block_height = None; - match last_seen_txid_location { - TxidLocation::Mempool => {} + let confirmed_block_height = match last_seen_txid_location { + TxidLocation::Mempool => None, TxidLocation::None => { return Err(HttpError( StatusCode::UNPROCESSABLE_ENTITY, String::from("after_txid not found"), )); } - TxidLocation::Chain(height) => { - confirmed_block_height = Some(height); - } - } + TxidLocation::Chain(height) => Some(height), + }; let summary = query.chain().summary_group( &script_hashes,