Skip to content

Commit

Permalink
Feature: Increase precache performance
Browse files Browse the repository at this point in the history
  • Loading branch information
junderw committed Sep 19, 2023
1 parent 3615c48 commit 811c6c4
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 26 deletions.
1 change: 1 addition & 0 deletions electrs-start-liquid
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ do
--network liquid \
--http-socket-file "${HOME}/socket/esplora-liquid-mainnet" \
--precache-scripts "${HOME}/electrs/contrib/popular-scripts.txt" \
--precache-threads 100 \
--asset-db-path "${HOME}/asset_registry_db" \
--daemon-dir "${HOME}" \
--db-dir /electrs \
Expand Down
1 change: 1 addition & 0 deletions electrs-start-liquidtestnet
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ do
--network liquidtestnet \
--http-socket-file "${HOME}/socket/esplora-liquid-testnet" \
--precache-scripts "${HOME}/electrs/contrib/popular-scripts.txt" \
--precache-threads 100 \
--asset-db-path "${HOME}/asset_registry_testnet_db" \
--daemon-dir "${HOME}" \
--db-dir "/electrs" \
Expand Down
1 change: 1 addition & 0 deletions electrs-start-mainnet
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ do
-- \
--http-socket-file "${HOME}/socket/esplora-bitcoin-mainnet" \
--precache-scripts "${HOME}/electrs/contrib/popular-scripts.txt" \
--precache-threads 100 \
--daemon-dir "${HOME}" \
--db-dir "/electrs" \
--cookie "${BITCOIN_RPC_USER}:${BITCOIN_RPC_PASS}" \
Expand Down
1 change: 1 addition & 0 deletions electrs-start-signet
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ do
--network signet \
--http-socket-file "${HOME}/socket/esplora-bitcoin-signet" \
--precache-scripts "${HOME}/electrs/contrib/popular-scripts.txt" \
--precache-threads 100 \
--daemon-dir "${HOME}" \
--db-dir "/electrs" \
--cookie "${BITCOIN_RPC_USER}:${BITCOIN_RPC_PASS}" \
Expand Down
1 change: 1 addition & 0 deletions electrs-start-testnet
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ do
--network testnet \
--http-socket-file "${HOME}/socket/esplora-bitcoin-testnet" \
--precache-scripts "${HOME}/electrs/contrib/popular-scripts.txt" \
--precache-threads 100 \
--daemon-dir "${HOME}" \
--db-dir "/electrs" \
--cookie "${BITCOIN_RPC_USER}:${BITCOIN_RPC_PASS}" \
Expand Down
16 changes: 10 additions & 6 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,6 @@ fn run_server(config: Arc<Config>) -> Result<()> {
&metrics,
));

if let Some(ref precache_file) = config.precache_scripts {
let precache_scripthashes = precache::scripthashes_from_file(precache_file.to_string())
.expect("cannot load scripts to precache");
precache::precache(&chain, precache_scripthashes);
}

