From 79aebb8a6cfe5f37a8aa1d76571851bb50781ebc Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Thu, 29 Aug 2024 10:04:50 +0100 Subject: [PATCH 01/13] added re-org verification --- Cargo.lock | 1 + zingo-sync/Cargo.toml | 3 + zingo-sync/src/error.rs | 11 +++ zingo-sync/src/lib.rs | 1 + zingo-sync/src/scan.rs | 14 +-- zingo-sync/src/scan/compact_blocks.rs | 22 +++-- zingo-sync/src/scan/error.rs | 22 +++++ zingo-sync/src/scan/task.rs | 13 ++- zingo-sync/src/sync.rs | 133 +++++++++++++++++++------- 9 files changed, 167 insertions(+), 53 deletions(-) create mode 100644 zingo-sync/src/error.rs create mode 100644 zingo-sync/src/scan/error.rs diff --git a/Cargo.lock b/Cargo.lock index 7420b7dc56..ae47b9d662 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4222,6 +4222,7 @@ dependencies = [ "sapling-crypto", "sha2 0.9.9", "shardtree", + "thiserror", "tokio", "tonic", "tracing", diff --git a/zingo-sync/Cargo.toml b/zingo-sync/Cargo.toml index 043e30612f..3df25e5ec8 100644 --- a/zingo-sync/Cargo.toml +++ b/zingo-sync/Cargo.toml @@ -38,6 +38,9 @@ rayon.workspace = true # Minimise boilerplate getset.workspace = true +# Error handling +thiserror.workspace = true + # temp zcash_address.workspace = true sha2.workspace = true diff --git a/zingo-sync/src/error.rs b/zingo-sync/src/error.rs new file mode 100644 index 0000000000..a5eab3ff15 --- /dev/null +++ b/zingo-sync/src/error.rs @@ -0,0 +1,11 @@ +//! Top level error module for the crate + +use crate::scan::error::ScanError; + +/// Top level error enum encapsulating any error that may occur during sync +#[derive(Debug, thiserror::Error)] +pub enum SyncError { + /// Errors associated with scanning + #[error("Scan error. {0}")] + ScanError(#[from] ScanError), +} diff --git a/zingo-sync/src/lib.rs b/zingo-sync/src/lib.rs index 6fd5fe12bc..d49635aaea 100644 --- a/zingo-sync/src/lib.rs +++ b/zingo-sync/src/lib.rs @@ -4,6 +4,7 @@ //! 
Entrypoint: [`crate::sync::sync`] pub mod client; +pub mod error; pub(crate) mod keys; #[allow(missing_docs)] pub mod primitives; diff --git a/zingo-sync/src/scan.rs b/zingo-sync/src/scan.rs index b4c87e004c..ff8682da1e 100644 --- a/zingo-sync/src/scan.rs +++ b/zingo-sync/src/scan.rs @@ -19,9 +19,12 @@ use crate::{ witness::ShardTreeData, }; -use self::{compact_blocks::scan_compact_blocks, transactions::scan_transactions}; +use self::{ + compact_blocks::scan_compact_blocks, error::ScanError, transactions::scan_transactions, +}; mod compact_blocks; +pub mod error; pub(crate) mod task; pub(crate) mod transactions; @@ -135,15 +138,15 @@ impl DecryptedNoteData { } } -// scans a given range and returns all data relevant to the specified keys -// `previous_wallet_block` is the wallet block with height [scan_range.start - 1] +/// Scans a given range and returns all data relevant to the specified keys. +/// `previous_wallet_block` is the wallet block with height [scan_range.start - 1]. pub(crate) async fn scan

( fetch_request_sender: mpsc::UnboundedSender, parameters: &P, ufvks: &HashMap, scan_range: ScanRange, previous_wallet_block: Option, -) -> Result +) -> Result where P: Parameters + Sync + Send + 'static, { @@ -165,8 +168,7 @@ where .await .unwrap(); - let scan_data = - scan_compact_blocks(compact_blocks, parameters, ufvks, initial_scan_data).unwrap(); + let scan_data = scan_compact_blocks(compact_blocks, parameters, ufvks, initial_scan_data)?; let ScanData { nullifiers, diff --git a/zingo-sync/src/scan/compact_blocks.rs b/zingo-sync/src/scan/compact_blocks.rs index 6e6ab75d8e..f2d8fd0c1c 100644 --- a/zingo-sync/src/scan/compact_blocks.rs +++ b/zingo-sync/src/scan/compact_blocks.rs @@ -23,7 +23,10 @@ use crate::{ use self::runners::{BatchRunners, DecryptedOutput}; -use super::{DecryptedNoteData, InitialScanData, ScanData}; +use super::{ + error::{ContinuityError, ScanError}, + DecryptedNoteData, InitialScanData, ScanData, +}; mod runners; @@ -32,11 +35,11 @@ pub(crate) fn scan_compact_blocks

( parameters: &P, ufvks: &HashMap, initial_scan_data: InitialScanData, -) -> Result +) -> Result where P: Parameters + Sync + Send + 'static, { - check_continuity(&compact_blocks, initial_scan_data.previous_block.as_ref()).unwrap(); + check_continuity(&compact_blocks, initial_scan_data.previous_block.as_ref())?; let scanning_keys = ScanningKeys::from_account_ufvks(ufvks.clone()); let mut runners = trial_decrypt(parameters, &scanning_keys, &compact_blocks).unwrap(); @@ -162,7 +165,7 @@ where fn check_continuity( compact_blocks: &[CompactBlock], previous_compact_block: Option<&WalletBlock>, -) -> Result<(), ()> { +) -> Result<(), ContinuityError> { let mut prev_height: Option = None; let mut prev_hash: Option = None; @@ -174,13 +177,20 @@ fn check_continuity( for block in compact_blocks { if let Some(prev_height) = prev_height { if block.height() != prev_height + 1 { - panic!("height discontinuity"); + return Err(ContinuityError::HeightDiscontinuity { + height: block.height(), + previous_block_height: prev_height, + }); } } if let Some(prev_hash) = prev_hash { if block.prev_hash() != prev_hash { - panic!("hash discontinuity"); + return Err(ContinuityError::HashDiscontinuity { + height: block.height(), + prev_hash: block.prev_hash(), + previous_block_hash: prev_hash, + }); } } diff --git a/zingo-sync/src/scan/error.rs b/zingo-sync/src/scan/error.rs new file mode 100644 index 0000000000..8c35dce3d8 --- /dev/null +++ b/zingo-sync/src/scan/error.rs @@ -0,0 +1,22 @@ +use zcash_primitives::{block::BlockHash, consensus::BlockHeight}; + +#[derive(Debug, thiserror::Error)] +pub enum ScanError { + #[error("Continuity error. {0}")] + ContinuityError(#[from] ContinuityError), +} + +#[derive(Debug, thiserror::Error)] +pub enum ContinuityError { + #[error("Height discontinuity. 
Block with height {height} is not continuous with previous block height {previous_block_height}")] + HeightDiscontinuity { + height: BlockHeight, + previous_block_height: BlockHeight, + }, + #[error("Hash discontinuity. Block prev_hash {prev_hash} with height {height} does not match previous block hash {previous_block_hash}")] + HashDiscontinuity { + height: BlockHeight, + prev_hash: BlockHash, + previous_block_hash: BlockHash, + }, +} diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index 9ec52d8901..cb4465510a 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -14,13 +14,13 @@ use zcash_primitives::{consensus::Parameters, zip32::AccountId}; use crate::{client::FetchRequest, primitives::WalletBlock}; -use super::{scan, ScanResults}; +use super::{error::ScanError, scan, ScanResults}; const SCAN_WORKER_POOLSIZE: usize = 2; pub(crate) struct Scanner

