Skip to content

Commit

Permalink
Merge branch 'mempool' into docker_build
Browse files Browse the repository at this point in the history
  • Loading branch information
junderw authored Aug 3, 2023
2 parents 5dfe714 + 46cd53c commit bc795f9
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 71 deletions.
92 changes: 65 additions & 27 deletions src/new_index/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,7 @@ impl Mempool {
TxHistoryInfo::Funding(info) => {
// Liquid requires some additional information from the txo that's not available in the TxHistoryInfo index.
#[cfg(feature = "liquid")]
let txo = self
.lookup_txo(&entry.get_funded_outpoint())
.expect("missing txo");
let txo = self.lookup_txo(&entry.get_funded_outpoint())?;

Some(Utxo {
txid: deserialize(&info.txid).expect("invalid txid"),
Expand Down Expand Up @@ -345,7 +343,9 @@ impl Mempool {
}
};
// Add new transactions
self.add(to_add)?;
if to_add.len() > self.add(to_add) {
debug!("Mempool update added less transactions than expected");
}
// Remove missing transactions
self.remove(to_remove);

Expand All @@ -370,39 +370,66 @@ impl Mempool {
pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> {
if self.txstore.get(txid).is_none() {
if let Ok(tx) = daemon.getmempooltx(txid) {
self.add(vec![tx])?;
if self.add(vec![tx]) == 0 {
return Err(format!(
"Unable to add {txid} to mempool likely due to missing parents."
)
.into());
}
}
}

Ok(())
}

fn add(&mut self, txs: Vec<Transaction>) -> Result<()> {
/// Add transactions to the mempool.
///
/// The return value is the number of transactions processed.
#[must_use = "Must deal with [[input vec's length]] > [[result]]."]
fn add(&mut self, txs: Vec<Transaction>) -> usize {
self.delta
.with_label_values(&["add"])
.observe(txs.len() as f64);
let _timer = self.latency.with_label_values(&["add"]).start_timer();
let txlen = txs.len();
if txlen == 0 {
return 0;
}
debug!("Adding {} transactions to Mempool", txlen);

let mut txids = vec![];
let mut txids = Vec::with_capacity(txs.len());
// Phase 1: add to txstore
for tx in txs {
let txid = tx.txid();
txids.push(txid);
self.txstore.insert(txid, tx);
}
// Phase 2: index history and spend edges (can fail if some txos cannot be found)
let txos = match self.lookup_txos(&self.get_prevouts(&txids)) {
Ok(txos) => txos,
Err(err) => {
warn!("lookup txouts failed: {}", err);
// TODO: should we remove txids from txstore?
return Ok(());
}
};

// Phase 2: index history and spend edges (some txos can be missing)
let txos = self.lookup_txos(&self.get_prevouts(&txids));

// Count how many transactions were actually processed.
let mut processed_count = 0;

// Phase 3: Iterate over the transactions and do the following:
// 1. Find all of the TxOuts of each input parent using `txos`
// 2. If any parent wasn't found, skip parsing this transaction
// 3. Insert TxFeeInfo into info.
// 4. Push TxOverview into recent tx queue.
// 5. Create the Spend and Fund TxHistory structs for inputs + outputs
// 6. Insert all TxHistory into history.
// 7. Insert the tx edges into edges (HashMap of (Outpoint, (Txid, vin)))
// 8. (Liquid only) Parse assets of tx.
for txid in txids {
let tx = self.txstore.get(&txid).expect("missing mempool tx");
let tx = self.txstore.get(&txid).expect("missing tx from txstore");

let prevouts = match extract_tx_prevouts(tx, &txos, false) {
Ok(v) => v,
Err(e) => {
warn!("Skipping tx {txid} missing parent error: {e}");
continue;
}
};
let txid_bytes = full_hash(&txid[..]);
let prevouts = extract_tx_prevouts(tx, &txos, false)?;

// Get feeinfo for caching and recent tx overview
let feeinfo = TxFeeInfo::new(tx, &prevouts, self.config.network_type);
Expand Down Expand Up @@ -472,18 +499,26 @@ impl Mempool {
&mut self.asset_history,
&mut self.asset_issuance,
);

processed_count += 1;
}

Ok(())
processed_count
}

pub fn lookup_txo(&self, outpoint: &OutPoint) -> Result<TxOut> {
/// Returns None if the lookup fails (mempool transaction RBF-ed etc.)
pub fn lookup_txo(&self, outpoint: &OutPoint) -> Option<TxOut> {
let mut outpoints = BTreeSet::new();
outpoints.insert(*outpoint);
Ok(self.lookup_txos(&outpoints)?.remove(outpoint).unwrap())
// This can possibly be None now
self.lookup_txos(&outpoints).remove(outpoint)
}

pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
/// For a given set of OutPoints, return a HashMap<OutPoint, TxOut>
///
/// Not all OutPoints from mempool transactions are guaranteed to be there.
/// Ensure you deal with the None case in your logic.
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
let _timer = self
.latency
.with_label_values(&["lookup_txos"])
Expand All @@ -494,18 +529,21 @@ impl Mempool {
let mempool_txos = outpoints
.iter()
.filter(|outpoint| !confirmed_txos.contains_key(outpoint))
.map(|outpoint| {
.flat_map(|outpoint| {
self.txstore
.get(&outpoint.txid)
.and_then(|tx| tx.output.get(outpoint.vout as usize).cloned())
.map(|txout| (*outpoint, txout))
.chain_err(|| format!("missing outpoint {:?}", outpoint))
.or_else(|| {
warn!("missing outpoint {:?}", outpoint);
None
})
})
.collect::<Result<HashMap<OutPoint, TxOut>>>()?;
.collect::<HashMap<OutPoint, TxOut>>();

let mut txos = confirmed_txos;
txos.extend(mempool_txos);
Ok(txos)
txos
}

fn get_prevouts(&self, txids: &[Txid]) -> BTreeSet<OutPoint> {
Expand Down
15 changes: 12 additions & 3 deletions src/new_index/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,19 @@ impl Query {

pub fn broadcast_raw(&self, txhex: &str) -> Result<Txid> {
let txid = self.daemon.broadcast_raw(txhex)?;
self.mempool
// The important part is whether we succeeded in broadcasting.
// Ignore errors in adding to the cache and show an internal warning.
if let Err(e) = self
.mempool
.write()
.unwrap()
.add_by_txid(&self.daemon, &txid)?;
.add_by_txid(&self.daemon, &txid)
{
warn!(
"broadcast_raw of {txid} succeeded to broadcast \
but failed to add to mempool-electrs Mempool cache: {e}"
);
}
Ok(txid)
}

Expand Down Expand Up @@ -118,7 +127,7 @@ impl Query {
.or_else(|| self.mempool().lookup_raw_txn(txid))
}

pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
// the mempool lookup_txos() internally looks up confirmed txos as well
self.mempool().lookup_txos(outpoints)
}
Expand Down
30 changes: 30 additions & 0 deletions src/new_index/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ impl Indexer {
}

fn add(&self, blocks: &[BlockEntry]) {
debug!("Adding {} blocks to Indexer", blocks.len());
// TODO: skip orphaned blocks?
let rows = {
let _timer = self.start_timer("add_process");
Expand All @@ -310,6 +311,7 @@ impl Indexer {
}

fn index(&self, blocks: &[BlockEntry]) {
debug!("Indexing {} blocks with Indexer", blocks.len());
let previous_txos_map = {
let _timer = self.start_timer("index_lookup");
lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false)
Expand Down Expand Up @@ -371,6 +373,34 @@ impl ChainQuery {
}
}

/// Fetch all transactions of the block identified by `hash`, in block order.
///
/// Returns `None` if the block is unknown, if the daemon/DB lookup fails,
/// or if any individual transaction in the block cannot be resolved.
pub fn get_block_txs(&self, hash: &BlockHash) -> Option<Vec<Transaction>> {
    let _timer = self.start_timer("get_block_txs");

    let txids: Option<Vec<Txid>> = if self.light_mode {
        // TODO fetch block as binary from REST API instead of as hex
        let mut blockinfo = self.daemon.getblock_raw(hash, 1).ok()?;
        // The daemon response is external input: fail soft with None instead
        // of panicking on a malformed "tx" field (consistent with the
        // `.ok()?` on getblock_raw above).
        serde_json::from_value(blockinfo["tx"].take()).ok()
    } else {
        // In full mode the txid list is indexed locally; a present-but-corrupt
        // row indicates DB corruption, so the expect is a genuine invariant.
        self.store
            .txstore_db
            .get(&BlockRow::txids_key(full_hash(&hash[..])))
            .map(|val| bincode::deserialize(&val).expect("failed to parse block txids"))
    };

    // Resolve every txid; collecting into Option<Vec<_>> short-circuits to
    // None as soon as any single transaction lookup fails.
    txids.and_then(|txid_vec| {
        txid_vec
            .into_iter()
            .map(|txid| self.lookup_txn(&txid, Some(hash)))
            .collect::<Option<Vec<Transaction>>>()
    })
}

pub fn get_block_meta(&self, hash: &BlockHash) -> Option<BlockMeta> {
let _timer = self.start_timer("get_block_meta");

Expand Down
Loading

0 comments on commit bc795f9

Please sign in to comment.