Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scanner): Restart scanning where left #8080

Merged
merged 24 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
30a034a
start scanner where it was left
oxarbitrage Dec 8, 2023
b5bd607
fix tests
oxarbitrage Dec 8, 2023
db38a27
add a `scan_start_where_left` test
oxarbitrage Dec 9, 2023
39a0bfe
refactor a log msg
oxarbitrage Dec 9, 2023
369ae38
fix some comments
oxarbitrage Dec 9, 2023
53dcb31
remove function
oxarbitrage Dec 11, 2023
68bd53f
fix doc comment
oxarbitrage Dec 11, 2023
ae5bc94
Merge remote-tracking branch 'origin/main' into issue8022
oxarbitrage Dec 12, 2023
bf27cfd
clippy
oxarbitrage Dec 12, 2023
b73a28a
fix `sapling_keys_and_last_scanned_heights()`
oxarbitrage Dec 12, 2023
d7c3440
simplify start height
oxarbitrage Dec 12, 2023
024a1dd
i went too far, revert some changes back
oxarbitrage Dec 12, 2023
fea879c
change log info to every 10k blocks
oxarbitrage Dec 12, 2023
f7fc876
Merge remote-tracking branch 'origin/main' into issue8022
oxarbitrage Dec 12, 2023
6b04fb0
fix build
oxarbitrage Dec 12, 2023
dc7f7d2
Merge branch 'main' into issue8022
teor2345 Dec 13, 2023
9d3b21d
Update height snapshot code and check last height is consistent
teor2345 Dec 13, 2023
24ea29b
Add strictly before and strictly after database key gets
teor2345 Dec 13, 2023
cf44fef
Move to the previous key using strictly before ops
teor2345 Dec 13, 2023
c8ac308
Assert that keys are only inserted once
teor2345 Dec 13, 2023
42bf26d
Update the index in each loop
teor2345 Dec 13, 2023
454ca9a
Update snapshots
teor2345 Dec 13, 2023
371b558
Remove debugging code
teor2345 Dec 13, 2023
0a5c086
start scanning at min available height
oxarbitrage Dec 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions zebra-scan/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ impl Config {
pub fn db_config(&self) -> &DbConfig {
&self.db_config
}

/// Returns a mutable reference to the database-specific config.
///
/// This lets callers adjust database settings in place, for example `cache_dir` or
/// `ephemeral`, after the config has been built.
pub fn db_config_mut(&mut self) -> &mut DbConfig {
&mut self.db_config
}
}
40 changes: 17 additions & 23 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const INITIAL_WAIT: Duration = Duration::from_secs(15);
const CHECK_INTERVAL: Duration = Duration::from_secs(30);

/// We log an info log with progress after this many blocks.
const INFO_LOG_INTERVAL: u32 = 100_000;
const INFO_LOG_INTERVAL: u32 = 1_000;
teor2345 marked this conversation as resolved.
Show resolved Hide resolved