{ workers: Vec, - scan_results_sender: mpsc::UnboundedSender<(ScanRange, ScanResults)>, + scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, parameters: P, ufvks: HashMap, @@ -32,7 +32,7 @@ where P: Parameters + Sync + Send + 'static, { pub(crate) fn new( - scan_results_sender: mpsc::UnboundedSender<(ScanRange, ScanResults)>, + scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, parameters: P, ufvks: HashMap, @@ -104,7 +104,7 @@ impl WorkerHandle { struct ScanWorker

{ is_scanning: Arc, scan_task_receiver: mpsc::UnboundedReceiver, - scan_results_sender: mpsc::UnboundedSender<(ScanRange, ScanResults)>, + scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, parameters: P, ufvks: HashMap, @@ -116,7 +116,7 @@ where { fn new( scan_task_receiver: mpsc::UnboundedReceiver, - scan_results_sender: mpsc::UnboundedSender<(ScanRange, ScanResults)>, + scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, parameters: P, ufvks: HashMap, @@ -142,8 +142,7 @@ where scan_task.scan_range.clone(), scan_task.previous_wallet_block, ) - .await - .unwrap(); + .await; self.scan_results_sender .send((scan_task.scan_range, scan_results)) diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index b9b3ea92d1..f51174212b 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -7,7 +7,9 @@ use std::time::Duration; use crate::client::fetch::fetch; use crate::client::{self, FetchRequest}; +use crate::error::SyncError; use crate::primitives::SyncState; +use crate::scan::error::{ContinuityError, ScanError}; use crate::scan::task::{ScanTask, Scanner}; use crate::scan::transactions::scan_transactions; use crate::scan::{DecryptedNoteData, ScanResults}; @@ -27,15 +29,15 @@ use zcash_primitives::transaction::TxId; use zcash_primitives::zip32::AccountId; // TODO; replace fixed batches with orchard shard ranges (block ranges containing all note commitments to an orchard shard or fragment of a shard) -const BATCH_SIZE: u32 = 10; -// const BATCH_SIZE: u32 = 1_000; +const BATCH_SIZE: u32 = 1_000; +const VERIFY_BLOCK_RANGE_SIZE: u32 = 10; /// Syncs a wallet to the latest state of the blockchain pub async fn sync( client: CompactTxStreamerClient, // TODO: change underlying service for generic parameters: &P, wallet: &mut W, -) -> Result<(), ()> +) -> Result<(), SyncError> where P: Parameters + Sync + Send + 'static, W: SyncWallet + SyncBlocks + 
SyncTransactions + SyncNullifiers + SyncShardTrees, @@ -75,8 +77,7 @@ where // if a scan worker is idle, send it a new scan task if scanner.is_worker_idle() { - if let Some(scan_range) = prepare_next_scan_range(wallet.get_sync_state_mut().unwrap()) - { + if let Some(scan_range) = select_scan_range(wallet.get_sync_state_mut().unwrap()) { let previous_wallet_block = wallet .get_wallet_block(scan_range.block_range().start - 1) .ok(); @@ -86,21 +87,24 @@ where .unwrap(); } else { // when no more ranges are available to scan, break out of the loop + // TODO: is the case where there is less than WORKER_POOLSIZE ranges left to scan but re-org is hit covered? break; } } match scan_results_receiver.try_recv() { - Ok((scan_range, scan_results)) => process_scan_results( - wallet, - fetch_request_sender.clone(), - parameters, - &ufvks, - scan_range, - scan_results, - ) - .await - .unwrap(), + Ok((scan_range, scan_results)) => { + process_scan_results( + wallet, + fetch_request_sender.clone(), + parameters, + &ufvks, + scan_range, + scan_results, + ) + .await + .unwrap(); + } Err(TryRecvError::Empty) => (), Err(TryRecvError::Disconnected) => break, } @@ -183,16 +187,18 @@ where Ok(()) } -/// Prepares the next scan range for scanning. -/// Returns `None` if there are no more ranges to scan -fn prepare_next_scan_range(sync_state: &mut SyncState) -> Option { +/// Selects and prepares the next scan range for scanning. +/// Sets the range for scanning to `Ignored` priority in the wallet [sync_state] but returns the scan range with its initial priority. +/// Returns `None` if there are no more ranges to scan. 
+fn select_scan_range(sync_state: &mut SyncState) -> Option { let scan_ranges = sync_state.scan_ranges_mut(); - // placeholder for algorythm that determines highest priority range to scan + // TODO: placeholder for algorythm that determines highest priority range to scan let (index, selected_scan_range) = scan_ranges.iter_mut().enumerate().find(|(_, range)| { range.priority() != ScanPriority::Scanned && range.priority() != ScanPriority::Ignored })?; + // TODO: replace with new range split/splice helpers // if scan range is larger than BATCH_SIZE, split off and return a batch from the lower end and update scan ranges if let Some((lower_range, higher_range)) = selected_scan_range .split_at(selected_scan_range.block_range().start + BlockHeight::from_u32(BATCH_SIZE)) @@ -221,18 +227,41 @@ async fn process_scan_results( parameters: &P, ufvks: &HashMap, scan_range: ScanRange, - scan_results: ScanResults, -) -> Result<(), ()> + scan_results: Result, +) -> Result<(), SyncError> where P: Parameters, W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncShardTrees, { - update_wallet_data(wallet, scan_results).unwrap(); - link_nullifiers(wallet, fetch_request_sender, parameters, ufvks) - .await - .unwrap(); - remove_irrelevant_data(wallet, &scan_range).unwrap(); - mark_scanned(scan_range, wallet.get_sync_state_mut().unwrap()).unwrap(); + match scan_results { + Ok(results) => { + update_wallet_data(wallet, results).unwrap(); + link_nullifiers(wallet, fetch_request_sender, parameters, ufvks) + .await + .unwrap(); + remove_irrelevant_data(wallet, &scan_range).unwrap(); + set_scan_priority( + wallet.get_sync_state_mut().unwrap(), + scan_range.block_range(), + ScanPriority::Scanned, + ) + .unwrap(); + // TODO: also combine adjacent scanned ranges together + } + Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. 
})) => { + if height == scan_range.block_range().start { + // error handling in case of re-org where first block prev_hash in scan range does not match previous wallet block hash + let sync_state = wallet.get_sync_state_mut().unwrap(); + set_scan_priority(sync_state, scan_range.block_range(), scan_range.priority()) + .unwrap(); + verify_scan_range_tip(sync_state, height - 1); + // TODO: invalidate wallet data + } else { + scan_results?; + } + } + Err(e) => return Err(e.into()), + } Ok(()) } @@ -358,6 +387,36 @@ where Ok(()) } +/// Splits out the highest VERIFY_BLOCK_RANGE_SIZE blocks from the scan range containing the given block height +/// and sets it's priority to `verify`. +/// +/// Panics if the scan range containing the given block height is not of priority `Scanned` +fn verify_scan_range_tip(sync_state: &mut SyncState, block_height: BlockHeight) { + let (index, scan_range) = sync_state + .scan_ranges() + .iter() + .enumerate() + .find(|(_, range)| range.block_range().contains(&block_height)) + .expect("scan range containing given block height should always exist!"); + + if scan_range.priority() != ScanPriority::Scanned { + panic!("scan range should always have scan priority `Scanned`!") + } + + let block_range_to_verify = Range { + start: scan_range + .block_range() + .end + .saturating_sub(VERIFY_BLOCK_RANGE_SIZE), + end: scan_range.block_range().end, + }; + let split_ranges = + split_out_scan_range(scan_range, block_range_to_verify, ScanPriority::Verify); + sync_state + .scan_ranges_mut() + .splice(index..=index, split_ranges); +} + /// Splits out a scan range surrounding a given block height with the specified priority #[allow(dead_code)] fn update_scan_priority( @@ -444,6 +503,7 @@ fn split_out_scan_range( split_ranges } +// TODO: replace this function with a filter on the data added to wallet fn remove_irrelevant_data(wallet: &mut W, scan_range: &ScanRange) -> Result<(), ()> where W: SyncWallet + SyncBlocks + SyncNullifiers + SyncTransactions, @@ -468,7 
+528,7 @@ where .map(|tx| tx.block_height()) .collect::>(); wallet.get_wallet_blocks_mut().unwrap().retain(|height, _| { - *height >= scan_range.block_range().end + *height >= scan_range.block_range().end.saturating_sub(1) || *height >= wallet_height.saturating_sub(100) || wallet_transaction_heights.contains(height) }); @@ -486,20 +546,25 @@ where Ok(()) } -fn mark_scanned(scan_range: ScanRange, sync_state: &mut SyncState) -> Result<(), ()> { +/// Sets the scan range in [sync_state] with [block_range] to the given [scan_priority]. +/// +/// Panics if no scan range is found in [sync_state] with a block range of exactly [block_range]. +fn set_scan_priority( + sync_state: &mut SyncState, + block_range: &Range, + scan_priority: ScanPriority, +) -> Result<(), ()> { let scan_ranges = sync_state.scan_ranges_mut(); if let Some((index, range)) = scan_ranges .iter() .enumerate() - .find(|(_, range)| range.block_range() == scan_range.block_range()) + .find(|(_, range)| range.block_range() == block_range) { - scan_ranges[index] = - ScanRange::from_parts(range.block_range().clone(), ScanPriority::Scanned); + scan_ranges[index] = ScanRange::from_parts(range.block_range().clone(), scan_priority); } else { - panic!("scanned range not found!") + panic!("scan range with block range {:?} not found!", block_range) } - // TODO: also combine adjacent scanned ranges together Ok(()) } From 276e74b155ab55f1768812e5682e8f75a10f537b Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Thu, 29 Aug 2024 13:43:46 +0100 Subject: [PATCH 02/13] added invalidation --- zingo-sync/src/sync.rs | 36 ++++++++++++++++--- zingo-sync/src/traits.rs | 78 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 106 insertions(+), 8 deletions(-) diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index f51174212b..313b6b5804 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -253,9 +253,10 @@ where // error handling in case of re-org where first block prev_hash in scan range does not 
match previous wallet block hash let sync_state = wallet.get_sync_state_mut().unwrap(); set_scan_priority(sync_state, scan_range.block_range(), scan_range.priority()) - .unwrap(); - verify_scan_range_tip(sync_state, height - 1); - // TODO: invalidate wallet data + .unwrap(); // reset scan range to initial priority in wallet sync state + let scan_range_to_verify = + verify_scan_range_tip(sync_state, height.saturating_sub(1)); + invalidate_scan_range(wallet, scan_range_to_verify).unwrap(); } else { scan_results?; } @@ -266,6 +267,23 @@ where Ok(()) } +fn invalidate_scan_range(wallet: &mut W, scan_range_to_verify: ScanRange) -> Result<(), ()> +where + W: SyncBlocks + SyncTransactions + SyncNullifiers, +{ + wallet + .remove_wallet_blocks(scan_range_to_verify.block_range()) + .unwrap(); + wallet + .remove_wallet_transactions(scan_range_to_verify.block_range()) + .unwrap(); + wallet + .remove_nullifiers(scan_range_to_verify.block_range()) + .unwrap(); + + Ok(()) +} + fn update_wallet_data(wallet: &mut W, scan_results: ScanResults) -> Result<(), ()> where W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncShardTrees, @@ -389,9 +407,10 @@ where /// Splits out the highest VERIFY_BLOCK_RANGE_SIZE blocks from the scan range containing the given block height /// and sets it's priority to `verify`. +/// Returns a clone of the scan range to be verified. 
/// /// Panics if the scan range containing the given block height is not of priority `Scanned` -fn verify_scan_range_tip(sync_state: &mut SyncState, block_height: BlockHeight) { +fn verify_scan_range_tip(sync_state: &mut SyncState, block_height: BlockHeight) -> ScanRange { let (index, scan_range) = sync_state .scan_ranges() .iter() @@ -412,9 +431,18 @@ fn verify_scan_range_tip(sync_state: &mut SyncState, block_height: BlockHeight) }; let split_ranges = split_out_scan_range(scan_range, block_range_to_verify, ScanPriority::Verify); + + assert!(split_ranges.len() == 2); + let scan_range_to_verify = split_ranges + .last() + .expect("split_ranges should always have exactly 2 elements") + .clone(); + sync_state .scan_ranges_mut() .splice(index..=index, split_ranges); + + scan_range_to_verify } /// Splits out a scan range surrounding a given block height with the specified priority diff --git a/zingo-sync/src/traits.rs b/zingo-sync/src/traits.rs index d339655fe0..7289a00674 100644 --- a/zingo-sync/src/traits.rs +++ b/zingo-sync/src/traits.rs @@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; +use std::ops::Range; use zcash_client_backend::keys::UnifiedFullViewingKey; use zcash_primitives::consensus::BlockHeight; @@ -33,8 +34,6 @@ pub trait SyncWallet { /// Trait for interfacing [`crate::primitives::WalletBlock`]s with wallet data pub trait SyncBlocks: SyncWallet { - // TODO: add method to get wallet data for writing defualt implementations on other methods - /// Get a stored wallet compact block from wallet data by block height /// Must return error if block is not found fn get_wallet_block(&self, block_height: BlockHeight) -> Result; @@ -53,6 +52,17 @@ pub trait SyncBlocks: SyncWallet { Ok(()) } + + /// Removes all wallet blocks with block height's within the given [invalid_range] (end exclusive) + fn remove_wallet_blocks( + &mut self, + invalid_range: &Range, + ) -> Result<(), Self::Error> { + self.get_wallet_blocks_mut()? 
+ .retain(|_, block| !invalid_range.contains(&block.block_height())); + + Ok(()) + } } /// Trait for interfacing [`crate::primitives::WalletTransaction`]s with wallet data @@ -75,12 +85,57 @@ pub trait SyncTransactions: SyncWallet { Ok(()) } + + /// Removes all wallet transactions with block height's within the given [invalid_range] (end exclusive) + /// Also sets any output's spending_transaction field to `None` if it's spending transaction was removed. + fn remove_wallet_transactions( + &mut self, + invalid_range: &Range, + ) -> Result<(), Self::Error> { + // Replace with `extract_if()` when it's in stable rust + let invalid_txids: Vec = self + .get_wallet_transactions()? + .values() + .filter(|tx| invalid_range.contains(&tx.block_height())) + .map(|tx| tx.transaction().txid()) + .collect(); + + let wallet_transactions = self.get_wallet_transactions_mut()?; + wallet_transactions + .values_mut() + .flat_map(|tx| tx.sapling_notes_mut()) + .filter(|note| { + note.spending_transaction().map_or_else( + || false, + |spending_txid| invalid_txids.contains(&spending_txid), + ) + }) + .for_each(|note| { + note.set_spending_transaction(None); + }); + wallet_transactions + .values_mut() + .flat_map(|tx| tx.orchard_notes_mut()) + .filter(|note| { + note.spending_transaction().map_or_else( + || false, + |spending_txid| invalid_txids.contains(&spending_txid), + ) + }) + .for_each(|note| { + note.set_spending_transaction(None); + }); + + invalid_txids.iter().for_each(|invalid_txid| { + wallet_transactions.remove(invalid_txid); + }); + + Ok(()) + } } /// Trait for interfacing nullifiers with wallet data pub trait SyncNullifiers: SyncWallet { - // TODO: add method to get wallet data for writing defualt implementations on other methods - // /// Get wallet nullifier map // fn get_nullifiers(&self) -> Result<&NullifierMap, Self::Error>; @@ -98,6 +153,19 @@ pub trait SyncNullifiers: SyncWallet { Ok(()) } + + /// Removes all mapped nullifiers with block height's within the given 
[invalid_range] (end exclusive) + fn remove_nullifiers(&mut self, invalid_range: &Range) -> Result<(), Self::Error> { + let nullifier_map = self.get_nullifiers_mut()?; + nullifier_map + .sapling_mut() + .retain(|_, (block_height, _)| !invalid_range.contains(block_height)); + nullifier_map + .orchard_mut() + .retain(|_, (block_height, _)| !invalid_range.contains(block_height)); + + Ok(()) + } } /// Trait for interfacing shard tree data with wallet data @@ -131,4 +199,6 @@ pub trait SyncShardTrees: SyncWallet { Ok(()) } + + // TODO: check if shard tree needs to be invalidated due to re-org or leaves can be inserted to replace invalid parts of commitment tree } From 6d20ca560914c9c9029010015ab59389cf851c4e Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Thu, 29 Aug 2024 18:02:45 +0100 Subject: [PATCH 03/13] refactored main sync loop --- zingo-sync/src/scan/task.rs | 19 +++++++++-- zingo-sync/src/sync.rs | 67 +++++++++++++++++++------------------ 2 files changed, 51 insertions(+), 35 deletions(-) diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index cb4465510a..918f14aae8 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -20,6 +20,7 @@ const SCAN_WORKER_POOLSIZE: usize = 2; pub(crate) struct Scanner

