Skip to content

Commit

Permalink
chore: clean up db sync command (#363)
Browse files Browse the repository at this point in the history
* chore: clean up db sync command

* fix: test delete db

* chore: tweak log messages
  • Loading branch information
rafaelcr authored Sep 20, 2024
1 parent 231b756 commit 21fcf46
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 110 deletions.
12 changes: 6 additions & 6 deletions components/ordhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use ordhook::chainhook_sdk::utils::BlockHeights;
use ordhook::chainhook_sdk::utils::Context;
use ordhook::config::Config;
use ordhook::core::meta_protocols::brc20::db::get_brc20_operations_on_block;
use ordhook::core::pipeline::download_and_pipeline_blocks;
use ordhook::core::pipeline::bitcoind_download_blocks;
use ordhook::core::pipeline::processors::block_archiving::start_block_archiving_processor;
use ordhook::core::pipeline::processors::start_inscription_indexing_processor;
use ordhook::core::protocol::inscription_parsing::parse_inscriptions_and_standardize_block;
Expand Down Expand Up @@ -814,7 +814,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let config = ConfigFile::default(false, false, false, &cmd.config_path, &None)?;
initialize_sqlite_dbs(&config, ctx);
let service = Service::new(config, ctx.clone());
service.update_state(None).await?;
service.catch_up_to_bitcoin_chain_tip(None).await?;
}
Command::Db(OrdhookDbCommand::Repair(subcmd)) => match subcmd {
RepairCommand::Blocks(cmd) => {
Expand All @@ -825,11 +825,11 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let blocks = cmd.get_blocks();
let block_ingestion_processor =
start_block_archiving_processor(&config, ctx, false, None);
download_and_pipeline_blocks(
bitcoind_download_blocks(
&config,
blocks,
first_inscription_height(&config),
Some(&block_ingestion_processor),
&block_ingestion_processor,
10_000,
ctx,
)
Expand Down Expand Up @@ -870,11 +870,11 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
&PrometheusMonitoring::new(),
);

download_and_pipeline_blocks(
bitcoind_download_blocks(
&config,
blocks,
first_inscription_height(&config),
Some(&inscription_indexing_processor),
&inscription_indexing_processor,
10_000,
ctx,
)
Expand Down
49 changes: 16 additions & 33 deletions components/ordhook-core/src/core/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,16 @@ pub struct PostProcessorController {
pub thread_handle: JoinHandle<()>,
}

pub async fn download_and_pipeline_blocks(
/// Downloads blocks from bitcoind's RPC interface and pushes them to a `PostProcessorController` so they can be indexed or
/// ingested as needed.
pub async fn bitcoind_download_blocks(
config: &Config,
blocks: Vec<u64>,
start_sequencing_blocks_at_height: u64,
blocks_post_processor: Option<&PostProcessorController>,
blocks_post_processor: &PostProcessorController,
speed: usize,
ctx: &Context,
) -> Result<(), String> {
// let guard = pprof::ProfilerGuardBuilder::default()
// .frequency(20)
// .blocklist(&["libc", "libgcc", "pthread", "vdso"])
// .build()
// .unwrap();

let bitcoin_config = BitcoinConfig {
username: config.network.bitcoind_rpc_username.clone(),
password: config.network.bitcoind_rpc_password.clone(),
Expand Down Expand Up @@ -154,10 +150,7 @@ pub async fn download_and_pipeline_blocks(

let cloned_ctx = ctx.clone();

let blocks_post_processor_commands_tx = blocks_post_processor
.as_ref()
.and_then(|p| Some(p.commands_tx.clone()));

let blocks_post_processor_commands_tx = blocks_post_processor.commands_tx.clone();
let storage_thread = hiro_system_kit::thread_named("Block processor dispatcher")
.spawn(move || {
let mut inbox = HashMap::new();
Expand All @@ -171,9 +164,7 @@ pub async fn download_and_pipeline_blocks(
cloned_ctx,
"#{blocks_processed} blocks successfully sent to processor"
);
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
let _ = blocks_tx.send(PostProcessorCommand::Terminate);
}
let _ = blocks_post_processor_commands_tx.send(PostProcessorCommand::Terminate);
break;
}

Expand Down Expand Up @@ -216,12 +207,9 @@ pub async fn download_and_pipeline_blocks(
// Early "continue"
if !ooo_compacted_blocks.is_empty() {
blocks_processed += ooo_compacted_blocks.len() as u64;
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(
ooo_compacted_blocks,
vec![],
));
}
let _ = blocks_post_processor_commands_tx.send(
PostProcessorCommand::ProcessBlocks(ooo_compacted_blocks, vec![]),
);
}

if inbox.is_empty() {
Expand All @@ -240,12 +228,9 @@ pub async fn download_and_pipeline_blocks(
blocks_processed += blocks.len() as u64;

if !blocks.is_empty() {
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(
compacted_blocks,
blocks,
));
}
let _ = blocks_post_processor_commands_tx.send(
PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks),
);
}

if inbox_cursor > end_block {
Expand Down Expand Up @@ -304,12 +289,10 @@ pub async fn download_and_pipeline_blocks(

try_debug!(ctx, "Pipeline successfully terminated");

if let Some(post_processor) = blocks_post_processor {
loop {
if let Ok(signal) = post_processor.events_rx.recv() {
match signal {
PostProcessorEvent::Terminated | PostProcessorEvent::Expired => break,
}
loop {
if let Ok(signal) = blocks_post_processor.events_rx.recv() {
match signal {
PostProcessorEvent::Terminated | PostProcessorEvent::Expired => break,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion components/ordhook-core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,6 @@ pub fn drop_block_data_from_all_dbs(
pub fn drop_all_dbs(config: &Config) {
let dir_path = &config.expected_cache_path();
if dir_path.exists() {
std::fs::remove_dir_all(dir_path).expect("");
std::fs::remove_dir_all(dir_path).unwrap();
}
}
123 changes: 54 additions & 69 deletions components/ordhook-core/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::core::meta_protocols::brc20::parser::ParsedBrc20Operation;
use crate::core::meta_protocols::brc20::verifier::{
verify_brc20_operation, verify_brc20_transfer, VerifiedBrc20Operation,
};
use crate::core::pipeline::download_and_pipeline_blocks;
use crate::core::pipeline::bitcoind_download_blocks;
use crate::core::pipeline::processors::block_archiving::start_block_archiving_processor;
use crate::core::pipeline::processors::inscription_indexing::process_block;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
Expand Down Expand Up @@ -123,12 +123,12 @@ impl Service {
} else {
None
};
self.catch_up_with_chain_tip(false, check_blocks_integrity, block_post_processor)
if check_blocks_integrity {
self.check_blocks_db_integrity().await?;
}
self.catch_up_to_bitcoin_chain_tip(block_post_processor)
.await?;
try_info!(
self.ctx,
"Database up to date, service will start streaming blocks"
);
try_info!(self.ctx, "Service: Streaming blocks start");

// Sidecar channels setup
let observer_sidecar = self.set_up_observer_sidecar_runloop()?;
Expand Down Expand Up @@ -403,90 +403,76 @@ impl Service {
Ok(observer_sidecar)
}

pub async fn catch_up_with_chain_tip(
&mut self,
_rebuild_from_scratch: bool,
compact_and_check_rocksdb_integrity: bool,
block_post_processor: Option<crossbeam_channel::Sender<BitcoinBlockData>>,
) -> Result<(), String> {
{
if compact_and_check_rocksdb_integrity {
let (tip, missing_blocks) = {
let blocks_db = open_blocks_db_with_retry(false, &self.config, &self.ctx);

let ordhook_db =
open_ordinals_db(&self.config.expected_cache_path(), &self.ctx)
.expect("unable to retrieve ordhook db");
let tip = find_latest_inscription_block_height(&ordhook_db, &self.ctx)?.unwrap()
as u32;
info!(
self.ctx.expect_logger(),
"Checking database integrity up to block #{tip}",
);
let missing_blocks = find_missing_blocks(&blocks_db, 0, tip, &self.ctx);
(tip, missing_blocks)
};
if !missing_blocks.is_empty() {
info!(
self.ctx.expect_logger(),
"{} missing blocks detected, will attempt to repair data",
missing_blocks.len()
);
let block_ingestion_processor =
start_block_archiving_processor(&self.config, &self.ctx, false, None);
download_and_pipeline_blocks(
&self.config,
missing_blocks.into_iter().map(|x| x as u64).collect(),
tip.into(),
Some(&block_ingestion_processor),
10_000,
&self.ctx,
)
.await?;
}
let blocks_db_rw = open_blocks_db_with_retry(false, &self.config, &self.ctx);
info!(self.ctx.expect_logger(), "Running database compaction",);
run_compaction(&blocks_db_rw, tip);
}
pub async fn check_blocks_db_integrity(&mut self) -> Result<(), String> {
let (tip, missing_blocks) = {
let blocks_db = open_blocks_db_with_retry(false, &self.config, &self.ctx);

let ordhook_db = open_ordinals_db(&self.config.expected_cache_path(), &self.ctx)
.expect("unable to retrieve ordhook db");
let tip = find_latest_inscription_block_height(&ordhook_db, &self.ctx)?.unwrap() as u32;
info!(
self.ctx.expect_logger(),
"Checking database integrity up to block #{tip}",
);
let missing_blocks = find_missing_blocks(&blocks_db, 0, tip, &self.ctx);
(tip, missing_blocks)
};
if !missing_blocks.is_empty() {
info!(
self.ctx.expect_logger(),
"{} missing blocks detected, will attempt to repair data",
missing_blocks.len()
);
let block_ingestion_processor =
start_block_archiving_processor(&self.config, &self.ctx, false, None);
bitcoind_download_blocks(
&self.config,
missing_blocks.into_iter().map(|x| x as u64).collect(),
tip.into(),
&block_ingestion_processor,
10_000,
&self.ctx,
)
.await?;
}
self.update_state(block_post_processor).await
let blocks_db_rw = open_blocks_db_with_retry(false, &self.config, &self.ctx);
info!(self.ctx.expect_logger(), "Running database compaction",);
run_compaction(&blocks_db_rw, tip);
Ok(())
}

pub async fn update_state(
/// Synchronizes and indexes all databases until their block height matches bitcoind's block height.
pub async fn catch_up_to_bitcoin_chain_tip(
&self,
block_post_processor: Option<crossbeam_channel::Sender<BitcoinBlockData>>,
) -> Result<(), String> {
// First, make sure that rocksdb and sqlite are aligned.
// If rocksdb.chain_tip.height <= sqlite.chain_tip.height
// Perform some block compression until that height.
// 1: Catch up blocks DB so it is at the same height as the ordinals DB.
if let Some((start_block, end_block)) = should_sync_rocks_db(&self.config, &self.ctx)? {
let blocks_post_processor = start_block_archiving_processor(
&self.config,
&self.ctx,
true,
block_post_processor.clone(),
);

try_info!(
self.ctx,
"Compressing blocks (from #{start_block} to #{end_block})"
"Service: Compressing blocks from #{start_block} to #{end_block}"
);

let blocks = BlockHeights::BlockRange(start_block, end_block)
.get_sorted_entries()
.map_err(|_e| format!("Block start / end block spec invalid"))?;
download_and_pipeline_blocks(
bitcoind_download_blocks(
&self.config,
blocks.into(),
first_inscription_height(&self.config),
Some(&blocks_post_processor),
&blocks_post_processor,
10_000,
&self.ctx,
)
.await?;
}

// Start predicate processor
// 2: Catch up ordinals DB until it reaches bitcoind block height. This will also advance blocks DB.
let mut last_block_processed = 0;
while let Some((start_block, end_block, speed)) =
should_sync_ordhook_db(&self.config, &self.ctx)?
Expand All @@ -500,20 +486,18 @@ impl Service {
block_post_processor.clone(),
&self.prometheus,
);

try_info!(
self.ctx,
"Indexing inscriptions from block #{start_block} to block #{end_block}"
"Service: Indexing inscriptions from #{start_block} to #{end_block}"
);

let blocks = BlockHeights::BlockRange(start_block, end_block)
.get_sorted_entries()
.map_err(|_e| format!("Block start / end block spec invalid"))?;
download_and_pipeline_blocks(
bitcoind_download_blocks(
&self.config,
blocks.into(),
first_inscription_height(&self.config),
Some(&blocks_post_processor),
&blocks_post_processor,
speed,
&self.ctx,
)
Expand All @@ -522,6 +506,7 @@ impl Service {
last_block_processed = end_block;
}

try_info!(self.ctx, "Service: Index has reached bitcoin chain tip");
Ok(())
}

Expand All @@ -534,11 +519,11 @@ impl Service {
let blocks_post_processor =
start_transfers_recomputing_processor(&self.config, &self.ctx, block_post_processor);

download_and_pipeline_blocks(
bitcoind_download_blocks(
&self.config,
blocks,
first_inscription_height(&self.config),
Some(&blocks_post_processor),
&blocks_post_processor,
100,
&self.ctx,
)
Expand Down
2 changes: 1 addition & 1 deletion components/ordhook-sdk-js/src/ordinals_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl OrdinalsIndexingRunloop {
match cmd {
IndexerCommand::StreamBlocks => {
// We start the service as soon as the start() method is being called.
let future = service.catch_up_with_chain_tip(false, true, None);
let future = service.catch_up_with_chain_tip(true, None);
let _ = hiro_system_kit::nestable_block_on(future).expect("unable to start indexer");
let future = service.start_event_observer(observer_sidecar);
let (command_tx, event_rx) =
Expand Down

0 comments on commit 21fcf46

Please sign in to comment.