let mempool = Arc::new(RwLock::new(Mempool::new(
Arc::clone(&chain),
&metrics,
Expand Down Expand Up @@ -102,6 +96,16 @@ fn run_server(config: Arc<Config>) -> Result<()> {
let rest_server = rest::start(Arc::clone(&config), Arc::clone(&query));
let electrum_server = ElectrumRPC::start(Arc::clone(&config), Arc::clone(&query), &metrics);

if let Some(ref precache_file) = config.precache_scripts {
let precache_scripthashes = precache::scripthashes_from_file(precache_file.to_string())
.expect("cannot load scripts to precache");
precache::precache(
Arc::clone(&chain),
precache_scripthashes,
config.precache_threads,
);
}

loop {
if let Err(err) = signal.wait(Duration::from_millis(500), true) {
info!("stopping server: {}", err);
Expand Down
23 changes: 23 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub struct Config {
pub index_unspendables: bool,
pub cors: Option<String>,
pub precache_scripts: Option<String>,
pub precache_threads: usize,
pub utxos_limit: usize,
pub electrum_txs_limit: usize,
pub electrum_banner: String,
Expand Down Expand Up @@ -187,6 +188,12 @@ impl Config {
.help("Path to file with list of scripts to pre-cache")
.takes_value(true)
)
.arg(
Arg::with_name("precache_threads")
.long("precache-threads")
.help("Non-zero number of threads to use for precache threadpool. [default: 4 * CORE_COUNT]")
.takes_value(true)
)
.arg(
Arg::with_name("utxos_limit")
.long("utxos-limit")
Expand Down Expand Up @@ -472,6 +479,22 @@ impl Config {
index_unspendables: m.is_present("index_unspendables"),
cors: m.value_of("cors").map(|s| s.to_string()),
precache_scripts: m.value_of("precache_scripts").map(|s| s.to_string()),
precache_threads: m.value_of("precache_threads").map_or_else(
|| {
std::thread::available_parallelism()
.expect("Can't get core count")
.get()
* 4
},
|s| match s.parse::<usize>() {
Ok(v) if v > 0 => v,
_ => clap::Error::value_validation_auto(format!(
"The argument '{}' isn't a valid value",
s
))
.exit(),
},
),

#[cfg(feature = "liquid")]
parent_network,
Expand Down
53 changes: 39 additions & 14 deletions src/new_index/precache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,52 @@ use hex;
use std::fs::File;
use std::io;
use std::io::prelude::*;
use std::sync::{atomic::AtomicUsize, Arc};
use std::time::Instant;

pub fn precache(chain: &ChainQuery, scripthashes: Vec<FullHash>) {
pub fn precache(chain: Arc<ChainQuery>, scripthashes: Vec<FullHash>, threads: usize) {
let total = scripthashes.len();
info!("Pre-caching stats and utxo set for {} scripthashes", total);
info!(
"Pre-caching stats and utxo set on {} threads for {} scripthashes",
threads, total
);

let pool = rayon::ThreadPoolBuilder::new()
.num_threads(16)
.num_threads(threads)
.thread_name(|i| format!("precache-{}", i))
.build()
.unwrap();
pool.install(|| {
scripthashes
.par_iter()
.enumerate()
.for_each(|(i, scripthash)| {
if i % 5 == 0 {
info!("running pre-cache for scripthash {}/{}", i + 1, total);
}
chain.stats(&scripthash[..]);
//chain.utxo(&scripthash[..]);
})
let now = Instant::now();
let counter = AtomicUsize::new(0);
std::thread::spawn(move || {
pool.install(|| {
scripthashes
.par_iter()
.for_each(|scripthash| {
// First, cache
chain.stats(&scripthash[..], crate::new_index::db::DBFlush::Disable);
let _ = chain.utxo(&scripthash[..], usize::MAX, crate::new_index::db::DBFlush::Disable);

// Then, increment the counter
let pre_increment = counter.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
let post_increment_counter = pre_increment + 1;

// Then, log
if post_increment_counter % 500 == 0 {
let now_millis = now.elapsed().as_millis();
info!("{post_increment_counter}/{total} Processed in {now_millis} ms running pre-cache for scripthash");
}

// Every 10k counts, flush the DB to disk
if post_increment_counter % 10000 == 0 {
info!("Flushing cache_db... {post_increment_counter}");
chain.store().cache_db().flush();
info!("Done Flushing cache_db!!! {post_increment_counter}");
}
})
});
// After everything is done, flush the cache
chain.store().cache_db().flush();
});
}

Expand Down
8 changes: 6 additions & 2 deletions src/new_index/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ impl Query {
}

pub fn utxo(&self, scripthash: &[u8]) -> Result<Vec<Utxo>> {
let mut utxos = self.chain.utxo(scripthash, self.config.utxos_limit)?;
let mut utxos = self.chain.utxo(
scripthash,
self.config.utxos_limit,
super::db::DBFlush::Enable,
)?;
let mempool = self.mempool();
utxos.retain(|utxo| !mempool.has_spend(&OutPoint::from(utxo)));
utxos.extend(mempool.utxo(scripthash));
Expand All @@ -111,7 +115,7 @@ impl Query {

pub fn stats(&self, scripthash: &[u8]) -> (ScriptStats, ScriptStats) {
(
self.chain.stats(scripthash),
self.chain.stats(scripthash, super::db::DBFlush::Enable),
self.mempool().stats(scripthash),
)
}
Expand Down
8 changes: 4 additions & 4 deletions src/new_index/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ impl ChainQuery {
}

// TODO: avoid duplication with stats/stats_delta?
pub fn utxo(&self, scripthash: &[u8], limit: usize) -> Result<Vec<Utxo>> {
pub fn utxo(&self, scripthash: &[u8], limit: usize, flush: DBFlush) -> Result<Vec<Utxo>> {
let _timer = self.start_timer("utxo");

// get the last known utxo set and the blockhash it was updated for.
Expand Down Expand Up @@ -598,7 +598,7 @@ impl ChainQuery {
if had_cache || processed_items > MIN_HISTORY_ITEMS_TO_CACHE {
self.store.cache_db.write(
vec![UtxoCacheRow::new(scripthash, &newutxos, &lastblock).into_row()],
DBFlush::Enable,
flush,
);
}
}
Expand Down Expand Up @@ -678,7 +678,7 @@ impl ChainQuery {
Ok((utxos, lastblock, processed_items))
}

pub fn stats(&self, scripthash: &[u8]) -> ScriptStats {
pub fn stats(&self, scripthash: &[u8], flush: DBFlush) -> ScriptStats {
let _timer = self.start_timer("stats");

// get the last known stats and the blockhash they are updated for.
Expand Down Expand Up @@ -706,7 +706,7 @@ impl ChainQuery {
if newstats.funded_txo_count + newstats.spent_txo_count > MIN_HISTORY_ITEMS_TO_CACHE {
self.store.cache_db.write(
vec![StatsCacheRow::new(scripthash, &newstats, &lastblock).into_row()],
DBFlush::Enable,
flush,
);
}
}
Expand Down

0 comments on commit 811c6c4

Please sign in to comment.