Skip to content

Commit

Permalink
Merge branch 'mempool' into mononaut/batch-outspends
Browse files Browse the repository at this point in the history
  • Loading branch information
softsimon authored Sep 16, 2023
2 parents 8903193 + 78d9ede commit 5f1577b
Show file tree
Hide file tree
Showing 12 changed files with 11,957 additions and 11,492 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ electrum-discovery = [ "electrum-client"]
[dependencies]
arrayref = "0.3.6"
base64 = "0.13.0"
bincode = "1.3.1"
bincode-do-not-use-directly = { version = "1.3.1", package = "bincode" }
bitcoin = { version = "0.28", features = [ "use-serde" ] }
bounded-vec-deque = "0.1.1"
clap = "2.33.3"
Expand Down
22,730 changes: 11,365 additions & 11,365 deletions contrib/popular-scripts.txt

Large diffs are not rendered by default.

181 changes: 155 additions & 26 deletions src/bin/popular-scripts.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,181 @@
extern crate electrs;

use bincode::Options;
use electrs::{
config::Config,
new_index::{Store, TxHistoryKey},
};
use std::{convert::TryInto, thread::ThreadId, time::Instant};

use electrs::{config::Config, new_index::db::open_raw_db};
use lazy_static::lazy_static;

/*
// How to run:
export ELECTRS_DATA=/path/to/electrs
cargo run \
-q --release --bin popular-scripts -- \
--db-dir $ELECTRS_DATA/db \
> ./contrib/popular-scripts.txt
*/

type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
lazy_static! {
static ref HISTORY_DB: DB = {
let config = Config::from_args();
open_raw_db(&config.db_path.join("newindex").join("history"))
};
}

// Dev note:
// Only use println for file output (lines for output)
// Use eprintln to print to stderr for dev notifications
fn main() {
let config = Config::from_args();
let store = Store::open(&config.db_path.join("newindex"), &config);
let high_usage_threshold = std::env::var("HIGH_USAGE_THRESHOLD")
.ok()
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(4000);
let thread_count = std::env::var("JOB_THREAD_COUNT")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(4);
eprintln!(
"Seaching for scripts with history rows of {} or more...",
high_usage_threshold
);

let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(thread_count)
.build()
.expect("Built threadpool");

let (sender, receiver) = crossbeam_channel::unbounded::<[u8; 32]>();

let increment = 256 / thread_count;
let bytes: Vec<u8> = (0u8..=255u8)
.filter(|n| *n % increment as u8 == 0)
.collect();

let now = Instant::now();
for i in 0..bytes.len() {
let sender = sender.clone();
let first_byte = bytes[i];
let second_byte = bytes.get(i + 1).copied();

thread_pool.spawn(move || {
let id = std::thread::current().id();
run_iterator(
id,
&HISTORY_DB,
high_usage_threshold,
first_byte,
second_byte,
sender,
now,
);
eprintln!("{id:?} Finished its job!");
})
}
// If we don't drop this sender
// the receiver will hang forever
drop(sender);

while let Ok(script) = receiver.recv() {
println!("{}", hex::encode(script));
}
eprintln!("Finished!!!!");
}

fn run_iterator(
thread_id: ThreadId,
db: &DB,
high_usage_threshold: u32,
first_byte: u8,
next_byte: Option<u8>,
sender: crossbeam_channel::Sender<[u8; 32]>,
now: Instant,
) {
let mut iter = db.raw_iterator();
eprintln!(
"Thread ({thread_id:?}) Seeking DB to beginning of tx histories for b'H' + {}",
hex::encode([first_byte])
);
// H = 72
let mut compare_vec: Vec<u8> = vec![72, first_byte];
iter.seek(&compare_vec); // Seek to beginning of our section

let mut iter = store.history_db().raw_iterator();
iter.seek(b"H");
// Insert the byte of the next section for comparing
// This will tell us when to stop with a closure
type Checker<'a> = Box<dyn Fn(&[u8]) -> bool + 'a>;
let is_finished: Checker<'_> = if let Some(next) = next_byte {
// Modify the vec to what we're looking for next
// to indicate we left our section
compare_vec[1] = next;
Box::new(|key: &[u8]| -> bool { key.starts_with(&compare_vec) })
} else {
// Modify the vec to only have H so we know when we left H
compare_vec.remove(1);
Box::new(|key: &[u8]| -> bool { !key.starts_with(&compare_vec) })
};

eprintln!("Thread ({thread_id:?}) Seeking done");

let mut curr_scripthash = [0u8; 32];
let mut total_entries = 0;
let mut total_entries: usize = 0;
let mut iter_index: usize = 1;

