Skip to content

Commit

Permalink
Fix popular scripts script
Browse files Browse the repository at this point in the history
  • Loading branch information
junderw committed Sep 15, 2023
1 parent ef58ea3 commit c01a2c4
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 42 deletions.
179 changes: 155 additions & 24 deletions src/bin/popular-scripts.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,181 @@
extern crate electrs;

use electrs::{
config::Config,
new_index::{Store, TxHistoryKey},
util::bincode_util,
};
use std::{convert::TryInto, thread::ThreadId, time::Instant};

use electrs::{config::Config, new_index::db::open_raw_db};
use lazy_static::lazy_static;

/*
// How to run:
export ELECTRS_DATA=/path/to/electrs
cargo run \
-q --release --bin popular-scripts -- \
--db-dir $ELECTRS_DATA/db \
> ./contrib/popular-scripts.txt
*/

// Raw multi-threaded RocksDB handle type shared by all worker threads.
type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
lazy_static! {
    // Lazily opened handle to the `newindex/history` database; the config
    // (and therefore --db-dir) is read on first access. Worker threads
    // borrow this single shared handle instead of opening their own.
    static ref HISTORY_DB: DB = {
        let config = Config::from_args();
        open_raw_db(&config.db_path.join("newindex").join("history"))
    };
}

// Dev note:
// Only use println for file output (lines for output)
// Use eprintln to print to stderr for dev notifications
// Scans the history DB for "popular" scripts (scripthashes with many tx
// history rows) and prints them, hex-encoded, one per line to stdout.
// Progress and status messages go to stderr only.
fn main() {
    // Minimum history-row count for a scripthash to be reported
    // (override with HIGH_USAGE_THRESHOLD; default 4000).
    let high_usage_threshold = std::env::var("HIGH_USAGE_THRESHOLD")
        .ok()
        .and_then(|s| s.parse::<u32>().ok())
        .unwrap_or(4000);
    // Number of worker threads, each scanning a disjoint slice of the
    // keyspace (override with JOB_THREAD_COUNT; default 4).
    let thread_count = std::env::var("JOB_THREAD_COUNT")
        .ok()
        .and_then(|s| s.parse::<usize>().ok())
        .unwrap_or(4);
    eprintln!(
        "Searching for scripts with history rows of {} or more...",
        high_usage_threshold
    );

    let thread_pool = rayon::ThreadPoolBuilder::new()
        .num_threads(thread_count)
        .build()
        .expect("Built threadpool");

    let (sender, receiver) = crossbeam_channel::unbounded::<[u8; 32]>();

    // Partition the 256 possible values of the first scripthash byte into
    // `thread_count` evenly spaced section boundaries.
    let increment = 256 / thread_count;
    let bytes: Vec<u8> = (0u8..=255u8)
        .filter(|n| *n % increment as u8 == 0)
        .collect();

    let now = Instant::now();
    for i in 0..bytes.len() {
        let sender = sender.clone();
        let first_byte = bytes[i];
        // First byte of the NEXT section; None for the final section.
        let second_byte = bytes.get(i + 1).copied();

        thread_pool.spawn(move || {
            let id = std::thread::current().id();
            run_iterator(
                id,
                &HISTORY_DB,
                high_usage_threshold,
                first_byte,
                second_byte,
                sender,
                now,
            );
            eprintln!("{id:?} Finished its job!");
        })
    }
    // If we don't drop this sender
    // the receiver will hang forever
    drop(sender);

    // Drain popular scripthashes until every worker has hung up its sender.
    while let Ok(script) = receiver.recv() {
        println!("{}", hex::encode(script));
    }
    eprintln!("Finished!!!!");
}

fn run_iterator(
thread_id: ThreadId,
db: &DB,
high_usage_threshold: u32,
first_byte: u8,
next_byte: Option<u8>,
sender: crossbeam_channel::Sender<[u8; 32]>,
now: Instant,
) {
let mut iter = db.raw_iterator();
eprintln!(
"Thread ({thread_id:?}) Seeking DB to beginning of tx histories for b'H' + {}",
hex::encode([first_byte])
);
// H = 72
let mut compare_vec: Vec<u8> = vec![72, first_byte];
iter.seek(&compare_vec); // Seek to beginning of our section

let mut iter = store.history_db().raw_iterator();
iter.seek(b"H");
// Insert the byte of the next section for comparing
// This will tell us when to stop with a closure
type Checker<'a> = Box<dyn Fn(&[u8]) -> bool + 'a>;
let is_finished: Checker<'_> = if let Some(next) = next_byte {
// Modify the vec to what we're looking for next
// to indicate we left our section
compare_vec[1] = next;
Box::new(|key: &[u8]| -> bool { key.starts_with(&compare_vec) })
} else {
// Modify the vec to only have H so we know when we left H
compare_vec.remove(1);
Box::new(|key: &[u8]| -> bool { !key.starts_with(&compare_vec) })
};

eprintln!("Thread ({thread_id:?}) Seeking done");

let mut curr_scripthash = [0u8; 32];
let mut total_entries = 0;
let mut total_entries: usize = 0;
let mut iter_index: usize = 1;