{ workers: Vec, + workers_count: usize, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, parameters: P, @@ -41,6 +42,7 @@ where Self { workers, + workers_count: 0, scan_results_sender, fetch_request_sender, parameters, @@ -61,10 +63,12 @@ where let is_scanning = Arc::clone(&worker.is_scanning); let handle = tokio::spawn(async move { worker.run().await }); self.workers.push(WorkerHandle { - _handle: handle, + _id: self.workers_count, + handle, is_scanning, scan_task_sender, }); + self.workers_count += 1; } } @@ -72,6 +76,16 @@ where self.workers.iter().any(|worker| !worker.is_scanning()) } + pub(crate) fn shutdown_idle_workers(&self) { + // TODO: use take() with options on senders and handles to shutdown workers gracefully + self.workers + .iter() + .filter(|worker| !worker.is_scanning()) + .for_each(|worker| { + worker.handle.abort(); + }); + } + pub(crate) fn add_scan_task(&self, scan_task: ScanTask) -> Result<(), ()> { if let Some(worker) = self.workers.iter().find(|worker| !worker.is_scanning()) { worker.add_scan_task(scan_task).unwrap(); @@ -84,7 +98,8 @@ where } struct WorkerHandle { - _handle: JoinHandle>, + _id: usize, + handle: JoinHandle>, is_scanning: Arc, scan_task_sender: mpsc::UnboundedSender, } diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 313b6b5804..6bce96cc8a 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -73,24 +73,8 @@ where let mut interval = tokio::time::interval(Duration::from_millis(30)); loop { - interval.tick().await; // TODO: tokio select to recieve scan results before tick - - // if a scan worker is idle, send it a new scan task - if scanner.is_worker_idle() { - if let Some(scan_range) = select_scan_range(wallet.get_sync_state_mut().unwrap()) { - let previous_wallet_block = wallet - .get_wallet_block(scan_range.block_range().start - 1) - .ok(); - - scanner - .add_scan_task(ScanTask::from_parts(scan_range, previous_wallet_block)) - 
.unwrap(); - } else { - // when no more ranges are available to scan, break out of the loop - // TODO: is the case where there is less than WORKER_POOLSIZE ranges left to scan but re-org is hit covered? - break; - } - } + // TODO: add tokio select to optimise receiver + interval.tick().await; match scan_results_receiver.try_recv() { Ok((scan_range, scan_results)) => { @@ -104,32 +88,49 @@ where ) .await .unwrap(); + + if scanner.is_worker_idle() { + create_scan_task(wallet, &scanner).unwrap(); + } + } + Err(TryRecvError::Empty) => { + if scanner.is_worker_idle() { + create_scan_task(wallet, &scanner).unwrap(); + } + + // TODO: if all workers have handle taken, drop scanner. } - Err(TryRecvError::Empty) => (), Err(TryRecvError::Disconnected) => break, } } - drop(scanner); - while let Some((scan_range, scan_results)) = scan_results_receiver.recv().await { - process_scan_results( - wallet, - fetch_request_sender.clone(), - parameters, - &ufvks, - scan_range, - scan_results, - ) - .await - .unwrap(); - } - drop(fetch_request_sender); try_join_all(handles).await.unwrap(); Ok(()) } +fn create_scan_task(wallet: &mut W, scanner: &Scanner

) -> Result<(), ()> +where + P: Parameters + Sync + Send + 'static, + W: SyncWallet + SyncBlocks, +{ + if let Some(scan_range) = select_scan_range(wallet.get_sync_state_mut().unwrap()) { + let previous_wallet_block = wallet + .get_wallet_block(scan_range.block_range().start - 1) + .ok(); + + scanner + .add_scan_task(ScanTask::from_parts(scan_range, previous_wallet_block)) + .unwrap(); + } else { + // when no more ranges are available to scan, shutdown idle workers + scanner.shutdown_idle_workers(); + } + + Ok(()) +} + /// Update scan ranges to include blocks between the last known chain height (wallet height) and the chain height from the server async fn update_scan_ranges

