Skip to content

Commit

Permalink
Merge branch 'mempool' into junderw/json-rpc-2
Browse files Browse the repository at this point in the history
  • Loading branch information
junderw authored Nov 10, 2024
2 parents 277b05b + cb79811 commit 66c76bc
Show file tree
Hide file tree
Showing 7 changed files with 683 additions and 100 deletions.
47 changes: 47 additions & 0 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,34 @@ pub struct MempoolAcceptResult {
reject_reason: Option<String>,
}

#[derive(Serialize, Deserialize, Debug)]
struct MempoolFeesSubmitPackage {
base: f64,
#[serde(rename = "effective-feerate")]
effective_feerate: Option<f64>,
#[serde(rename = "effective-includes")]
effective_includes: Option<Vec<String>>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct SubmitPackageResult {
package_msg: String,
#[serde(rename = "tx-results")]
tx_results: HashMap<String, TxResult>,
#[serde(rename = "replaced-transactions")]
replaced_transactions: Option<Vec<String>>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TxResult {
txid: String,
#[serde(rename = "other-wtxid")]
other_wtxid: Option<String>,
vsize: Option<u32>,
fees: Option<MempoolFeesSubmitPackage>,
error: Option<String>,
}

pub trait CookieGetter: Send + Sync {
fn get(&self) -> Result<Vec<u8>>;
}
Expand Down Expand Up @@ -621,6 +649,25 @@ impl Daemon {
.chain_err(|| "invalid testmempoolaccept reply")
}

pub fn submit_package(
&self,
txhex: Vec<String>,
maxfeerate: Option<f64>,
maxburnamount: Option<f64>,
) -> Result<SubmitPackageResult> {
let params = match (maxfeerate, maxburnamount) {
(Some(rate), Some(burn)) => {
json!([txhex, format!("{:.8}", rate), format!("{:.8}", burn)])
}
(Some(rate), None) => json!([txhex, format!("{:.8}", rate)]),
(None, Some(burn)) => json!([txhex, null, format!("{:.8}", burn)]),
(None, None) => json!([txhex]),
};
let result = self.request("submitpackage", params)?;
serde_json::from_value::<SubmitPackageResult>(result)
.chain_err(|| "invalid submitpackage reply")
}

// Get estimated feerates for the provided confirmation targets using a batch RPC request
// Missing estimates are logged but do not cause a failure, whatever is available is returned
#[allow(clippy::float_cmp)]
Expand Down
80 changes: 80 additions & 0 deletions src/new_index/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,67 @@ impl<'a> Iterator for ReverseScanIterator<'a> {
}
}

pub struct ReverseScanGroupIterator<'a> {
iters: Vec<ReverseScanIterator<'a>>,
next_rows: Vec<Option<DBRow>>,
value_offset: usize,
done: bool,
}

impl<'a> ReverseScanGroupIterator<'a> {
pub fn new(
mut iters: Vec<ReverseScanIterator<'a>>,
value_offset: usize,
) -> ReverseScanGroupIterator {
let mut next_rows: Vec<Option<DBRow>> = Vec::with_capacity(iters.len());
for iter in &mut iters {
let next = iter.next();
next_rows.push(next);
}
let done = next_rows.iter().all(|row| row.is_none());
ReverseScanGroupIterator {
iters,
next_rows,
value_offset,
done,
}
}
}

impl<'a> Iterator for ReverseScanGroupIterator<'a> {
type Item = DBRow;

fn next(&mut self) -> Option<DBRow> {
if self.done {
return None;
}

let best_index = self
.next_rows
.iter()
.enumerate()
.max_by(|(a_index, a_opt), (b_index, b_opt)| match (a_opt, b_opt) {
(None, None) => a_index.cmp(b_index),

(Some(_), None) => std::cmp::Ordering::Greater,

(None, Some(_)) => std::cmp::Ordering::Less,

(Some(a), Some(b)) => a.key[self.value_offset..].cmp(&(b.key[self.value_offset..])),
})
.map(|(index, _)| index)
.unwrap_or(0);

let best = self.next_rows[best_index].take();
self.next_rows[best_index] = self.iters.get_mut(best_index)?.next();
if self.next_rows.iter().all(|row| row.is_none()) {
self.done = true;
}

best
}
}