while iter.valid() {
let key = iter.key().unwrap();

if !key.starts_with(b"H") {
if is_finished(key) {
// We have left the txhistory section,
// but we need to check the final scripthash
send_if_popular(
high_usage_threshold,
total_entries,
curr_scripthash,
&sender,
);
break;
}

let entry: TxHistoryKey = bincode::options()
.with_big_endian()
.deserialize(key)
.expect("failed to deserialize TxHistoryKey");
if iter_index % 10_000_000 == 0 {
let duration = now.elapsed().as_secs();
eprintln!(
"Thread ({thread_id:?}) Processing row #{iter_index}... {duration} seconds elapsed"
);
}

// We know that the TxHistory key is 1 byte "H" followed by
// 32 byte scripthash
let entry_hash: [u8; 32] = key[1..33].try_into().unwrap();

if curr_scripthash != entry.hash {
if total_entries > 100 {
println!("{} {}", hex::encode(curr_scripthash), total_entries);
}
if curr_scripthash != entry_hash {
// We have rolled on to a new scripthash
// If the last scripthash was popular
// Collect for sorting
send_if_popular(
high_usage_threshold,
total_entries,
curr_scripthash,
&sender,
);

curr_scripthash = entry.hash;
// After collecting, reset values for next scripthash
curr_scripthash = entry_hash;
total_entries = 0;
}

total_entries += 1;
iter_index += 1;

iter.next();
}
}

if total_entries >= 4000 {
println!(
"scripthash,{},{}",
hex::encode(curr_scripthash),
total_entries
);
#[inline]
fn send_if_popular(
high_usage_threshold: u32,
total_entries: usize,
curr_scripthash: [u8; 32],
sender: &crossbeam_channel::Sender<[u8; 32]>,
) {
if total_entries >= high_usage_threshold as usize {
sender.send(curr_scripthash).unwrap();
}
}
21 changes: 18 additions & 3 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ pub struct BlockchainInfo {
pub initialblockdownload: Option<bool>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MempoolInfo {
pub loaded: bool,
}

#[derive(Serialize, Deserialize, Debug)]
struct NetworkInfo {
version: u64,
Expand Down Expand Up @@ -313,16 +318,21 @@ impl Daemon {
}
loop {
let info = daemon.getblockchaininfo()?;
let mempool = daemon.getmempoolinfo()?;

if !info.initialblockdownload.unwrap_or(false) && info.blocks == info.headers {
if mempool.loaded
&& !info.initialblockdownload.unwrap_or(false)
&& info.blocks == info.headers
{
break;
}

warn!(
"waiting for bitcoind sync to finish: {}/{} blocks, verification progress: {:.3}%",
"waiting for bitcoind sync and mempool load to finish: {}/{} blocks, verification progress: {:.3}%, mempool loaded: {}",
info.blocks,
info.headers,
info.verificationprogress * 100.0
info.verificationprogress * 100.0,
mempool.loaded
);
signal.wait(Duration::from_secs(5), false)?;
}
Expand Down Expand Up @@ -428,6 +438,11 @@ impl Daemon {
from_value(info).chain_err(|| "invalid blockchain info")
}

fn getmempoolinfo(&self) -> Result<MempoolInfo> {
let info: Value = self.request("getmempoolinfo", json!([]))?;
from_value(info).chain_err(|| "invalid mempool info")
}

fn getnetworkinfo(&self) -> Result<NetworkInfo> {
let info: Value = self.request("getnetworkinfo", json!([]))?;
from_value(info).chain_err(|| "invalid network info")
Expand Down
14 changes: 9 additions & 5 deletions src/elements/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::elements::registry::{AssetMeta, AssetRegistry};
use crate::errors::*;
use crate::new_index::schema::{TxHistoryInfo, TxHistoryKey, TxHistoryRow};
use crate::new_index::{db::DBFlush, ChainQuery, DBRow, Mempool, Query};
use crate::util::{full_hash, Bytes, FullHash, TransactionStatus, TxInput};
use crate::util::{bincode_util, full_hash, Bytes, FullHash, TransactionStatus, TxInput};

lazy_static! {
pub static ref NATIVE_ASSET_ID: AssetId =
Expand Down Expand Up @@ -152,6 +152,7 @@ impl LiquidAsset {
}

#[derive(Serialize, Deserialize, Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct IssuingInfo {
pub txid: FullHash,
pub vin: u16,
Expand All @@ -162,6 +163,7 @@ pub struct IssuingInfo {
}

#[derive(Serialize, Deserialize, Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct BurningInfo {
pub txid: FullHash,
pub vout: u16,
Expand Down Expand Up @@ -189,7 +191,7 @@ pub fn index_confirmed_tx_assets(
// reissuances are only kept under the history index.
rows.extend(issuances.into_iter().map(|(asset_id, asset_row)| DBRow {
key: [b"i", &asset_id.into_inner()[..]].concat(),
value: bincode::serialize(&asset_row).unwrap(),
value: bincode_util::serialize_little(&asset_row).unwrap(),
}));
}

Expand Down Expand Up @@ -372,7 +374,9 @@ pub fn lookup_asset(

let chain_row = history_db
.get(&[b"i", &asset_id.into_inner()[..]].concat())
.map(|row| bincode::deserialize::<AssetRow>(&row).expect("failed parsing AssetRow"));
.map(|row| {
bincode_util::deserialize_little::<AssetRow>(&row).expect("failed parsing AssetRow")
});

let row = chain_row
.as_ref()
Expand Down Expand Up @@ -454,7 +458,7 @@ where
{
DBRow {
key: asset_cache_key(asset_id),
value: bincode::serialize(&(stats, blockhash)).unwrap(),
value: bincode_util::serialize_little(&(stats, blockhash)).unwrap(),
}
}

Expand Down Expand Up @@ -497,7 +501,7 @@ where
.store()
.cache_db()
.get(&asset_cache_key(asset_id))
.map(|c| bincode::deserialize(&c).unwrap())
.map(|c| bincode_util::deserialize_little(&c).unwrap())
.and_then(|(stats, blockhash)| {
chain
.height_by_hash(&blockhash)
Expand Down
2 changes: 2 additions & 0 deletions src/elements/peg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl PegoutValue {

// Inner type for the indexer TxHistoryInfo::Pegin variant
#[derive(Serialize, Deserialize, Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct PeginInfo {
pub txid: FullHash,
pub vin: u16,
Expand All @@ -60,6 +61,7 @@ pub struct PeginInfo {

// Inner type for the indexer TxHistoryInfo::Pegout variant
#[derive(Serialize, Deserialize, Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct PegoutInfo {
pub txid: FullHash,
pub vout: u16,
Expand Down
Loading

0 comments on commit 5f1577b

Please sign in to comment.