( fetch_request_sender: mpsc::UnboundedSender, From 08416d92a7836861f617fa15601adce639aa5f8f Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Sat, 16 Nov 2024 06:25:32 +0000 Subject: [PATCH 04/13] refactor_scanner_pt1 --- libtonode-tests/tests/sync.rs | 2 +- zingo-sync/src/scan/task.rs | 211 +++++++++++++++++++--------------- zingo-sync/src/sync.rs | 44 +++---- 3 files changed, 133 insertions(+), 124 deletions(-) diff --git a/libtonode-tests/tests/sync.rs b/libtonode-tests/tests/sync.rs index 18d8f6f7c2..c554194732 100644 --- a/libtonode-tests/tests/sync.rs +++ b/libtonode-tests/tests/sync.rs @@ -31,7 +31,7 @@ async fn sync_mainnet_test() { let mut lightclient = LightClient::create_from_wallet_base_async( WalletBase::from_string(HOSPITAL_MUSEUM_SEED.to_string()), &config, - 2_611_700, + 2_715_150, true, ) .await diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index 918f14aae8..d0befb353f 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -10,179 +10,206 @@ use tokio::{sync::mpsc, task::JoinHandle}; use zcash_client_backend::data_api::scanning::ScanRange; use zcash_keys::keys::UnifiedFullViewingKey; -use zcash_primitives::{consensus::Parameters, zip32::AccountId}; +use zcash_primitives::{consensus, zip32::AccountId}; -use crate::{client::FetchRequest, primitives::WalletBlock}; +use crate::{ + client::FetchRequest, + primitives::WalletBlock, + sync, + traits::{SyncBlocks, SyncWallet}, +}; use super::{error::ScanError, scan, ScanResults}; const SCAN_WORKER_POOLSIZE: usize = 2; pub(crate) struct Scanner

{ - workers: Vec, - workers_count: usize, + workers: Vec>, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, - parameters: P, + consensus_parameters: P, ufvks: HashMap, } // TODO: add fn for checking and handling worker errors impl

Scanner

where - P: Parameters + Sync + Send + 'static, + P: consensus::Parameters + Sync + Send + 'static, { pub(crate) fn new( scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, - parameters: P, + consensus_parameters: P, ufvks: HashMap, ) -> Self { - let workers: Vec = Vec::with_capacity(SCAN_WORKER_POOLSIZE); + let workers: Vec> = Vec::with_capacity(SCAN_WORKER_POOLSIZE); Self { workers, - workers_count: 0, scan_results_sender, fetch_request_sender, - parameters, + consensus_parameters, ufvks, } } + pub(crate) fn spawn_worker(&mut self) { + let mut worker = ScanWorker::new( + None, + self.scan_results_sender.clone(), + self.fetch_request_sender.clone(), + self.consensus_parameters.clone(), + self.ufvks.clone(), + ); + worker.run().unwrap(); + self.workers.push(worker); + } + pub(crate) fn spawn_workers(&mut self) { for _ in 0..SCAN_WORKER_POOLSIZE { - let (scan_task_sender, scan_task_receiver) = mpsc::unbounded_channel(); - let worker = ScanWorker::new( - scan_task_receiver, - self.scan_results_sender.clone(), - self.fetch_request_sender.clone(), - self.parameters.clone(), - self.ufvks.clone(), - ); - let is_scanning = Arc::clone(&worker.is_scanning); - let handle = tokio::spawn(async move { worker.run().await }); - self.workers.push(WorkerHandle { - _id: self.workers_count, - handle, - is_scanning, - scan_task_sender, - }); - self.workers_count += 1; + self.spawn_worker(); } } - pub(crate) fn is_worker_idle(&self) -> bool { - self.workers.iter().any(|worker| !worker.is_scanning()) - } - - pub(crate) fn shutdown_idle_workers(&self) { - // TODO: use take() with options on senders and handles to shutdown workers gracefully - self.workers - .iter() - .filter(|worker| !worker.is_scanning()) - .for_each(|worker| { - worker.handle.abort(); - }); - } - - pub(crate) fn add_scan_task(&self, scan_task: ScanTask) -> Result<(), ()> { - if let Some(worker) = self.workers.iter().find(|worker| !worker.is_scanning()) { - 
worker.add_scan_task(scan_task).unwrap(); + pub(crate) fn idle_worker(&mut self) -> Option<&mut ScanWorker

> { + if let Some(idle_worker) = self.workers.iter_mut().find(|worker| !worker.is_scanning()) { + Some(idle_worker) } else { - panic!("no idle workers!") + None } - - Ok(()) - } -} - -struct WorkerHandle { - _id: usize, - handle: JoinHandle>, - is_scanning: Arc, - scan_task_sender: mpsc::UnboundedSender, -} - -impl WorkerHandle { - fn is_scanning(&self) -> bool { - self.is_scanning.load(atomic::Ordering::Acquire) } - fn add_scan_task(&self, scan_task: ScanTask) -> Result<(), ()> { - self.scan_task_sender.send(scan_task).unwrap(); - - Ok(()) + /// Updates the scanner. + /// + /// If there is an idle worker, create a new scan task and send to worker. + /// If there are no more range available to scan, shutdown the idle worker. + pub(crate) fn update(&mut self, wallet: &mut W) + where + W: SyncWallet + SyncBlocks, + { + if let Some(worker) = self.idle_worker() { + if let Some(scan_task) = ScanTask::create(wallet).unwrap() { + worker.add_scan_task(scan_task).unwrap(); + } else { + if let Some(sender) = worker.scan_task_sender.take() { + drop(sender); + } + } + } } } struct ScanWorker

{ + handle: Option>>, is_scanning: Arc, - scan_task_receiver: mpsc::UnboundedReceiver, + scan_task_sender: Option>, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, - parameters: P, + consensus_parameters: P, ufvks: HashMap, } impl

ScanWorker

where - P: Parameters + Sync + Send + 'static, + P: consensus::Parameters + Sync + Send + 'static, { fn new( - scan_task_receiver: mpsc::UnboundedReceiver, + scan_task_sender: Option>, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, - parameters: P, + consensus_parameters: P, ufvks: HashMap, ) -> Self { Self { + handle: None, is_scanning: Arc::new(AtomicBool::new(false)), - scan_task_receiver, + scan_task_sender, scan_results_sender, fetch_request_sender, - parameters, + consensus_parameters, ufvks, } } - async fn run(mut self) -> Result<(), ()> { - while let Some(scan_task) = self.scan_task_receiver.recv().await { - self.is_scanning.store(true, atomic::Ordering::Release); + fn run(&mut self) -> Result<(), ()> { + let (scan_task_sender, mut scan_task_receiver) = mpsc::unbounded_channel::(); + let is_scanning = self.is_scanning.clone(); + let scan_results_sender = self.scan_results_sender.clone(); + let fetch_request_sender = self.fetch_request_sender.clone(); + let consensus_parameters = self.consensus_parameters.clone(); + let ufvks = self.ufvks.clone(); + let handle = tokio::spawn(async move { + while let Some(scan_task) = scan_task_receiver.recv().await { + is_scanning.store(true, atomic::Ordering::Release); + + let scan_results = scan( + fetch_request_sender.clone(), + &consensus_parameters, + &ufvks, + scan_task.scan_range.clone(), + scan_task.previous_wallet_block, + ) + .await; + + scan_results_sender + .send((scan_task.scan_range, scan_results)) + .unwrap(); + + is_scanning.store(false, atomic::Ordering::Release); + } + + Ok(()) + }); + + self.handle = Some(handle); + self.scan_task_sender = Some(scan_task_sender); - let scan_results = scan( - self.fetch_request_sender.clone(), - &self.parameters.clone(), - &self.ufvks, - scan_task.scan_range.clone(), - scan_task.previous_wallet_block, - ) - .await; + Ok(()) + } - self.scan_results_sender - .send((scan_task.scan_range, scan_results)) - .unwrap(); 
+ fn is_scanning(&self) -> bool { + self.is_scanning.load(atomic::Ordering::Acquire) + } - self.is_scanning.store(false, atomic::Ordering::Release); - } + fn add_scan_task(&self, scan_task: ScanTask) -> Result<(), ()> { + self.scan_task_sender + .clone() + .unwrap() + .send(scan_task) + .unwrap(); Ok(()) } } -pub(crate) struct ScanTask { +struct ScanTask { scan_range: ScanRange, previous_wallet_block: Option, } impl ScanTask { - pub(crate) fn from_parts( - scan_range: ScanRange, - previous_wallet_block: Option, - ) -> Self { + fn from_parts(scan_range: ScanRange, previous_wallet_block: Option) -> Self { Self { scan_range, previous_wallet_block, } } + + fn create(wallet: &mut W) -> Result, ()> + where + W: SyncWallet + SyncBlocks, + { + if let Some(scan_range) = sync::select_scan_range(wallet.get_sync_state_mut().unwrap()) { + let previous_wallet_block = wallet + .get_wallet_block(scan_range.block_range().start - 1) + .ok(); + + Ok(Some(ScanTask::from_parts( + scan_range, + previous_wallet_block, + ))) + } else { + Ok(None) + } + } } diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index b72765ff62..591c8d2b90 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -10,7 +10,7 @@ use crate::client::{self, FetchRequest}; use crate::error::SyncError; use crate::primitives::SyncState; use crate::scan::error::{ContinuityError, ScanError}; -use crate::scan::task::{ScanTask, Scanner}; +use crate::scan::task::Scanner; use crate::scan::transactions::scan_transactions; use crate::scan::{DecryptedNoteData, ScanResults}; use crate::traits::{SyncBlocks, SyncNullifiers, SyncShardTrees, SyncTransactions, SyncWallet}; @@ -77,7 +77,6 @@ where interval.tick().await; match scan_results_receiver.try_recv() { - // <<<<<<< HEAD Ok((scan_range, scan_results)) => { process_scan_results( wallet, @@ -89,15 +88,19 @@ where ) .await .unwrap(); - - if scanner.is_worker_idle() { - create_scan_task(wallet, &scanner).unwrap(); - } } Err(TryRecvError::Empty) => { - if 
scanner.is_worker_idle() { - create_scan_task(wallet, &scanner).unwrap(); - } + // if let Some(worker) = scanner.idle_worker() { + // let scan_task = create_scan_task(wallet).unwrap(); + // if let Some(task) = scan_task { + // worker.add_scan_task(task).unwrap(); + // } else { + // // when no more ranges are available to scan, shutdown idle worker + // if let Some(sender) = worker.scan_task_sender().take() { + // drop(sender); + // } + // } + // } // TODO: if all workers have handle taken, drop scanner. } @@ -111,27 +114,6 @@ where Ok(()) } -fn create_scan_task(wallet: &mut W, scanner: &Scanner

) -> Result<(), ()> -where - P: consensus::Parameters + Sync + Send + 'static, - W: SyncWallet + SyncBlocks, -{ - if let Some(scan_range) = select_scan_range(wallet.get_sync_state_mut().unwrap()) { - let previous_wallet_block = wallet - .get_wallet_block(scan_range.block_range().start - 1) - .ok(); - - scanner - .add_scan_task(ScanTask::from_parts(scan_range, previous_wallet_block)) - .unwrap(); - } else { - // when no more ranges are available to scan, shutdown idle workers - scanner.shutdown_idle_workers(); - } - - Ok(()) -} - /// Update scan ranges to include blocks between the last known chain height (wallet height) and the chain height from the server async fn update_scan_ranges

( fetch_request_sender: mpsc::UnboundedSender, @@ -193,7 +175,7 @@ where /// Selects and prepares the next scan range for scanning. /// Sets the range for scanning to `Ignored` priority in the wallet [sync_state] but returns the scan range with its initial priority. /// Returns `None` if there are no more ranges to scan. -fn select_scan_range(sync_state: &mut SyncState) -> Option { +pub(crate) fn select_scan_range(sync_state: &mut SyncState) -> Option { let scan_ranges = sync_state.scan_ranges_mut(); // TODO: placeholder for algorythm that determines highest priority range to scan From 08101d48b6b2377f4751e2bb55800bf62ec8e0b4 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Sat, 16 Nov 2024 08:43:02 +0000 Subject: [PATCH 05/13] scanner refactor pt2 --- zingo-sync/src/scan/task.rs | 14 ++++++++++---- zingo-sync/src/sync.rs | 12 +----------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index d0befb353f..54562f8fec 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -71,7 +71,7 @@ where } } - pub(crate) fn idle_worker(&mut self) -> Option<&mut ScanWorker

> { + fn idle_worker(&mut self) -> Option<&mut ScanWorker

> { if let Some(idle_worker) = self.workers.iter_mut().find(|worker| !worker.is_scanning()) { Some(idle_worker) } else { @@ -91,9 +91,7 @@ where if let Some(scan_task) = ScanTask::create(wallet).unwrap() { worker.add_scan_task(scan_task).unwrap(); } else { - if let Some(sender) = worker.scan_task_sender.take() { - drop(sender); - } + worker.shutdown(); } } } @@ -133,11 +131,13 @@ where fn run(&mut self) -> Result<(), ()> { let (scan_task_sender, mut scan_task_receiver) = mpsc::unbounded_channel::(); + let is_scanning = self.is_scanning.clone(); let scan_results_sender = self.scan_results_sender.clone(); let fetch_request_sender = self.fetch_request_sender.clone(); let consensus_parameters = self.consensus_parameters.clone(); let ufvks = self.ufvks.clone(); + let handle = tokio::spawn(async move { while let Some(scan_task) = scan_task_receiver.recv().await { is_scanning.store(true, atomic::Ordering::Release); @@ -180,6 +180,12 @@ where Ok(()) } + + fn shutdown(&mut self) { + if let Some(sender) = self.scan_task_sender.take() { + drop(sender); + } + } } struct ScanTask { diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 591c8d2b90..9f1a22c72a 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -90,17 +90,7 @@ where .unwrap(); } Err(TryRecvError::Empty) => { - // if let Some(worker) = scanner.idle_worker() { - // let scan_task = create_scan_task(wallet).unwrap(); - // if let Some(task) = scan_task { - // worker.add_scan_task(task).unwrap(); - // } else { - // // when no more ranges are available to scan, shutdown idle worker - // if let Some(sender) = worker.scan_task_sender().take() { - // drop(sender); - // } - // } - // } + scanner.update(wallet); // TODO: if all workers have handle taken, drop scanner. 
} From 9611b82fc916c1a4911fa8cabfd8fb6a9f45294f Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Tue, 19 Nov 2024 03:19:20 +0000 Subject: [PATCH 06/13] add some reorg todos and update parameters naming --- zingo-sync/src/sync.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 9f1a22c72a..0709c31434 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -107,7 +107,7 @@ where /// Update scan ranges to include blocks between the last known chain height (wallet height) and the chain height from the server async fn update_scan_ranges

( fetch_request_sender: mpsc::UnboundedSender, - parameters: &P, + consensus_parameters: &P, wallet_birthday: BlockHeight, sync_state: &mut SyncState, ) -> Result<(), ()> @@ -121,7 +121,7 @@ where let scan_ranges = sync_state.scan_ranges_mut(); let wallet_height = if scan_ranges.is_empty() { - let sapling_activation_height = parameters + let sapling_activation_height = consensus_parameters .activation_height(NetworkUpgrade::Sapling) .expect("sapling activation height should always return Some"); @@ -138,7 +138,7 @@ where }; if wallet_height > chain_height { - // TODO: Isn't this a possible state if there's been a reorg? + // TODO: truncate wallet to server height in case of reorg panic!("wallet is ahead of server!") } @@ -158,6 +158,7 @@ where // TODO: add logic to combine chain tip scan range with wallet tip scan range // TODO: add scan priority logic // TODO: replace `ignored` (a.k.a scanning) priority with `verify` to prioritise ranges that were being scanned when sync was interrupted + // TODO: split off a verify priority batch from lowest unscanned scan range to prioritise reorg verification on sync start Ok(()) } @@ -199,7 +200,7 @@ pub(crate) fn select_scan_range(sync_state: &mut SyncState) -> Option async fn process_scan_results( wallet: &mut W, fetch_request_sender: mpsc::UnboundedSender, - parameters: &P, + consensus_parameters: &P, ufvks: &HashMap, scan_range: ScanRange, scan_results: Result, @@ -211,7 +212,7 @@ where match scan_results { Ok(results) => { update_wallet_data(wallet, results).unwrap(); - link_nullifiers(wallet, fetch_request_sender, parameters, ufvks) + link_nullifiers(wallet, fetch_request_sender, consensus_parameters, ufvks) .await .unwrap(); remove_irrelevant_data(wallet, &scan_range).unwrap(); @@ -255,6 +256,7 @@ where wallet .remove_nullifiers(scan_range_to_verify.block_range()) .unwrap(); + // TODO: truncate shard tree Ok(()) } From d6c8def21b44cad71fbb73d38e99f7d6972b29da Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Tue, 19 Nov 
2024 07:14:30 +0000 Subject: [PATCH 07/13] finished worker and sync refactor --- zingo-sync/src/primitives.rs | 9 ++++ zingo-sync/src/scan/task.rs | 82 ++++++++++++++++++++++++++++++------ zingo-sync/src/sync.rs | 38 ++++++++++++----- zingo-sync/src/traits.rs | 6 +-- 4 files changed, 108 insertions(+), 27 deletions(-) diff --git a/zingo-sync/src/primitives.rs b/zingo-sync/src/primitives.rs index aad354242a..4e6c30fe07 100644 --- a/zingo-sync/src/primitives.rs +++ b/zingo-sync/src/primitives.rs @@ -35,6 +35,15 @@ impl SyncState { spend_locations: Vec::new(), } } + + pub fn fully_scanned(&self) -> bool { + self.scan_ranges().iter().all(|scan_range| { + matches!( + scan_range.priority(), + zcash_client_backend::data_api::scanning::ScanPriority::Scanned + ) + }) + } } impl Default for SyncState { diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index 54562f8fec..dfe8d879a4 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -6,7 +6,10 @@ use std::{ }, }; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::{ + sync::mpsc, + task::{JoinError, JoinHandle}, +}; use zcash_client_backend::data_api::scanning::ScanRange; use zcash_keys::keys::UnifiedFullViewingKey; @@ -21,10 +24,11 @@ use crate::{ use super::{error::ScanError, scan, ScanResults}; -const SCAN_WORKER_POOLSIZE: usize = 2; +const MAX_WORKER_POOLSIZE: usize = 2; pub(crate) struct Scanner

{ workers: Vec>, + unique_id: usize, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, consensus_parameters: P, @@ -42,10 +46,11 @@ where consensus_parameters: P, ufvks: HashMap, ) -> Self { - let workers: Vec> = Vec::with_capacity(SCAN_WORKER_POOLSIZE); + let workers: Vec> = Vec::with_capacity(MAX_WORKER_POOLSIZE); Self { workers, + unique_id: 0, scan_results_sender, fetch_request_sender, consensus_parameters, @@ -53,8 +58,16 @@ where } } + pub(crate) fn worker_poolsize(&self) -> usize { + self.workers.len() + } + + /// Spawns a worker. + /// + /// When the worker is running it will wait for a scan task. pub(crate) fn spawn_worker(&mut self) { let mut worker = ScanWorker::new( + self.unique_id, None, self.scan_results_sender.clone(), self.fetch_request_sender.clone(), @@ -63,27 +76,49 @@ where ); worker.run().unwrap(); self.workers.push(worker); + self.unique_id += 1; } + /// Spawns the initial pool of workers. + /// + /// Poolsize is set by [`self::MAX_WORKER_POOLSIZE`]. pub(crate) fn spawn_workers(&mut self) { - for _ in 0..SCAN_WORKER_POOLSIZE { + for _ in 0..MAX_WORKER_POOLSIZE { self.spawn_worker(); } } - fn idle_worker(&mut self) -> Option<&mut ScanWorker

> { - if let Some(idle_worker) = self.workers.iter_mut().find(|worker| !worker.is_scanning()) { + fn idle_worker(&self) -> Option<&ScanWorker

> { + if let Some(idle_worker) = self.workers.iter().find(|worker| !worker.is_scanning()) { Some(idle_worker) } else { None } } + async fn shutdown_worker(&mut self, worker_id: usize) -> Result<(), ()> { + if let Some(worker_index) = self + .workers + .iter() + .position(|worker| worker.id == worker_id) + { + let mut worker = self.workers.swap_remove(worker_index); + worker + .shutdown() + .await + .expect("worker should not be able to panic"); + } else { + panic!("id not found in worker pool"); + } + + Ok(()) + } + /// Updates the scanner. /// - /// If there is an idle worker, create a new scan task and send to worker. + /// If there is an idle worker, create a new scan task and add to worker. /// If there are no more range available to scan, shutdown the idle worker. - pub(crate) fn update(&mut self, wallet: &mut W) + pub(crate) async fn update(&mut self, wallet: &mut W) where W: SyncWallet + SyncBlocks, { @@ -91,14 +126,21 @@ where if let Some(scan_task) = ScanTask::create(wallet).unwrap() { worker.add_scan_task(scan_task).unwrap(); } else { - worker.shutdown(); + self.shutdown_worker(worker.id) + .await + .expect("worker should be in worker pool"); } } + + if !wallet.get_sync_state().unwrap().fully_scanned() && self.worker_poolsize() == 0 { + panic!("worker pool should not be empty with unscanned ranges!") + } } } struct ScanWorker

{ - handle: Option>>, + id: usize, + handle: Option>, is_scanning: Arc, scan_task_sender: Option>, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, @@ -112,6 +154,7 @@ where P: consensus::Parameters + Sync + Send + 'static, { fn new( + id: usize, scan_task_sender: Option>, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, @@ -119,6 +162,7 @@ where ufvks: HashMap, ) -> Self { Self { + id, handle: None, is_scanning: Arc::new(AtomicBool::new(false)), scan_task_sender, @@ -129,6 +173,9 @@ where } } + /// Runs the worker in a new tokio task. + /// + /// Waits for a scan task and then calls [`crate::scan::scan`] on the given range. fn run(&mut self) -> Result<(), ()> { let (scan_task_sender, mut scan_task_receiver) = mpsc::unbounded_channel::(); @@ -153,12 +200,10 @@ where scan_results_sender .send((scan_task.scan_range, scan_results)) - .unwrap(); + .expect("receiver should never be dropped before sender!"); is_scanning.store(false, atomic::Ordering::Release); } - - Ok(()) }); self.handle = Some(handle); @@ -181,10 +226,19 @@ where Ok(()) } - fn shutdown(&mut self) { + /// Shuts down worker by dropping the sender to the worker task and awaiting the handle. + /// + /// This should always be called in the context of the scanner as it must be also be removed from the worker pool. 
+ async fn shutdown(&mut self) -> Result<(), JoinError> { if let Some(sender) = self.scan_task_sender.take() { drop(sender); } + let handle = self + .handle + .take() + .expect("worker should always have a handle to take!"); + + handle.await } } diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 0709c31434..651ea7e7bb 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -15,7 +15,6 @@ use crate::scan::transactions::scan_transactions; use crate::scan::{DecryptedNoteData, ScanResults}; use crate::traits::{SyncBlocks, SyncNullifiers, SyncShardTrees, SyncTransactions, SyncWallet}; -use tokio::sync::mpsc::error::TryRecvError; use zcash_client_backend::{ data_api::scanning::{ScanPriority, ScanRange}, proto::service::compact_tx_streamer_client::CompactTxStreamerClient, @@ -73,11 +72,8 @@ where let mut interval = tokio::time::interval(Duration::from_millis(30)); loop { - // TODO: add tokio select to optimise receiver - interval.tick().await; - - match scan_results_receiver.try_recv() { - Ok((scan_range, scan_results)) => { + tokio::select! { + Some((scan_range, scan_results)) = scan_results_receiver.recv() => { process_scan_results( wallet, fetch_request_sender.clone(), @@ -89,12 +85,14 @@ where .await .unwrap(); } - Err(TryRecvError::Empty) => { - scanner.update(wallet); - // TODO: if all workers have handle taken, drop scanner. + _ = interval.tick() => { + scanner.update(wallet).await; + + if sync_complete(&scanner, &scan_results_receiver, wallet) { + break; + } } - Err(TryRecvError::Disconnected) => break, } } @@ -104,6 +102,26 @@ where Ok(()) } +/// Returns true if sync is complete. +/// +/// Sync is complete when: +/// - all scan workers have been shut down +/// - there are no unprocessed scan results in the channel +/// - all scan ranges have `Scanned` priority +fn sync_complete( + scanner: &Scanner

, + scan_results_receiver: &mpsc::UnboundedReceiver<(ScanRange, Result)>, + wallet: &W, +) -> bool +where + P: consensus::Parameters + Sync + Send + 'static, + W: SyncWallet, +{ + scanner.worker_poolsize() == 0 + && scan_results_receiver.is_empty() + && wallet.get_sync_state().unwrap().fully_scanned() +} + /// Update scan ranges to include blocks between the last known chain height (wallet height) and the chain height from the server async fn update_scan_ranges

( fetch_request_sender: mpsc::UnboundedSender, diff --git a/zingo-sync/src/traits.rs b/zingo-sync/src/traits.rs index 7289a00674..b5ef051bd4 100644 --- a/zingo-sync/src/traits.rs +++ b/zingo-sync/src/traits.rs @@ -53,7 +53,7 @@ pub trait SyncBlocks: SyncWallet { Ok(()) } - /// Removes all wallet blocks with block height's within the given [invalid_range] (end exclusive) + /// Removes all wallet blocks with block height's within the given `invalid_range` (end exclusive) fn remove_wallet_blocks( &mut self, invalid_range: &Range, @@ -86,7 +86,7 @@ pub trait SyncTransactions: SyncWallet { Ok(()) } - /// Removes all wallet transactions with block height's within the given [invalid_range] (end exclusive) + /// Removes all wallet transactions with block height's within the given `invalid_range` (end exclusive) /// Also sets any output's spending_transaction field to `None` if it's spending transaction was removed. fn remove_wallet_transactions( &mut self, @@ -154,7 +154,7 @@ pub trait SyncNullifiers: SyncWallet { Ok(()) } - /// Removes all mapped nullifiers with block height's within the given [invalid_range] (end exclusive) + /// Removes all mapped nullifiers with block height's within the given `invalid_range` (end exclusive) fn remove_nullifiers(&mut self, invalid_range: &Range) -> Result<(), Self::Error> { let nullifier_map = self.get_nullifiers_mut()?; nullifier_map From 9eb0a5a4f3659e6f6469597172d7700f8e3e72e3 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Wed, 20 Nov 2024 03:40:40 +0000 Subject: [PATCH 08/13] drop scanner to fix hang --- libtonode-tests/tests/sync.rs | 8 ++++---- zingo-sync/src/scan/task.rs | 4 ++++ zingo-sync/src/scan/transactions.rs | 9 +++++++++ zingo-sync/src/sync.rs | 4 ++++ 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/libtonode-tests/tests/sync.rs b/libtonode-tests/tests/sync.rs index c554194732..0a2e57551f 100644 --- a/libtonode-tests/tests/sync.rs +++ b/libtonode-tests/tests/sync.rs @@ -77,8 +77,8 @@ async fn 
sync_test() { .await .unwrap(); - dbg!(recipient.wallet.wallet_transactions()); - dbg!(recipient.wallet.wallet_blocks()); - dbg!(recipient.wallet.nullifier_map()); - dbg!(recipient.wallet.sync_state()); + // dbg!(recipient.wallet.wallet_transactions()); + // dbg!(recipient.wallet.wallet_blocks()); + // dbg!(recipient.wallet.nullifier_map()); + // dbg!(recipient.wallet.sync_state()); } diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index dfe8d879a4..c860db78dc 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -66,6 +66,7 @@ where /// /// When the worker is running it will wait for a scan task. pub(crate) fn spawn_worker(&mut self) { + tracing::info!("Spawning worker {}", self.unique_id); let mut worker = ScanWorker::new( self.unique_id, None, @@ -217,6 +218,7 @@ where } fn add_scan_task(&self, scan_task: ScanTask) -> Result<(), ()> { + tracing::info!("Adding scan task to worker {}:\n{:#?}", self.id, &scan_task); self.scan_task_sender .clone() .unwrap() @@ -230,6 +232,7 @@ where /// /// This should always be called in the context of the scanner as it must be also be removed from the worker pool. 
async fn shutdown(&mut self) -> Result<(), JoinError> { + tracing::info!("Shutting down worker {}", self.id); if let Some(sender) = self.scan_task_sender.take() { drop(sender); } @@ -242,6 +245,7 @@ where } } +#[derive(Debug)] struct ScanTask { scan_range: ScanRange, previous_wallet_block: Option, diff --git a/zingo-sync/src/scan/transactions.rs b/zingo-sync/src/scan/transactions.rs index 24d962e129..9d613834fe 100644 --- a/zingo-sync/src/scan/transactions.rs +++ b/zingo-sync/src/scan/transactions.rs @@ -226,6 +226,15 @@ fn scan_transaction( add_recipient_unified_address(parameters, uas.clone(), &mut outgoing_sapling_notes); add_recipient_unified_address(parameters, uas, &mut outgoing_orchard_notes); } + ParsedMemo::Version1 { + uas, + rejection_address_indexes: _, + } => { + add_recipient_unified_address(parameters, uas.clone(), &mut outgoing_sapling_notes); + add_recipient_unified_address(parameters, uas, &mut outgoing_orchard_notes); + + // TODO: handle rejection addresses from encoded memos + } _ => panic!( "memo version not supported. please ensure that your software is up-to-date." ), diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 651ea7e7bb..27fa7f1294 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -90,12 +90,14 @@ where scanner.update(wallet).await; if sync_complete(&scanner, &scan_results_receiver, wallet) { + tracing::info!("Sync complete."); break; } } } } + drop(scanner); drop(fetch_request_sender); fetcher_handle.await.unwrap().unwrap(); @@ -241,8 +243,10 @@ where ) .unwrap(); // TODO: also combine adjacent scanned ranges together + tracing::info!("Scan results processed."); } Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. 
})) => { + tracing::info!("Re-org detected."); if height == scan_range.block_range().start { // error handling in case of re-org where first block prev_hash in scan range does not match previous wallet block hash let sync_state = wallet.get_sync_state_mut().unwrap(); From 2be641723fd479cc54bc0f6f8850f49d9fd20efc Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Wed, 20 Nov 2024 06:07:57 +0000 Subject: [PATCH 09/13] add scanner state machine --- zingo-sync/src/scan/task.rs | 83 +++++++++++++++++++++++++++++++++---- zingo-sync/src/sync.rs | 10 ++++- 2 files changed, 82 insertions(+), 11 deletions(-) diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index c860db78dc..59adc632b3 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -11,7 +11,7 @@ use tokio::{ task::{JoinError, JoinHandle}, }; -use zcash_client_backend::data_api::scanning::ScanRange; +use zcash_client_backend::data_api::scanning::{ScanPriority, ScanRange}; use zcash_keys::keys::UnifiedFullViewingKey; use zcash_primitives::{consensus, zip32::AccountId}; @@ -26,7 +26,30 @@ use super::{error::ScanError, scan, ScanResults}; const MAX_WORKER_POOLSIZE: usize = 2; +pub(crate) enum ScannerState { + Verification, + Scan, + Shutdown, +} + +impl ScannerState { + pub(crate) fn verify(&mut self) { + if let ScannerState::Verification = *self { + *self = ScannerState::Scan + } else { + panic!( + "ScanState is not Verification variant. Verification should only complete once!" + ); + } + } + + fn shutdown(&mut self) { + *self = ScannerState::Shutdown + } +} + pub(crate) struct Scanner

{ + state: ScannerState, workers: Vec>, unique_id: usize, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, @@ -49,6 +72,7 @@ where let workers: Vec> = Vec::with_capacity(MAX_WORKER_POOLSIZE); Self { + state: ScannerState::Verification, workers, unique_id: 0, scan_results_sender, @@ -58,6 +82,10 @@ where } } + pub(crate) fn state_mut(&mut self) -> &mut ScannerState { + &mut self.state + } + pub(crate) fn worker_poolsize(&self) -> usize { self.workers.len() } @@ -117,19 +145,56 @@ where /// Updates the scanner. /// + /// If verification is still in progress, do not create scan tasks. /// If there is an idle worker, create a new scan task and add to worker. - /// If there are no more range available to scan, shutdown the idle worker. + /// If there are no more ranges available to scan, shut down the idle workers. pub(crate) async fn update(&mut self, wallet: &mut W) where W: SyncWallet + SyncBlocks, { - if let Some(worker) = self.idle_worker() { - if let Some(scan_task) = ScanTask::create(wallet).unwrap() { - worker.add_scan_task(scan_task).unwrap(); - } else { - self.shutdown_worker(worker.id) - .await - .expect("worker should be in worker pool"); + match self.state { + ScannerState::Verification => { + if !wallet + .get_sync_state() + .unwrap() + .scan_ranges() + .iter() + .any(|scan_range| scan_range.priority() == ScanPriority::Verify) + { + // under these conditions the `Verify` scan range is currently being scanned. + // the reason why the logic looks for no `Verify` ranges in the sync state is because it is set to `Ignored` + // during scanning. + // if we were to continue to add new tasks and a re-org had occurred the sync state would be unrecoverable. 
+ return; + } + + // scan the range with `Verify` priority + if let Some(worker) = self.idle_worker() { + let scan_task = ScanTask::create(wallet) + .unwrap() + .expect("scan range with `Verify` priority must exist!"); + + assert_eq!(scan_task.scan_range.priority(), ScanPriority::Verify); + worker.add_scan_task(scan_task).unwrap(); + } + } + ScannerState::Scan => { + // create scan tasks until all ranges are scanned or currently scanning + if let Some(worker) = self.idle_worker() { + if let Some(scan_task) = ScanTask::create(wallet).unwrap() { + worker.add_scan_task(scan_task).unwrap(); + } else { + self.state.shutdown(); + } + } + } + ScannerState::Shutdown => { + // shutdown idle workers + while let Some(worker) = self.idle_worker() { + self.shutdown_worker(worker.id) + .await + .expect("worker should be in worker pool"); + } } } diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 27fa7f1294..4120b358c5 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -10,7 +10,7 @@ use crate::client::{self, FetchRequest}; use crate::error::SyncError; use crate::primitives::SyncState; use crate::scan::error::{ContinuityError, ScanError}; -use crate::scan::task::Scanner; +use crate::scan::task::{Scanner, ScannerState}; use crate::scan::transactions::scan_transactions; use crate::scan::{DecryptedNoteData, ScanResults}; use crate::traits::{SyncBlocks, SyncNullifiers, SyncShardTrees, SyncTransactions, SyncWallet}; @@ -81,6 +81,7 @@ where &ufvks, scan_range, scan_results, + scanner.state_mut(), ) .await .unwrap(); @@ -184,7 +185,7 @@ where } /// Selects and prepares the next scan range for scanning. -/// Sets the range for scanning to `Ignored` priority in the wallet [sync_state] but returns the scan range with its initial priority. +/// Sets the range for scanning to `Ignored` priority in the wallet `sync_state` but returns the scan range with its initial priority. /// Returns `None` if there are no more ranges to scan. 
pub(crate) fn select_scan_range(sync_state: &mut SyncState) -> Option { let scan_ranges = sync_state.scan_ranges_mut(); @@ -224,6 +225,7 @@ async fn process_scan_results( ufvks: &HashMap, scan_range: ScanRange, scan_results: Result, + scanner_state: &mut ScannerState, ) -> Result<(), SyncError> where P: consensus::Parameters, @@ -231,6 +233,10 @@ where { match scan_results { Ok(results) => { + if scan_range.priority() == ScanPriority::Verify { + scanner_state.verify(); + } + update_wallet_data(wallet, results).unwrap(); link_nullifiers(wallet, fetch_request_sender, consensus_parameters, ufvks) .await From 82004e75f56e84037d222e23c58ffe45bca78939 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Wed, 20 Nov 2024 08:09:24 +0000 Subject: [PATCH 10/13] added verification and reset to update scan ranges --- zingo-sync/src/sync.rs | 124 +++++++++++++++++++++++++++++++++-------- 1 file changed, 100 insertions(+), 24 deletions(-) diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 4120b358c5..1cb6e3c6fc 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -2,7 +2,7 @@ use std::cmp; use std::collections::{BTreeMap, HashMap, HashSet}; -use std::ops::Range; +use std::ops::{Add, Range}; use std::time::Duration; use crate::client::fetch::fetch; @@ -125,13 +125,97 @@ where && wallet.get_sync_state().unwrap().fully_scanned() } -/// Update scan ranges to include blocks between the last known chain height (wallet height) and the chain height from the server +/// Update scan ranges for scanning async fn update_scan_ranges

( fetch_request_sender: mpsc::UnboundedSender, consensus_parameters: &P, wallet_birthday: BlockHeight, sync_state: &mut SyncState, ) -> Result<(), ()> +where + P: consensus::Parameters, +{ + create_scan_range( + fetch_request_sender, + consensus_parameters, + wallet_birthday, + sync_state, + ) + .await?; + + reset_scan_ranges(sync_state)?; + + set_verification_scan_range(sync_state)?; + + // TODO: add logic to combine ranges, especially for chain tip + // TODO: set chain tip range + // TODO: add scan priority logic + + Ok(()) +} + +fn reset_scan_ranges(sync_state: &mut SyncState) -> Result<(), ()> { + let scan_ranges = sync_state.scan_ranges_mut(); + let stale_verify_scan_ranges = scan_ranges + .iter() + .filter(|range| range.priority() == ScanPriority::Verify) + .cloned() + .collect::>(); + let previously_scanning_scan_ranges = scan_ranges + .iter() + .filter(|range| range.priority() == ScanPriority::Ignored) + .cloned() + .collect::>(); + for scan_range in stale_verify_scan_ranges { + set_scan_priority(sync_state, scan_range.block_range(), ScanPriority::Historic).unwrap(); + } + // a range that was previously scanning when sync was last interrupted should be set to `ChainTip` which is the + // highest priority that is not `Verify`. 
+ for scan_range in previously_scanning_scan_ranges { + set_scan_priority(sync_state, scan_range.block_range(), ScanPriority::ChainTip).unwrap(); + } + + Ok(()) +} + +fn set_verification_scan_range(sync_state: &mut SyncState) -> Result<(), ()> { + let scan_ranges = sync_state.scan_ranges_mut(); + if let Some((index, lowest_unscanned_range)) = + scan_ranges.iter().enumerate().find(|(_, scan_range)| { + scan_range.priority() != ScanPriority::Ignored + && scan_range.priority() != ScanPriority::Scanned + }) + { + let block_range_to_verify = Range { + start: lowest_unscanned_range.block_range().start, + end: lowest_unscanned_range + .block_range() + .start + .add(VERIFY_BLOCK_RANGE_SIZE), + }; + let split_ranges = split_out_scan_range( + lowest_unscanned_range, + block_range_to_verify, + ScanPriority::Verify, + ); + + assert!(split_ranges.len() == 2); + + sync_state + .scan_ranges_mut() + .splice(index..=index, split_ranges); + } + + Ok(()) +} + +/// Create scan range between the last known chain height (wallet height) and the chain height from the server +async fn create_scan_range

( + fetch_request_sender: mpsc::UnboundedSender, + consensus_parameters: &P, + wallet_birthday: BlockHeight, + sync_state: &mut SyncState, +) -> Result<(), ()> where P: consensus::Parameters, { @@ -163,24 +247,19 @@ where panic!("wallet is ahead of server!") } - let chain_tip_scan_range = ScanRange::from_parts( + let new_scan_range = ScanRange::from_parts( Range { start: wallet_height, end: chain_height + 1, }, ScanPriority::Historic, ); - scan_ranges.push(chain_tip_scan_range); + scan_ranges.push(new_scan_range); if scan_ranges.is_empty() { - panic!("scan ranges should never be empty after updating") + panic!("scan ranges should never be empty after updating"); } - // TODO: add logic to combine chain tip scan range with wallet tip scan range - // TODO: add scan priority logic - // TODO: replace `ignored` (a.k.a scanning) priority with `verify` to prioritise ranges that were being scanned when sync was interrupted - // TODO: split off a verify priority batch from lowest unscanned scan range to prioritise reorg verification on sync start - Ok(()) } @@ -258,8 +337,7 @@ where let sync_state = wallet.get_sync_state_mut().unwrap(); set_scan_priority(sync_state, scan_range.block_range(), scan_range.priority()) .unwrap(); // reset scan range to initial priority in wallet sync state - let scan_range_to_verify = - verify_scan_range_tip(sync_state, height.saturating_sub(1)); + let scan_range_to_verify = verify_scan_range_tip(sync_state, height - 1); invalidate_scan_range(wallet, scan_range_to_verify).unwrap(); } else { scan_results?; @@ -275,6 +353,7 @@ fn invalidate_scan_range(wallet: &mut W, scan_range_to_verify: ScanRange) -> where W: SyncBlocks + SyncTransactions + SyncNullifiers, { + // TODO: wallet should truncate not remove wallet .remove_wallet_blocks(scan_range_to_verify.block_range()) .unwrap(); @@ -428,10 +507,7 @@ fn verify_scan_range_tip(sync_state: &mut SyncState, block_height: BlockHeight) } let block_range_to_verify = Range { - start: scan_range - 
.block_range() - .end - .saturating_sub(VERIFY_BLOCK_RANGE_SIZE), + start: scan_range.block_range().end - VERIFY_BLOCK_RANGE_SIZE, end: scan_range.block_range().end, }; let split_ranges = @@ -485,13 +561,13 @@ fn determine_block_range(block_height: BlockHeight) -> Range { Range { start, end } } -/// Takes a scan range and splits it at [block_range.start] and [block_range.end], returning a vec of scan ranges where -/// the scan range with the specified [block_range] has the given [scan_priority]. +/// Takes a scan range and splits it at `block_range.start` and `block_range.end`, returning a vec of scan ranges where +/// the scan range with the specified `block_range` has the given `scan_priority`. /// -/// If [block_range] goes beyond the bounds of [scan_range.block_range()] no splitting will occur at the upper and/or +/// If `block_range` goes beyond the bounds of `scan_range.block_range()` no splitting will occur at the upper and/or /// lower bound but the priority will still be updated /// -/// Panics if no blocks in [block_range] are contained within [scan_range.block_range()] +/// Panics if no blocks in `block_range` are contained within `scan_range.block_range()` fn split_out_scan_range( scan_range: &ScanRange, block_range: Range, @@ -561,8 +637,8 @@ where .map(|tx| tx.block_height()) .collect::>(); wallet.get_wallet_blocks_mut().unwrap().retain(|height, _| { - *height >= scan_range.block_range().end.saturating_sub(1) - || *height >= wallet_height.saturating_sub(100) + *height >= scan_range.block_range().end - 1 + || *height >= wallet_height - 100 || wallet_transaction_heights.contains(height) }); wallet @@ -579,9 +655,9 @@ where Ok(()) } -/// Sets the scan range in [sync_state] with [block_range] to the given [scan_priority]. +/// Sets the scan range in `sync_state` with `block_range` to the given `scan_priority`. /// -/// Panics if no scan range is found in [sync_state] with a block range of exactly [block_range]. 
+/// Panics if no scan range is found in `sync_state` with a block range of exactly `block_range`. fn set_scan_priority( sync_state: &mut SyncState, block_range: &Range, From af0ee2531fd4cbc59ab7abd5470817f8914e532f Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Thu, 21 Nov 2024 03:51:20 +0000 Subject: [PATCH 11/13] implemented scan priority logic --- zingo-sync/src/sync.rs | 192 +++++++++++++++++++++++------------------ 1 file changed, 106 insertions(+), 86 deletions(-) diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 1cb6e3c6fc..54c4d358ba 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -26,6 +26,8 @@ use tokio::sync::mpsc; use zcash_primitives::transaction::TxId; use zcash_primitives::zip32::AccountId; +// TODO: create sub modules for sync module to organise code + // TODO; replace fixed batches with orchard shard ranges (block ranges containing all note commitments to an orchard shard or fragment of a shard) const BATCH_SIZE: u32 = 1_000; const VERIFY_BLOCK_RANGE_SIZE: u32 = 10; @@ -135,21 +137,72 @@ async fn update_scan_ranges

( where P: consensus::Parameters, { + let chain_height = client::get_chain_height(fetch_request_sender) + .await + .unwrap(); create_scan_range( - fetch_request_sender, + chain_height, consensus_parameters, wallet_birthday, sync_state, ) .await?; - reset_scan_ranges(sync_state)?; - set_verification_scan_range(sync_state)?; - // TODO: add logic to combine ranges, especially for chain tip + // TODO: add logic to merge scan ranges // TODO: set chain tip range - // TODO: add scan priority logic + // TODO: set open adjacent range + + Ok(()) +} + +/// Create scan range between the last known chain height (wallet height) and the chain height from the server +async fn create_scan_range

( + chain_height: BlockHeight, + consensus_parameters: &P, + wallet_birthday: BlockHeight, + sync_state: &mut SyncState, +) -> Result<(), ()> +where + P: consensus::Parameters, +{ + let scan_ranges = sync_state.scan_ranges_mut(); + + let wallet_height = if scan_ranges.is_empty() { + let sapling_activation_height = consensus_parameters + .activation_height(NetworkUpgrade::Sapling) + .expect("sapling activation height should always return Some"); + + match wallet_birthday.cmp(&sapling_activation_height) { + cmp::Ordering::Greater | cmp::Ordering::Equal => wallet_birthday, + cmp::Ordering::Less => sapling_activation_height, + } + } else { + scan_ranges + .last() + .expect("Vec should not be empty") + .block_range() + .end + }; + + if wallet_height > chain_height { + // TODO: truncate wallet to server height in case of reorg + panic!("wallet is ahead of server!") + } + + let new_scan_range = ScanRange::from_parts( + Range { + start: wallet_height, + end: chain_height + 1, + }, + ScanPriority::Historic, + ); + scan_ranges.push(new_scan_range); + + if scan_ranges.is_empty() { + panic!("scan ranges should never be empty after updating"); + } Ok(()) } @@ -167,7 +220,12 @@ fn reset_scan_ranges(sync_state: &mut SyncState) -> Result<(), ()> { .cloned() .collect::>(); for scan_range in stale_verify_scan_ranges { - set_scan_priority(sync_state, scan_range.block_range(), ScanPriority::Historic).unwrap(); + set_scan_priority( + sync_state, + scan_range.block_range(), + ScanPriority::OpenAdjacent, + ) + .unwrap(); } // a range that was previously scanning when sync was last interupted should be set to `ChainTip` which is the // highest priority that is not `Verify`. 
@@ -199,8 +257,6 @@ fn set_verification_scan_range(sync_state: &mut SyncState) -> Result<(), ()> { ScanPriority::Verify, ); - assert!(split_ranges.len() == 2); - sync_state .scan_ranges_mut() .splice(index..=index, split_ranges); @@ -209,91 +265,56 @@ fn set_verification_scan_range(sync_state: &mut SyncState) -> Result<(), ()> { Ok(()) } -/// Create scan range between the last known chain height (wallet height) and the chain height from the server -async fn create_scan_range

( - fetch_request_sender: mpsc::UnboundedSender, - consensus_parameters: &P, - wallet_birthday: BlockHeight, - sync_state: &mut SyncState, -) -> Result<(), ()> -where - P: consensus::Parameters, -{ - let chain_height = client::get_chain_height(fetch_request_sender) - .await - .unwrap(); - - let scan_ranges = sync_state.scan_ranges_mut(); - - let wallet_height = if scan_ranges.is_empty() { - let sapling_activation_height = consensus_parameters - .activation_height(NetworkUpgrade::Sapling) - .expect("sapling activation height should always return Some"); - - match wallet_birthday.cmp(&sapling_activation_height) { - cmp::Ordering::Greater | cmp::Ordering::Equal => wallet_birthday, - cmp::Ordering::Less => sapling_activation_height, - } - } else { - scan_ranges - .last() - .expect("Vec should not be empty") - .block_range() - .end - }; - - if wallet_height > chain_height { - // TODO: truncate wallet to server height in case of reorg - panic!("wallet is ahead of server!") - } - - let new_scan_range = ScanRange::from_parts( - Range { - start: wallet_height, - end: chain_height + 1, - }, - ScanPriority::Historic, - ); - scan_ranges.push(new_scan_range); - - if scan_ranges.is_empty() { - panic!("scan ranges should never be empty after updating"); - } - - Ok(()) -} - /// Selects and prepares the next scan range for scanning. /// Sets the range for scanning to `Ignored` priority in the wallet `sync_state` but returns the scan range with its initial priority. /// Returns `None` if there are no more ranges to scan. 
pub(crate) fn select_scan_range(sync_state: &mut SyncState) -> Option { let scan_ranges = sync_state.scan_ranges_mut(); - // TODO: placeholder for algorythm that determines highest priority range to scan - let (index, selected_scan_range) = scan_ranges.iter_mut().enumerate().find(|(_, range)| { - range.priority() != ScanPriority::Scanned && range.priority() != ScanPriority::Ignored - })?; - - // TODO: replace with new range split/splice helpers - // if scan range is larger than BATCH_SIZE, split off and return a batch from the lower end and update scan ranges - if let Some((lower_range, higher_range)) = - selected_scan_range.split_at(selected_scan_range.block_range().start + BATCH_SIZE) + let mut scan_ranges_priority_sorted: Vec<&ScanRange> = scan_ranges.iter().collect(); + scan_ranges_priority_sorted.sort_by(|a, b| b.block_range().start.cmp(&a.block_range().start)); + scan_ranges_priority_sorted.sort_by_key(|scan_range| scan_range.priority()); + let highest_priority_scan_range = scan_ranges_priority_sorted + .pop() + .expect("scan ranges should be non-empty after setup") + .clone(); + if highest_priority_scan_range.priority() == ScanPriority::Scanned + || highest_priority_scan_range.priority() == ScanPriority::Ignored { - let lower_range_ignored = - ScanRange::from_parts(lower_range.block_range().clone(), ScanPriority::Ignored); - scan_ranges.splice(index..=index, vec![lower_range_ignored, higher_range]); + return None; + } - Some(lower_range) - } else { - let selected_scan_range = selected_scan_range.clone(); - let selected_range_ignored = ScanRange::from_parts( - selected_scan_range.block_range().clone(), - ScanPriority::Ignored, - ); - scan_ranges.splice(index..=index, vec![selected_range_ignored]); + let (index, selected_scan_range) = scan_ranges + .iter_mut() + .enumerate() + .find(|(_, scan_range)| { + scan_range.block_range() == highest_priority_scan_range.block_range() + }) + .expect("scan range should exist"); - Some(selected_scan_range.clone()) - } + let 
batch_block_range = Range { + start: selected_scan_range.block_range().start, + end: selected_scan_range.block_range().start + BATCH_SIZE, + }; + let split_ranges = split_out_scan_range( + selected_scan_range, + batch_block_range, + ScanPriority::Ignored, + ); + + let trimmed_block_range = split_ranges + .first() + .expect("vec should always be non-empty") + .block_range() + .clone(); + + scan_ranges.splice(index..=index, split_ranges); + + // TODO: when this library has its own version of ScanRange this can be simpified and more readable + Some(ScanRange::from_parts( + trimmed_block_range, + highest_priority_scan_range.priority(), + )) } /// Scan post-processing @@ -513,10 +534,9 @@ fn verify_scan_range_tip(sync_state: &mut SyncState, block_height: BlockHeight) let split_ranges = split_out_scan_range(scan_range, block_range_to_verify, ScanPriority::Verify); - assert!(split_ranges.len() == 2); let scan_range_to_verify = split_ranges .last() - .expect("split_ranges should always have exactly 2 elements") + .expect("vec should always be non-empty") .clone(); sync_state From 11e21079d493c0650014e0dc1d1ade4c8a9962eb Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Thu, 21 Nov 2024 06:12:24 +0000 Subject: [PATCH 12/13] added shard tree truncation and reworked truncation methods --- zingo-sync/src/sync.rs | 24 ++++++++---------- zingo-sync/src/traits.rs | 53 +++++++++++++++++++++++++++------------- 2 files changed, 46 insertions(+), 31 deletions(-) diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 54c4d358ba..0187666ca4 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -359,7 +359,7 @@ where set_scan_priority(sync_state, scan_range.block_range(), scan_range.priority()) .unwrap(); // reset scan range to initial priority in wallet sync state let scan_range_to_verify = verify_scan_range_tip(sync_state, height - 1); - invalidate_scan_range(wallet, scan_range_to_verify).unwrap(); + truncate_wallet_data(wallet, 
scan_range_to_verify.block_range().start - 1).unwrap(); } else { scan_results?; } @@ -370,21 +370,17 @@ where Ok(()) } -fn invalidate_scan_range(wallet: &mut W, scan_range_to_verify: ScanRange) -> Result<(), ()> +/// Removes all wallet data above the given `truncate_height`. +fn truncate_wallet_data(wallet: &mut W, truncate_height: BlockHeight) -> Result<(), ()> where - W: SyncBlocks + SyncTransactions + SyncNullifiers, + W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncShardTrees, { - // TODO: wallet should truncate not remove - wallet - .remove_wallet_blocks(scan_range_to_verify.block_range()) - .unwrap(); - wallet - .remove_wallet_transactions(scan_range_to_verify.block_range()) - .unwrap(); + wallet.truncate_wallet_blocks(truncate_height).unwrap(); wallet - .remove_nullifiers(scan_range_to_verify.block_range()) + .truncate_wallet_transactions(truncate_height) .unwrap(); - // TODO: truncate shard tree + wallet.truncate_nullifiers(truncate_height).unwrap(); + wallet.truncate_shard_trees(truncate_height).unwrap(); Ok(()) } @@ -510,8 +506,8 @@ where Ok(()) } -/// Splits out the highest VERIFY_BLOCK_RANGE_SIZE blocks from the scan range containing the given block height -/// and sets it's priority to `verify`. +/// Splits out the highest VERIFY_BLOCK_RANGE_SIZE blocks from the scan range containing the given `block height` +/// and sets it's priority to `Verify`. /// Returns a clone of the scan range to be verified. 
/// /// Panics if the scan range containing the given block height is not of priority `Scanned` diff --git a/zingo-sync/src/traits.rs b/zingo-sync/src/traits.rs index 2152a97c51..61e5e3569d 100644 --- a/zingo-sync/src/traits.rs +++ b/zingo-sync/src/traits.rs @@ -2,7 +2,6 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; -use std::ops::Range; use zcash_client_backend::keys::UnifiedFullViewingKey; use zcash_primitives::consensus::BlockHeight; @@ -53,13 +52,10 @@ pub trait SyncBlocks: SyncWallet { Ok(()) } - /// Removes all wallet blocks with block height's within the given `invalid_range` (end exclusive) - fn remove_wallet_blocks( - &mut self, - invalid_range: &Range, - ) -> Result<(), Self::Error> { + /// Removes all wallet blocks above the given `block_height`. + fn truncate_wallet_blocks(&mut self, truncate_height: BlockHeight) -> Result<(), Self::Error> { self.get_wallet_blocks_mut()? - .retain(|_, block| !invalid_range.contains(&block.block_height())); + .retain(|block_height, _| block_height.clone() <= truncate_height); Ok(()) } @@ -86,17 +82,17 @@ pub trait SyncTransactions: SyncWallet { Ok(()) } - /// Removes all wallet transactions with block height's within the given `invalid_range` (end exclusive) + /// Removes all wallet transactions above the given `block_height`. /// Also sets any output's spending_transaction field to `None` if it's spending transaction was removed. - fn remove_wallet_transactions( + fn truncate_wallet_transactions( &mut self, - invalid_range: &Range, + truncate_height: BlockHeight, ) -> Result<(), Self::Error> { - // Replace with `extract_if()` when it's in stable rust + // TODO: Replace with `extract_if()` when it's in stable rust let invalid_txids: Vec = self .get_wallet_transactions()? 
.values() - .filter(|tx| invalid_range.contains(&tx.block_height())) + .filter(|tx| tx.block_height() > truncate_height) .map(|tx| tx.transaction().txid()) .collect(); @@ -154,15 +150,15 @@ pub trait SyncNullifiers: SyncWallet { Ok(()) } - /// Removes all mapped nullifiers with block height's within the given `invalid_range` (end exclusive) - fn remove_nullifiers(&mut self, invalid_range: &Range) -> Result<(), Self::Error> { + /// Removes all mapped nullifiers above the given `block_height`. + fn truncate_nullifiers(&mut self, truncate_height: BlockHeight) -> Result<(), Self::Error> { let nullifier_map = self.get_nullifiers_mut()?; nullifier_map .sapling_mut() - .retain(|_, (block_height, _)| !invalid_range.contains(block_height)); + .retain(|_, (block_height, _)| block_height.clone() <= truncate_height); nullifier_map .orchard_mut() - .retain(|_, (block_height, _)| !invalid_range.contains(block_height)); + .retain(|_, (block_height, _)| block_height.clone() <= truncate_height); Ok(()) } @@ -200,5 +196,28 @@ pub trait SyncShardTrees: SyncWallet { Ok(()) } - // TODO: check if shard tree needs to be invalidated due to re-org or leaves can be inserted to replace invalid parts of commitment tree + /// Removes all shard tree data above the given `block_height`. + fn truncate_shard_trees(&mut self, truncate_height: BlockHeight) -> Result<(), Self::Error> { + // TODO: investigate resetting the shard completely when truncate height is 0 + if self + .get_shard_trees_mut()? + .sapling_mut() + .truncate_to_checkpoint(&truncate_height) + .unwrap() + == false + { + panic!("max checkpoints should always be higher than verification window!"); + } + if self + .get_shard_trees_mut()? 
+ .orchard_mut() + .truncate_to_checkpoint(&truncate_height) + .unwrap() + == false + { + panic!("max checkpoints should always be higher than verification window!"); + } + + Ok(()) + } } From 5338143fb2dcf263847120922c6e8c01fb75caa3 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Fri, 22 Nov 2024 01:41:00 +0000 Subject: [PATCH 13/13] fix clippy warnings --- zingo-sync/src/traits.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/zingo-sync/src/traits.rs b/zingo-sync/src/traits.rs index 61e5e3569d..2db810e707 100644 --- a/zingo-sync/src/traits.rs +++ b/zingo-sync/src/traits.rs @@ -55,7 +55,7 @@ pub trait SyncBlocks: SyncWallet { /// Removes all wallet blocks above the given `block_height`. fn truncate_wallet_blocks(&mut self, truncate_height: BlockHeight) -> Result<(), Self::Error> { self.get_wallet_blocks_mut()? - .retain(|block_height, _| block_height.clone() <= truncate_height); + .retain(|block_height, _| *block_height <= truncate_height); Ok(()) } @@ -155,10 +155,10 @@ pub trait SyncNullifiers: SyncWallet { let nullifier_map = self.get_nullifiers_mut()?; nullifier_map .sapling_mut() - .retain(|_, (block_height, _)| block_height.clone() <= truncate_height); + .retain(|_, (block_height, _)| *block_height <= truncate_height); nullifier_map .orchard_mut() - .retain(|_, (block_height, _)| block_height.clone() <= truncate_height); + .retain(|_, (block_height, _)| *block_height <= truncate_height); Ok(()) } @@ -199,21 +199,19 @@ pub trait SyncShardTrees: SyncWallet { /// Removes all shard tree data above the given `block_height`. fn truncate_shard_trees(&mut self, truncate_height: BlockHeight) -> Result<(), Self::Error> { // TODO: investigate resetting the shard completely when truncate height is 0 - if self + if !self .get_shard_trees_mut()? 
.sapling_mut() .truncate_to_checkpoint(&truncate_height) .unwrap() - == false { panic!("max checkpoints should always be higher than verification window!"); } - if self + if !self .get_shard_trees_mut()? .orchard_mut() .truncate_to_checkpoint(&truncate_height) .unwrap() - == false { panic!("max checkpoints should always be higher than verification window!"); }