while iter.valid() {
let key = iter.key().unwrap();

if !key.starts_with(b"H") {
if is_finished(key) {
// We have left the txhistory section,
// but we need to check the final scripthash
send_if_popular(
high_usage_threshold,
total_entries,
curr_scripthash,
&sender,
);
break;
}

let entry: TxHistoryKey =
bincode_util::deserialize_big(key).expect("failed to deserialize TxHistoryKey");
if iter_index % 10_000_000 == 0 {
let duration = now.elapsed().as_secs();
eprintln!(
"Thread ({thread_id:?}) Processing row #{iter_index}... {duration} seconds elapsed"
);
}

// We know that the TxHistory key is 1 byte "H" followed by
// 32 byte scripthash
let entry_hash: [u8; 32] = key[1..33].try_into().unwrap();

if curr_scripthash != entry.hash {
if total_entries > 100 {
println!("{} {}", hex::encode(curr_scripthash), total_entries);
}
if curr_scripthash != entry_hash {
// We have rolled on to a new scripthash
// If the last scripthash was popular
// Collect for sorting
send_if_popular(
high_usage_threshold,
total_entries,
curr_scripthash,
&sender,
);

curr_scripthash = entry.hash;
// After collecting, reset values for next scripthash
curr_scripthash = entry_hash;
total_entries = 0;
}

total_entries += 1;
iter_index += 1;

iter.next();
}
}

if total_entries >= 4000 {
println!(
"scripthash,{},{}",
hex::encode(curr_scripthash),
total_entries
);
/// Forwards `curr_scripthash` to the collector in `main` when its
/// history-row count reaches the configured threshold; otherwise a no-op.
#[inline]
fn send_if_popular(
    high_usage_threshold: u32,
    total_entries: usize,
    curr_scripthash: [u8; 32],
    sender: &crossbeam_channel::Sender<[u8; 32]>,
) {
    // Below the popularity threshold: nothing to report.
    if total_entries < high_usage_threshold as usize {
        return;
    }
    // The receiver in main outlives all worker senders, so a failed send
    // here means main already exited — treat it as a fatal bug.
    sender.send(curr_scripthash).unwrap();
}
40 changes: 22 additions & 18 deletions src/new_index/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,8 @@ pub enum DBFlush {

impl DB {
    // Opens the column DB at `path` using the shared `open_raw_db` tuning
    // helper, then checks on-disk format compatibility against `config`
    // before handing the wrapper back.
    pub fn open(path: &Path, config: &Config) -> DB {
        // NOTE(review): the lines from `debug!` down to the
        // `rocksdb::DB::open(&db_opts, path)` call appear to be the
        // pre-refactor body left interleaved by the diff extraction; the
        // tuning now lives in `open_raw_db` — TODO confirm against the repo.
        debug!("opening DB at {:?}", path);
        let mut db_opts = rocksdb::Options::default();
        db_opts.create_if_missing(true);
        db_opts.set_max_open_files(100_000); // TODO: make sure to `ulimit -n` this process correctly
        db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
        db_opts.set_compression_type(rocksdb::DBCompressionType::None);
        db_opts.set_target_file_size_base(1_073_741_824);
        db_opts.set_write_buffer_size(256 << 20);
        db_opts.set_disable_auto_compactions(true); // for initial bulk load

        // db_opts.set_advise_random_on_open(???);
        db_opts.set_compaction_readahead_size(1 << 20);
        db_opts.increase_parallelism(2);

        // let mut block_opts = rocksdb::BlockBasedOptions::default();
        // block_opts.set_block_size(???);

        let db = DB {
            db: rocksdb::DB::open(&db_opts, path).expect("failed to open RocksDB"),
            // NOTE(review): current initializer — the expression above it is
            // the old one left in by the diff extraction.
            db: open_raw_db(path),
        };
        db.verify_compatibility(config);
        db
Expand Down Expand Up @@ -213,3 +196,24 @@ impl DB {
}
}
}

/// Opens a RocksDB instance at `path` with the bulk-load tuning shared by
/// the indexer and the standalone tools, generic over the thread mode `T`.
pub fn open_raw_db<T: rocksdb::ThreadMode>(path: &Path) -> rocksdb::DBWithThreadMode<T> {
    debug!("opening DB at {:?}", path);

    let mut opts = rocksdb::Options::default();
    opts.create_if_missing(true);
    // TODO: make sure to `ulimit -n` this process correctly
    opts.set_max_open_files(100_000);
    opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
    opts.set_compression_type(rocksdb::DBCompressionType::None);
    opts.set_target_file_size_base(1_073_741_824);
    opts.set_write_buffer_size(256 << 20);
    // Auto-compaction is disabled for the initial bulk load.
    opts.set_disable_auto_compactions(true);

    // opts.set_advise_random_on_open(???);
    opts.set_compaction_readahead_size(1 << 20);
    opts.increase_parallelism(2);

    // let mut block_opts = rocksdb::BlockBasedOptions::default();
    // block_opts.set_block_size(???);

    rocksdb::DBWithThreadMode::<T>::open(&opts, path).expect("failed to open RocksDB")
}

0 comments on commit c01a2c4

Please sign in to comment.