From 21fcf46fac3f593aac07ac985769e2be6fc82b20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20C=C3=A1rdenas?= Date: Thu, 19 Sep 2024 19:23:39 -0600 Subject: [PATCH] chore: clean up db sync command (#363) * chore: clean up db sync command * fix: test delete db * chore: tweak log messages --- components/ordhook-cli/src/cli/mod.rs | 12 +- .../ordhook-core/src/core/pipeline/mod.rs | 49 +++---- components/ordhook-core/src/db/mod.rs | 2 +- components/ordhook-core/src/service/mod.rs | 123 ++++++++---------- .../ordhook-sdk-js/src/ordinals_indexer.rs | 2 +- 5 files changed, 78 insertions(+), 110 deletions(-) diff --git a/components/ordhook-cli/src/cli/mod.rs b/components/ordhook-cli/src/cli/mod.rs index 2868ad5e..b4aaaa8a 100644 --- a/components/ordhook-cli/src/cli/mod.rs +++ b/components/ordhook-cli/src/cli/mod.rs @@ -17,7 +17,7 @@ use ordhook::chainhook_sdk::utils::BlockHeights; use ordhook::chainhook_sdk::utils::Context; use ordhook::config::Config; use ordhook::core::meta_protocols::brc20::db::get_brc20_operations_on_block; -use ordhook::core::pipeline::download_and_pipeline_blocks; +use ordhook::core::pipeline::bitcoind_download_blocks; use ordhook::core::pipeline::processors::block_archiving::start_block_archiving_processor; use ordhook::core::pipeline::processors::start_inscription_indexing_processor; use ordhook::core::protocol::inscription_parsing::parse_inscriptions_and_standardize_block; @@ -814,7 +814,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { let config = ConfigFile::default(false, false, false, &cmd.config_path, &None)?; initialize_sqlite_dbs(&config, ctx); let service = Service::new(config, ctx.clone()); - service.update_state(None).await?; + service.catch_up_to_bitcoin_chain_tip(None).await?; } Command::Db(OrdhookDbCommand::Repair(subcmd)) => match subcmd { RepairCommand::Blocks(cmd) => { @@ -825,11 +825,11 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { let blocks = cmd.get_blocks(); let block_ingestion_processor = start_block_archiving_processor(&config, ctx, false, None); - download_and_pipeline_blocks( + bitcoind_download_blocks( &config, blocks, first_inscription_height(&config), - Some(&block_ingestion_processor), + &block_ingestion_processor, 10_000, ctx, ) @@ -870,11 +870,11 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { &PrometheusMonitoring::new(), ); - download_and_pipeline_blocks( + bitcoind_download_blocks( &config, blocks, first_inscription_height(&config), - Some(&inscription_indexing_processor), + &inscription_indexing_processor, 10_000, ctx, ) diff --git a/components/ordhook-core/src/core/pipeline/mod.rs b/components/ordhook-core/src/core/pipeline/mod.rs index 27b77f6c..fdd35452 100644 --- a/components/ordhook-core/src/core/pipeline/mod.rs +++ b/components/ordhook-core/src/core/pipeline/mod.rs @@ -35,20 +35,16 @@ pub struct PostProcessorController { pub thread_handle: JoinHandle<()>, } -pub async fn download_and_pipeline_blocks( +/// Downloads blocks from bitcoind's RPC interface and pushes them to a `PostProcessorController` so they can be indexed or +/// ingested as needed. +pub async fn bitcoind_download_blocks( config: &Config, blocks: Vec, start_sequencing_blocks_at_height: u64, - blocks_post_processor: Option<&PostProcessorController>, + blocks_post_processor: &PostProcessorController, speed: usize, ctx: &Context, ) -> Result<(), String> { - // let guard = pprof::ProfilerGuardBuilder::default() - // .frequency(20) - // .blocklist(&["libc", "libgcc", "pthread", "vdso"]) - // .build() - // .unwrap(); - let bitcoin_config = BitcoinConfig { username: config.network.bitcoind_rpc_username.clone(), password: config.network.bitcoind_rpc_password.clone(), @@ -154,10 +150,7 @@ pub async fn download_and_pipeline_blocks( let cloned_ctx = ctx.clone(); - let blocks_post_processor_commands_tx = blocks_post_processor - .as_ref() - .and_then(|p| Some(p.commands_tx.clone())); - + let blocks_post_processor_commands_tx = blocks_post_processor.commands_tx.clone(); let storage_thread = hiro_system_kit::thread_named("Block processor dispatcher") .spawn(move || { let mut inbox = HashMap::new(); @@ -171,9 +164,7 @@ pub async fn download_and_pipeline_blocks( cloned_ctx, "#{blocks_processed} blocks successfully sent to processor" ); - if let Some(ref blocks_tx) = blocks_post_processor_commands_tx { - let _ = blocks_tx.send(PostProcessorCommand::Terminate); - } + let _ = blocks_post_processor_commands_tx.send(PostProcessorCommand::Terminate); break; } @@ -216,12 +207,9 @@ pub async fn download_and_pipeline_blocks( // Early "continue" if !ooo_compacted_blocks.is_empty() { blocks_processed += ooo_compacted_blocks.len() as u64; - if let Some(ref blocks_tx) = blocks_post_processor_commands_tx { - let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks( - ooo_compacted_blocks, - vec![], - )); - } + let _ = blocks_post_processor_commands_tx.send( + PostProcessorCommand::ProcessBlocks(ooo_compacted_blocks, vec![]), + ); } if inbox.is_empty() { @@ -240,12 +228,9 @@ pub async fn download_and_pipeline_blocks( blocks_processed += blocks.len() as u64; if !blocks.is_empty() { - if let Some(ref blocks_tx) = blocks_post_processor_commands_tx { - let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks( - compacted_blocks, - blocks, - )); - } + let _ = blocks_post_processor_commands_tx.send( + PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks), + ); } if inbox_cursor > end_block { @@ -304,12 +289,10 @@ pub async fn download_and_pipeline_blocks( try_debug!(ctx, "Pipeline successfully terminated"); - if let Some(post_processor) = blocks_post_processor { - loop { - if let Ok(signal) = post_processor.events_rx.recv() { - match signal { - PostProcessorEvent::Terminated | PostProcessorEvent::Expired => break, - } + loop { + if let Ok(signal) = blocks_post_processor.events_rx.recv() { + match signal { + PostProcessorEvent::Terminated | PostProcessorEvent::Expired => break, } } } diff --git a/components/ordhook-core/src/db/mod.rs b/components/ordhook-core/src/db/mod.rs index b6c9f72d..093b9e49 100644 --- a/components/ordhook-core/src/db/mod.rs +++ b/components/ordhook-core/src/db/mod.rs @@ -93,6 +93,6 @@ pub fn drop_block_data_from_all_dbs( pub fn drop_all_dbs(config: &Config) { let dir_path = &config.expected_cache_path(); if dir_path.exists() { - std::fs::remove_dir_all(dir_path).expect(""); + std::fs::remove_dir_all(dir_path).unwrap(); } } diff --git a/components/ordhook-core/src/service/mod.rs b/components/ordhook-core/src/service/mod.rs index 57604e3f..7ed514a2 100644 --- a/components/ordhook-core/src/service/mod.rs +++ b/components/ordhook-core/src/service/mod.rs @@ -10,7 +10,7 @@ use crate::core::meta_protocols::brc20::parser::ParsedBrc20Operation; use crate::core::meta_protocols::brc20::verifier::{ verify_brc20_operation, verify_brc20_transfer, VerifiedBrc20Operation, }; -use crate::core::pipeline::download_and_pipeline_blocks; +use crate::core::pipeline::bitcoind_download_blocks; use crate::core::pipeline::processors::block_archiving::start_block_archiving_processor; use crate::core::pipeline::processors::inscription_indexing::process_block; use crate::core::pipeline::processors::start_inscription_indexing_processor; @@ -123,12 +123,12 @@ impl Service { } else { None }; - self.catch_up_with_chain_tip(false, check_blocks_integrity, block_post_processor) + if check_blocks_integrity { + self.check_blocks_db_integrity().await?; + } + self.catch_up_to_bitcoin_chain_tip(block_post_processor) .await?; - try_info!( - self.ctx, - "Database up to date, service will start streaming blocks" - ); + try_info!(self.ctx, "Service: Streaming blocks start"); // Sidecar channels setup let observer_sidecar = self.set_up_observer_sidecar_runloop()?; @@ -403,62 +403,50 @@ impl Service { Ok(observer_sidecar) } - pub async fn catch_up_with_chain_tip( - &mut self, - _rebuild_from_scratch: bool, - compact_and_check_rocksdb_integrity: bool, - block_post_processor: Option>, - ) -> Result<(), String> { - { - if compact_and_check_rocksdb_integrity { - let (tip, missing_blocks) = { - let blocks_db = open_blocks_db_with_retry(false, &self.config, &self.ctx); - - let ordhook_db = - open_ordinals_db(&self.config.expected_cache_path(), &self.ctx) - .expect("unable to retrieve ordhook db"); - let tip = find_latest_inscription_block_height(&ordhook_db, &self.ctx)?.unwrap() - as u32; - info!( - self.ctx.expect_logger(), - "Checking database integrity up to block #{tip}", - ); - let missing_blocks = find_missing_blocks(&blocks_db, 0, tip, &self.ctx); - (tip, missing_blocks) - }; - if !missing_blocks.is_empty() { - info!( - self.ctx.expect_logger(), - "{} missing blocks detected, will attempt to repair data", - missing_blocks.len() - ); - let block_ingestion_processor = - start_block_archiving_processor(&self.config, &self.ctx, false, None); - download_and_pipeline_blocks( - &self.config, - missing_blocks.into_iter().map(|x| x as u64).collect(), - tip.into(), - Some(&block_ingestion_processor), - 10_000, - &self.ctx, - ) - .await?; - } - let blocks_db_rw = open_blocks_db_with_retry(false, &self.config, &self.ctx); - info!(self.ctx.expect_logger(), "Running database compaction",); - run_compaction(&blocks_db_rw, tip); - } + pub async fn check_blocks_db_integrity(&mut self) -> Result<(), String> { + let (tip, missing_blocks) = { + let blocks_db = open_blocks_db_with_retry(false, &self.config, &self.ctx); + + let ordhook_db = open_ordinals_db(&self.config.expected_cache_path(), &self.ctx) + .expect("unable to retrieve ordhook db"); + let tip = find_latest_inscription_block_height(&ordhook_db, &self.ctx)?.unwrap() as u32; + info!( + self.ctx.expect_logger(), + "Checking database integrity up to block #{tip}", + ); + let missing_blocks = find_missing_blocks(&blocks_db, 0, tip, &self.ctx); + (tip, missing_blocks) + }; + if !missing_blocks.is_empty() { + info!( + self.ctx.expect_logger(), + "{} missing blocks detected, will attempt to repair data", + missing_blocks.len() + ); + let block_ingestion_processor = + start_block_archiving_processor(&self.config, &self.ctx, false, None); + bitcoind_download_blocks( + &self.config, + missing_blocks.into_iter().map(|x| x as u64).collect(), + tip.into(), + &block_ingestion_processor, + 10_000, + &self.ctx, + ) + .await?; } - self.update_state(block_post_processor).await + let blocks_db_rw = open_blocks_db_with_retry(false, &self.config, &self.ctx); + info!(self.ctx.expect_logger(), "Running database compaction",); + run_compaction(&blocks_db_rw, tip); + Ok(()) } - pub async fn update_state( + /// Synchronizes and indexes all databases until their block height matches bitcoind's block height. + pub async fn catch_up_to_bitcoin_chain_tip( &self, block_post_processor: Option>, ) -> Result<(), String> { - // First, make sure that rocksdb and sqlite are aligned. - // If rocksdb.chain_tip.height <= sqlite.chain_tip.height - // Perform some block compression until that height. + // 1: Catch up blocks DB so it is at the same height as the ordinals DB. if let Some((start_block, end_block)) = should_sync_rocks_db(&self.config, &self.ctx)? { let blocks_post_processor = start_block_archiving_processor( &self.config, @@ -466,27 +454,25 @@ impl Service { true, block_post_processor.clone(), ); - try_info!( self.ctx, - "Compressing blocks (from #{start_block} to #{end_block})" + "Service: Compressing blocks from #{start_block} to #{end_block}" ); - let blocks = BlockHeights::BlockRange(start_block, end_block) .get_sorted_entries() .map_err(|_e| format!("Block start / end block spec invalid"))?; - download_and_pipeline_blocks( + bitcoind_download_blocks( &self.config, blocks.into(), first_inscription_height(&self.config), - Some(&blocks_post_processor), + &blocks_post_processor, 10_000, &self.ctx, ) .await?; } - // Start predicate processor + // 2: Catch up ordinals DB until it reaches bitcoind block height. This will also advance blocks DB. let mut last_block_processed = 0; while let Some((start_block, end_block, speed)) = should_sync_ordhook_db(&self.config, &self.ctx)? @@ -500,20 +486,18 @@ impl Service { block_post_processor.clone(), &self.prometheus, ); - try_info!( self.ctx, - "Indexing inscriptions from block #{start_block} to block #{end_block}" + "Service: Indexing inscriptions from #{start_block} to #{end_block}" ); - let blocks = BlockHeights::BlockRange(start_block, end_block) .get_sorted_entries() .map_err(|_e| format!("Block start / end block spec invalid"))?; - download_and_pipeline_blocks( + bitcoind_download_blocks( &self.config, blocks.into(), first_inscription_height(&self.config), - Some(&blocks_post_processor), + &blocks_post_processor, speed, &self.ctx, ) @@ -522,6 +506,7 @@ impl Service { last_block_processed = end_block; } + try_info!(self.ctx, "Service: Index has reached bitcoin chain tip"); Ok(()) } @@ -534,11 +519,11 @@ impl Service { let blocks_post_processor = start_transfers_recomputing_processor(&self.config, &self.ctx, block_post_processor); - download_and_pipeline_blocks( + bitcoind_download_blocks( &self.config, blocks, first_inscription_height(&self.config), - Some(&blocks_post_processor), + &blocks_post_processor, 100, &self.ctx, ) diff --git a/components/ordhook-sdk-js/src/ordinals_indexer.rs b/components/ordhook-sdk-js/src/ordinals_indexer.rs index 84d3bafa..26870c56 100644 --- a/components/ordhook-sdk-js/src/ordinals_indexer.rs +++ b/components/ordhook-sdk-js/src/ordinals_indexer.rs @@ -159,7 +159,7 @@ impl OrdinalsIndexingRunloop { match cmd { IndexerCommand::StreamBlocks => { // We start the service as soon as the start() method is being called. - let future = service.catch_up_with_chain_tip(false, true, None); + let future = service.catch_up_with_chain_tip(true, None); let _ = hiro_system_kit::nestable_block_on(future).expect("unable to start indexer"); let future = service.start_event_observer(observer_sidecar); let (command_tx, event_rx) =