/// Start a scan task that reads blocks from `state`, scans them with the configured keys in
/// `storage`, and then writes the results to `storage`.
Expand All @@ -68,17 +68,17 @@ pub async fn start(

teor2345 marked this conversation as resolved.
Show resolved Hide resolved
// Read keys from the storage on disk, which can block async execution.
let key_storage = storage.clone();
let key_birthdays = tokio::task::spawn_blocking(move || key_storage.sapling_keys())
let key_heights = tokio::task::spawn_blocking(move || key_storage.sapling_keys_last_heights())
.wait_for_panics()
.await;
let key_birthdays = Arc::new(key_birthdays);
let key_heights = Arc::new(key_heights);

// Parse and convert keys once, then use them to scan all blocks.
// There is some cryptography here, but it should be fast even with thousands of keys.
let parsed_keys: HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>),
> = key_birthdays
> = key_heights
.keys()
.map(|key| {
let parsed_keys = sapling_key_to_scan_block_keys(key, network)?;
Expand All @@ -96,7 +96,7 @@ pub async fn start(
state.clone(),
chain_tip_change.clone(),
storage.clone(),
key_birthdays.clone(),
key_heights.clone(),
parsed_keys.clone(),
)
.await?;
Expand Down Expand Up @@ -125,7 +125,7 @@ pub async fn scan_height_and_store_results(
mut state: State,
chain_tip_change: ChainTipChange,
storage: Storage,
key_birthdays: Arc<HashMap<SaplingScanningKey, Height>>,
key_last_scanned_heights: Arc<HashMap<SaplingScanningKey, Height>>,
parsed_keys: Arc<
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
>,
Expand All @@ -138,17 +138,6 @@ pub async fn scan_height_and_store_results(
let is_info_log =
height == storage.min_sapling_birthday_height() || height.0 % INFO_LOG_INTERVAL == 0;

// TODO: add debug logs?
if is_info_log {
info!(
"Scanning the blockchain: now at block {:?}, current tip {:?}",
height,
chain_tip_change
.latest_chain_tip()
.best_tip_height_and_hash(),
);
}

// Get a block from the state.
// We can't use ServiceExt::oneshot() here, because it causes lifetime errors in init().
let block = state
Expand All @@ -168,24 +157,29 @@ pub async fn scan_height_and_store_results(
// Scan it with all the keys.
//
// TODO: scan each key in parallel (after MVP?)
for (key_num, (sapling_key, birthday_height)) in key_birthdays.iter().enumerate() {
for (key_num, (sapling_key, last_scanned_height)) in key_last_scanned_heights.iter().enumerate()
{
// Only scan what was not scanned for each key
if height <= *last_scanned_height {
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
continue;
}

// # Security
//
// We can't log `sapling_key` here because it is a private viewing key. Anyone who reads
// the logs could use the key to view those transactions.
if is_info_log {
info!(
"Scanning the blockchain for key {}, started at block {:?}",
key_num, birthday_height,
"Scanning the blockchain for key {}, started at block {:?}, now at block {:?}, current tip {:?}",
key_num, last_scanned_height.as_usize(),
height.as_usize(),
chain_tip_change.latest_chain_tip().best_tip_height().expect("we should have a tip to scan").as_usize(),
);
}

// Get the pre-parsed keys for this configured key.
let (dfvks, ivks) = parsed_keys.get(sapling_key).cloned().unwrap_or_default();

// Scan the block, which blocks async execution until the scan is complete.
//
// TODO: skip scanning before birthday height (#8022)
let sapling_key = sapling_key.clone();
let block = block.clone();
let mut storage = storage.clone();
Expand Down
25 changes: 24 additions & 1 deletion zebra-scan/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ pub use db::{SaplingScannedResult, SaplingScanningKey};

use self::db::ScannerWriteBatch;

/// The interval, in blocks, at which we insert an empty results entry into the database for
/// each stored key, so we can track progress.
const INSERT_CONTROL_INTERVAL: u32 = 1_000;
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved

/// Store key info and results of the scan.
///
/// `rocksdb` allows concurrent writes through a shared reference,
Expand Down Expand Up @@ -89,10 +93,20 @@ impl Storage {
///
/// This method can block while reading database files, so it must be inside spawn_blocking()
/// in async code.
pub fn sapling_keys(&self) -> HashMap<SaplingScanningKey, Height> {
pub fn sapling_keys_birthdays(&self) -> HashMap<SaplingScanningKey, Height> {
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
self.sapling_keys_and_birthday_heights()
}

/// Returns all the stored Sapling keys, each mapped to the height computed for it by
/// `sapling_keys_and_last_scanned_heights()`.
///
/// This is a thin public wrapper around that database method.
///
/// NOTE(review): callers in this change appear to read the returned height in two ways:
/// the scan loop skips heights less than or equal to it (as if it was already scanned),
/// while the startup log reports it as the height to resume at. Confirm which meaning is
/// intended.
///
/// # Performance / Hangs
///
/// This method can block while reading database files, so it must be inside spawn_blocking()
/// in async code.
pub fn sapling_keys_last_heights(&self) -> HashMap<SaplingScanningKey, Height> {
self.sapling_keys_and_last_scanned_heights()
}

/// Add the sapling results for `height` to the storage.
///
/// # Performance / Hangs
Expand All @@ -109,6 +123,10 @@ impl Storage {
// in a single batch.
let mut batch = ScannerWriteBatch::default();

// Every `INSERT_CONTROL_INTERVAL` blocks, we add a new entry to the scanner database for this
// key, so we can track the progress made in the last interval even if no transactions were found yet.
let is_control_time = height.0 % INSERT_CONTROL_INTERVAL == 0 && sapling_results.is_empty();

for (index, sapling_result) in sapling_results {
let index = SaplingScannedDatabaseIndex {
sapling_key: sapling_key.clone(),
Expand All @@ -123,6 +141,11 @@ impl Storage {
batch.insert_sapling_result(self, entry);
}

// Add tracking entry for key.
if is_control_time {
batch.insert_sapling_height(self, &sapling_key, height);
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
}

self.write_batch(batch);
}

Expand Down
12 changes: 11 additions & 1 deletion zebra-scan/src/storage/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,17 @@ impl Storage {

let new_storage = Self { db };

// TODO: report the last scanned height here?
// Report where we are for each key in the database.
let keys = new_storage.sapling_keys_last_heights();
for (key_num, (_key, height)) in keys.iter().enumerate() {
tracing::info!(
"Last scanned height for key number {} is {}, resuming at {}",
key_num,
height.previous().expect("height is not genesis").as_usize(),
height.as_usize(),
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
);
}

tracing::info!("loaded Zebra scanner cache");

new_storage
Expand Down
59 changes: 59 additions & 0 deletions zebra-scan/src/storage/db/sapling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,54 @@ impl Storage {
keys
}

/// Returns all the keys and their last scanned heights.
///
/// The result is built by walking the Sapling results column family backwards: starting from
/// the index of the last stored record, it looks up one entry per key, records a height for
/// that key, and then moves the lookup position to `min_for_key()` of that key.
///
/// Returns an empty map if the column family has no records.
///
/// # Panics
///
/// If an entry with no results is found at a height that has no next height.
pub fn sapling_keys_and_last_scanned_heights(&self) -> HashMap<SaplingScanningKey, Height> {
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
let sapling_tx_ids = self.sapling_tx_ids_cf();
let mut keys = HashMap::new();

// Fetch the last record in the column family; `None` means there is nothing stored yet.
//
// NOTE(review): the value is typed as `SaplingScannedResult` here, but the lookup inside the
// loop types it as `Option<SaplingScannedResult>` — confirm which one matches the entries
// written with a `None` value.
let last_stored_record: Option<(SaplingScannedDatabaseIndex, SaplingScannedResult)> =
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
self.db.zs_last_key_value(&sapling_tx_ids);
// NOTE(review): this `is_none()` check followed by `expect()` could be a single `let ... else`,
// which this function already uses below.
if last_stored_record.is_none() {
return keys;
}

// Only the index is kept; the result value of the last record is discarded.
let mut last_stored_record_index = last_stored_record
.expect("checked this is `Some` in the code branch above")
.0;

// NOTE(review): this loop only ends when the lookup returns `None`. That assumes looking back
// from `min_for_key(key)` never returns an entry for `key` itself — confirm against the
// semantics of `zs_prev_key_value_back_from()`.
loop {
// Find the previous key, and the last height we have for it.
let Some(entry) = self
.db
.zs_prev_key_value_back_from(&sapling_tx_ids, &last_stored_record_index)
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
else {
break;
};

let sapling_key = entry.0.sapling_key;
let mut height = entry.0.tx_loc.height;
// Unused binding; the annotation fixes the value type of the lookup above.
let _last_result: Option<SaplingScannedResult> = entry.1;

let height_results = self.sapling_results_for_key_and_height(&sapling_key, height);

// If there are no results for this block, then it's a "skip up to height" marker, and
// the target height is the next height. If there are some results, it's the actual
// target height.
if height_results.values().all(Option::is_none) {
height = height
.next()
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
.expect("results should only be stored for validated block heights");
}

keys.insert(sapling_key.clone(), height);

// Skip all the results after the next key.
//
// The next lookup starts from the minimum index of the key we just recorded.
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
last_stored_record_index = SaplingScannedDatabaseIndex::min_for_key(&sapling_key);
}

keys
}

/// Returns the Sapling indexes and results in the supplied range.
///
/// Convenience method for accessing raw data with the correct types.
Expand Down Expand Up @@ -214,4 +262,15 @@ impl ScannerWriteBatch {
SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, skip_up_to_height);
self.zs_insert(&storage.sapling_tx_ids_cf(), index, None);
}

/// Queues a result-less entry for `sapling_key` at `height` in this batch.
///
/// The entry is written to the Sapling results column family of `storage`, under the index
/// returned by `SaplingScannedDatabaseIndex::min_for_key_and_height()`, with `None` as its
/// value. Nothing reaches the database until the batch itself is written.
pub(crate) fn insert_sapling_height(
    &mut self,
    storage: &Storage,
    sapling_key: &SaplingScanningKey,
    height: Height,
) {
    // The index under which the empty entry for this key and height is stored.
    let marker_index = SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, height);
    let tx_ids_cf = storage.sapling_tx_ids_cf();

    self.zs_insert(&tx_ids_cf, marker_index, None);
}
}
4 changes: 2 additions & 2 deletions zebra-scan/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,9 @@ fn scanning_fake_blocks_store_key_and_results() -> Result<()> {
s.add_sapling_key(&key_to_be_stored, None);

// Check key was added
assert_eq!(s.sapling_keys().len(), 1);
assert_eq!(s.sapling_keys_birthdays().len(), 1);
assert_eq!(
s.sapling_keys().get(&key_to_be_stored),
s.sapling_keys_birthdays().get(&key_to_be_stored),
Some(&s.min_sapling_birthday_height())
);

Expand Down
3 changes: 2 additions & 1 deletion zebra-state/src/service/finalized_state/disk_format/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ impl IntoDisk for SaplingScannedResult {

impl FromDisk for SaplingScannedResult {
fn from_bytes(bytes: impl AsRef<[u8]>) -> Self {
SaplingScannedResult(bytes.as_ref().try_into().unwrap())
// TODO: Change or confirm the `unwrap_or` is good enough.
SaplingScannedResult(bytes.as_ref().try_into().unwrap_or([0; 32]))
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
97 changes: 97 additions & 0 deletions zebrad/tests/acceptance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2860,3 +2860,100 @@ fn scan_task_starts() -> Result<()> {

Ok(())
}

/// Test that the scanner can continue scanning where it left off when zebrad restarts.
///
/// Needs a cached state close to the tip. A possible way to run it locally is:
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
///
/// export ZEBRA_CACHED_STATE_DIR="/path/to/zebra/state"
/// cargo test scan_start_where_left --features="shielded-scan" -- --ignored --nocapture
///
/// The test will run zebrad with a key to scan, scan the first few blocks after sapling, and then stop.
/// Then it will restart zebrad and check that it resumes scanning where it left off.
///
/// If no cached state path is available, the test body is skipped and the test returns `Ok(())`.
///
/// Note: This test will remove all the contents you may have in the ZEBRA_CACHED_STATE_DIR/private-scan directory
/// so it can start with an empty scanning state.
#[ignore]
#[test]
#[cfg(feature = "shielded-scan")]
fn scan_start_where_left() -> Result<()> {
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
use indexmap::IndexMap;
use zebra_scan::storage::db::SCANNER_DATABASE_KIND;

let _init_guard = zebra_test::init();

// use `UpdateZebraCachedStateNoRpc` as the test type to make sure a zebrad cache state is available.
let test_type = TestType::UpdateZebraCachedStateNoRpc;
if let Some(cache_dir) = test_type.zebrad_state_path("scan test") {
// Add a key to the config
const ZECPAGES_VIEWING_KEY: &str = "zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz";
let mut config = default_test_config(Mainnet)?;
let mut keys = IndexMap::new();
// The value `1` is presumably the key's birthday height — TODO confirm.
keys.insert(ZECPAGES_VIEWING_KEY.to_string(), 1);
config.shielded_scan.sapling_keys_to_scan = keys;

// Add the cache dir to shielded scan, make it the same as the zebrad cache state.
config.shielded_scan.db_config_mut().cache_dir = cache_dir.clone();
config.shielded_scan.db_config_mut().ephemeral = false;

// Add the cache dir to state.
config.state.cache_dir = cache_dir.clone();
config.state.ephemeral = false;

// Remove the scan directory before starting.
// Errors are ignored, so a missing directory does not fail the test.
let scan_db_path = cache_dir.join(SCANNER_DATABASE_KIND);
fs::remove_dir_all(std::path::Path::new(&scan_db_path)).ok();

// Start zebra with the config.
let mut zebrad = testdir()?
.with_exact_config(&config)?
.spawn_child(args!["start"])?
.with_timeout(test_type.zebrad_timeout());

// Check scanner was started.
zebrad.expect_stdout_line_matches("loaded Zebra scanner cache")?;

// The first time, scanning starts at block 419200 (presumably the Sapling activation
// height — TODO confirm), and progress is logged every 1000 blocks.
zebrad.expect_stdout_line_matches(
r"Scanning the blockchain for key 0, started at block 419200, now at block 420000",
)?;

// Make sure scanner scans a few blocks.
zebrad.expect_stdout_line_matches(
r"Scanning the blockchain for key 0, started at block 419200, now at block 421000",
)?;
zebrad.expect_stdout_line_matches(
r"Scanning the blockchain for key 0, started at block 419200, now at block 422000",
)?;

// Kill the node.
zebrad.kill(false)?;
let output = zebrad.wait_with_output()?;

// Make sure the command was killed
output.assert_was_killed()?;
output.assert_failure()?;

// Start the node again.
// The same config is reused, so the second run opens the same state and scanner directories.
let mut zebrad = testdir()?
.with_exact_config(&config)?
.spawn_child(args!["start"])?
.with_timeout(test_type.zebrad_timeout());

// Resuming message.
zebrad.expect_stdout_line_matches(
"Last scanned height for key number 0 is 421000, resuming at 421001",
)?;
zebrad.expect_stdout_line_matches("loaded Zebra scanner cache")?;

// Start scanning where it was left.
zebrad.expect_stdout_line_matches(
r"Scanning the blockchain for key 0, started at block 421001, now at block 422000",
)?;
// NOTE(review): this pattern is missing the leading `S` of "Scanning". It presumably still
// matches as a substring, but it looks like a typo — confirm and align with the patterns above.
zebrad.expect_stdout_line_matches(
r"canning the blockchain for key 0, started at block 421001, now at block 423000",
)?;
}

Ok(())
}
Loading