#[derive(Debug)]
pub struct DB {
db: rocksdb::DB,
Expand Down Expand Up @@ -140,6 +201,25 @@ impl DB {
}
}

pub fn iter_scan_group_reverse(
&self,
prefixes: impl Iterator<Item = (Vec<u8>, Vec<u8>)>,
value_offset: usize,
) -> ReverseScanGroupIterator {
let iters = prefixes
.map(|(prefix, prefix_max)| {
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(prefix_max);
ReverseScanIterator {
prefix: prefix.to_vec(),
iter,
done: false,
}
})
.collect();
ReverseScanGroupIterator::new(iters, value_offset)
}

pub fn write(&self, mut rows: Vec<DBRow>, flush: DBFlush) {
debug!(
"writing {} rows to {:?}, flush={:?}",
Expand Down
43 changes: 43 additions & 0 deletions src/new_index/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,49 @@ impl Mempool {
.collect()
}

pub fn history_group(
&self,
scripthashes: &[[u8; 32]],
last_seen_txid: Option<&Txid>,
limit: usize,
) -> Vec<Transaction> {
let _timer = self
.latency
.with_label_values(&["history_group"])
.start_timer();
scripthashes
.iter()
.filter_map(|scripthash| self.history.get(&scripthash[..]))
.flat_map(|entries| entries.iter())
.map(|e| e.get_txid())
.unique()
// TODO seek directly to last seen tx without reading earlier rows
.skip_while(|txid| {
// skip until we reach the last_seen_txid
last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid)
})
.skip(match last_seen_txid {
Some(_) => 1, // skip the last_seen_txid itself
None => 0,
})
.take(limit)
.map(|txid| self.txstore.get(&txid).expect("missing mempool tx"))
.cloned()
.collect()
}

pub fn history_txids_iter_group<'a>(
&'a self,
scripthashes: &'a [[u8; 32]],
) -> impl Iterator<Item = Txid> + 'a {
scripthashes
.iter()
.filter_map(move |scripthash| self.history.get(&scripthash[..]))
.flat_map(|entries| entries.iter())
.map(|entry| entry.get_txid())
.unique()
}

pub fn history_txids(&self, scripthash: &[u8], limit: usize) -> Vec<Txid> {
let _timer = self
.latency
Expand Down
11 changes: 10 additions & 1 deletion src/new_index/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::{Duration, Instant};

use crate::chain::{Network, OutPoint, Transaction, TxOut, Txid};
use crate::config::Config;
use crate::daemon::{Daemon, MempoolAcceptResult};
use crate::daemon::{Daemon, MempoolAcceptResult, SubmitPackageResult};
use crate::errors::*;
use crate::new_index::{ChainQuery, Mempool, ScriptStats, SpendingInput, Utxo};
use crate::util::{is_spendable, BlockId, Bytes, TransactionStatus};
Expand Down Expand Up @@ -95,6 +95,15 @@ impl Query {
self.daemon.test_mempool_accept(txhex, maxfeerate)
}

pub fn submit_package(
&self,
txhex: Vec<String>,
maxfeerate: Option<f64>,
maxburnamount: Option<f64>,
) -> Result<SubmitPackageResult> {
self.daemon.submit_package(txhex, maxfeerate, maxburnamount)
}

pub fn utxo(&self, scripthash: &[u8]) -> Result<Vec<Utxo>> {
let mut utxos = self.chain.utxo(
scripthash,
Expand Down
Loading

0 comments on commit 66c76bc

Please sign in to comment.