diff --git a/Cargo.lock b/Cargo.lock index 68b7f41b2..52f3d07ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4531,6 +4531,7 @@ dependencies = [ "sapling-crypto", "sha2", "shardtree", + "thiserror", "tokio", "tonic", "tracing", diff --git a/libtonode-tests/tests/sync.rs b/libtonode-tests/tests/sync.rs index 18d8f6f7c..0a2e57551 100644 --- a/libtonode-tests/tests/sync.rs +++ b/libtonode-tests/tests/sync.rs @@ -31,7 +31,7 @@ async fn sync_mainnet_test() { let mut lightclient = LightClient::create_from_wallet_base_async( WalletBase::from_string(HOSPITAL_MUSEUM_SEED.to_string()), &config, - 2_611_700, + 2_715_150, true, ) .await @@ -77,8 +77,8 @@ async fn sync_test() { .await .unwrap(); - dbg!(recipient.wallet.wallet_transactions()); - dbg!(recipient.wallet.wallet_blocks()); - dbg!(recipient.wallet.nullifier_map()); - dbg!(recipient.wallet.sync_state()); + // dbg!(recipient.wallet.wallet_transactions()); + // dbg!(recipient.wallet.wallet_blocks()); + // dbg!(recipient.wallet.nullifier_map()); + // dbg!(recipient.wallet.sync_state()); } diff --git a/zingo-sync/Cargo.toml b/zingo-sync/Cargo.toml index 043e30612..3df25e5ec 100644 --- a/zingo-sync/Cargo.toml +++ b/zingo-sync/Cargo.toml @@ -38,6 +38,9 @@ rayon.workspace = true # Minimise boilerplate getset.workspace = true +# Error handling +thiserror.workspace = true + # temp zcash_address.workspace = true sha2.workspace = true diff --git a/zingo-sync/src/error.rs b/zingo-sync/src/error.rs new file mode 100644 index 000000000..a5eab3ff1 --- /dev/null +++ b/zingo-sync/src/error.rs @@ -0,0 +1,11 @@ +//! Top level error module for the crate + +use crate::scan::error::ScanError; + +/// Top level error enum encapsulating any error that may occur during sync +#[derive(Debug, thiserror::Error)] +pub enum SyncError { + /// Errors associated with scanning + #[error("Scan error. {0}")] + ScanError(#[from] ScanError), +} diff --git a/zingo-sync/src/lib.rs b/zingo-sync/src/lib.rs index 6fd5fe12b..d49635aae 100644 --- a/zingo-sync/src/lib.rs +++ b/zingo-sync/src/lib.rs @@ -4,6 +4,7 @@ //! Entrypoint: [`crate::sync::sync`] pub mod client; +pub mod error; pub(crate) mod keys; #[allow(missing_docs)] pub mod primitives; diff --git a/zingo-sync/src/primitives.rs b/zingo-sync/src/primitives.rs index aad354242..4e6c30fe0 100644 --- a/zingo-sync/src/primitives.rs +++ b/zingo-sync/src/primitives.rs @@ -35,6 +35,15 @@ impl SyncState { spend_locations: Vec::new(), } } + + pub fn fully_scanned(&self) -> bool { + self.scan_ranges().iter().all(|scan_range| { + matches!( + scan_range.priority(), + zcash_client_backend::data_api::scanning::ScanPriority::Scanned + ) + }) + } } impl Default for SyncState { diff --git a/zingo-sync/src/scan.rs b/zingo-sync/src/scan.rs index b88112e30..81e610010 100644 --- a/zingo-sync/src/scan.rs +++ b/zingo-sync/src/scan.rs @@ -20,9 +20,12 @@ use crate::{ witness::ShardTreeData, }; -use self::{compact_blocks::scan_compact_blocks, transactions::scan_transactions}; +use self::{ + compact_blocks::scan_compact_blocks, error::ScanError, transactions::scan_transactions, +}; mod compact_blocks; +pub mod error; pub(crate) mod task; pub(crate) mod transactions; @@ -136,15 +139,15 @@ impl DecryptedNoteData { } } -// scans a given range and returns all data relevant to the specified keys -// `previous_wallet_block` is the wallet block with height [scan_range.start - 1] +/// Scans a given range and returns all data relevant to the specified keys. +/// `previous_wallet_block` is the wallet block with height [scan_range.start - 1]. pub(crate) async fn scan

( fetch_request_sender: mpsc::UnboundedSender, parameters: &P, ufvks: &HashMap, scan_range: ScanRange, previous_wallet_block: Option, -) -> Result +) -> Result where P: Parameters + Sync + Send + 'static, { @@ -166,8 +169,7 @@ where .await .unwrap(); - let scan_data = - scan_compact_blocks(compact_blocks, parameters, ufvks, initial_scan_data).unwrap(); + let scan_data = scan_compact_blocks(compact_blocks, parameters, ufvks, initial_scan_data)?; let ScanData { nullifiers, diff --git a/zingo-sync/src/scan/compact_blocks.rs b/zingo-sync/src/scan/compact_blocks.rs index 14cc39c6a..2a1daf980 100644 --- a/zingo-sync/src/scan/compact_blocks.rs +++ b/zingo-sync/src/scan/compact_blocks.rs @@ -23,7 +23,10 @@ use crate::{ use self::runners::{BatchRunners, DecryptedOutput}; -use super::{DecryptedNoteData, InitialScanData, ScanData}; +use super::{ + error::{ContinuityError, ScanError}, + DecryptedNoteData, InitialScanData, ScanData, +}; mod runners; @@ -32,11 +35,11 @@ pub(crate) fn scan_compact_blocks

( parameters: &P, ufvks: &HashMap, initial_scan_data: InitialScanData, -) -> Result +) -> Result where P: Parameters + Sync + Send + 'static, { - check_continuity(&compact_blocks, initial_scan_data.previous_block.as_ref()).unwrap(); + check_continuity(&compact_blocks, initial_scan_data.previous_block.as_ref())?; let scanning_keys = ScanningKeys::from_account_ufvks(ufvks.clone()); let mut runners = trial_decrypt(parameters, &scanning_keys, &compact_blocks).unwrap(); @@ -162,7 +165,7 @@ where fn check_continuity( compact_blocks: &[CompactBlock], previous_compact_block: Option<&WalletBlock>, -) -> Result<(), ()> { +) -> Result<(), ContinuityError> { let mut prev_height: Option = None; let mut prev_hash: Option = None; @@ -174,13 +177,20 @@ fn check_continuity( for block in compact_blocks { if let Some(prev_height) = prev_height { if block.height() != prev_height + 1 { - panic!("height discontinuity"); + return Err(ContinuityError::HeightDiscontinuity { + height: block.height(), + previous_block_height: prev_height, + }); } } if let Some(prev_hash) = prev_hash { if block.prev_hash() != prev_hash { - panic!("hash discontinuity"); + return Err(ContinuityError::HashDiscontinuity { + height: block.height(), + prev_hash: block.prev_hash(), + previous_block_hash: prev_hash, + }); } } diff --git a/zingo-sync/src/scan/error.rs b/zingo-sync/src/scan/error.rs new file mode 100644 index 000000000..8c35dce3d --- /dev/null +++ b/zingo-sync/src/scan/error.rs @@ -0,0 +1,22 @@ +use zcash_primitives::{block::BlockHash, consensus::BlockHeight}; + +#[derive(Debug, thiserror::Error)] +pub enum ScanError { + #[error("Continuity error. {0}")] + ContinuityError(#[from] ContinuityError), +} + +#[derive(Debug, thiserror::Error)] +pub enum ContinuityError { + #[error("Height discontinuity. Block with height {height} is not continuous with previous block height {previous_block_height}")] + HeightDiscontinuity { + height: BlockHeight, + previous_block_height: BlockHeight, + }, + #[error("Hash discontinuity. Block prev_hash {prev_hash} with height {height} does not match previous block hash {previous_block_hash}")] + HashDiscontinuity { + height: BlockHeight, + prev_hash: BlockHash, + previous_block_hash: BlockHash, + }, +} diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index 9ec52d890..59adc632b 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -6,169 +6,339 @@ use std::{ }, }; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::{ + sync::mpsc, + task::{JoinError, JoinHandle}, +}; -use zcash_client_backend::data_api::scanning::ScanRange; +use zcash_client_backend::data_api::scanning::{ScanPriority, ScanRange}; use zcash_keys::keys::UnifiedFullViewingKey; -use zcash_primitives::{consensus::Parameters, zip32::AccountId}; +use zcash_primitives::{consensus, zip32::AccountId}; + +use crate::{ + client::FetchRequest, + primitives::WalletBlock, + sync, + traits::{SyncBlocks, SyncWallet}, +}; + +use super::{error::ScanError, scan, ScanResults}; -use crate::{client::FetchRequest, primitives::WalletBlock}; +const MAX_WORKER_POOLSIZE: usize = 2; -use super::{scan, ScanResults}; +pub(crate) enum ScannerState { + Verification, + Scan, + Shutdown, +} + +impl ScannerState { + pub(crate) fn verify(&mut self) { + if let ScannerState::Verification = *self { + *self = ScannerState::Scan + } else { + panic!( + "ScanState is not Verification variant. Verification should only complete once!" + ); + } + } -const SCAN_WORKER_POOLSIZE: usize = 2; + fn shutdown(&mut self) { + *self = ScannerState::Shutdown + } +} pub(crate) struct Scanner

{ - workers: Vec, - scan_results_sender: mpsc::UnboundedSender<(ScanRange, ScanResults)>, + state: ScannerState, + workers: Vec>, + unique_id: usize, + scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, - parameters: P, + consensus_parameters: P, ufvks: HashMap, } // TODO: add fn for checking and handling worker errors impl

Scanner

where - P: Parameters + Sync + Send + 'static, + P: consensus::Parameters + Sync + Send + 'static, { pub(crate) fn new( - scan_results_sender: mpsc::UnboundedSender<(ScanRange, ScanResults)>, + scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, - parameters: P, + consensus_parameters: P, ufvks: HashMap, ) -> Self { - let workers: Vec = Vec::with_capacity(SCAN_WORKER_POOLSIZE); + let workers: Vec> = Vec::with_capacity(MAX_WORKER_POOLSIZE); Self { + state: ScannerState::Verification, workers, + unique_id: 0, scan_results_sender, fetch_request_sender, - parameters, + consensus_parameters, ufvks, } } + pub(crate) fn state_mut(&mut self) -> &mut ScannerState { + &mut self.state + } + + pub(crate) fn worker_poolsize(&self) -> usize { + self.workers.len() + } + + /// Spawns a worker. + /// + /// When the worker is running it will wait for a scan task. + pub(crate) fn spawn_worker(&mut self) { + tracing::info!("Spawning worker {}", self.unique_id); + let mut worker = ScanWorker::new( + self.unique_id, + None, + self.scan_results_sender.clone(), + self.fetch_request_sender.clone(), + self.consensus_parameters.clone(), + self.ufvks.clone(), + ); + worker.run().unwrap(); + self.workers.push(worker); + self.unique_id += 1; + } + + /// Spawns the initial pool of workers. + /// + /// Poolsize is set by [`self::MAX_WORKER_POOLSIZE`]. pub(crate) fn spawn_workers(&mut self) { - for _ in 0..SCAN_WORKER_POOLSIZE { - let (scan_task_sender, scan_task_receiver) = mpsc::unbounded_channel(); - let worker = ScanWorker::new( - scan_task_receiver, - self.scan_results_sender.clone(), - self.fetch_request_sender.clone(), - self.parameters.clone(), - self.ufvks.clone(), - ); - let is_scanning = Arc::clone(&worker.is_scanning); - let handle = tokio::spawn(async move { worker.run().await }); - self.workers.push(WorkerHandle { - _handle: handle, - is_scanning, - scan_task_sender, - }); + for _ in 0..MAX_WORKER_POOLSIZE { + self.spawn_worker(); } } - pub(crate) fn is_worker_idle(&self) -> bool { - self.workers.iter().any(|worker| !worker.is_scanning()) + fn idle_worker(&self) -> Option<&ScanWorker

> { + if let Some(idle_worker) = self.workers.iter().find(|worker| !worker.is_scanning()) { + Some(idle_worker) + } else { + None + } } - pub(crate) fn add_scan_task(&self, scan_task: ScanTask) -> Result<(), ()> { - if let Some(worker) = self.workers.iter().find(|worker| !worker.is_scanning()) { - worker.add_scan_task(scan_task).unwrap(); + async fn shutdown_worker(&mut self, worker_id: usize) -> Result<(), ()> { + if let Some(worker_index) = self + .workers + .iter() + .position(|worker| worker.id == worker_id) + { + let mut worker = self.workers.swap_remove(worker_index); + worker + .shutdown() + .await + .expect("worker should not be able to panic"); } else { - panic!("no idle workers!") + panic!("id not found in worker pool"); } Ok(()) } -} -struct WorkerHandle { - _handle: JoinHandle>, - is_scanning: Arc, - scan_task_sender: mpsc::UnboundedSender, -} + /// Updates the scanner. + /// + /// If verification is still in progress, do not create scan tasks. + /// If there is an idle worker, create a new scan task and add to worker. + /// If there are no more range available to scan, shutdown the idle workers. + pub(crate) async fn update(&mut self, wallet: &mut W) + where + W: SyncWallet + SyncBlocks, + { + match self.state { + ScannerState::Verification => { + if !wallet + .get_sync_state() + .unwrap() + .scan_ranges() + .iter() + .any(|scan_range| scan_range.priority() == ScanPriority::Verify) + { + // under these conditions the `Verify` scan range is currently being scanned. + // the reason why the logic looks for no `Verify` ranges in the sync state is because it is set to `Ignored` + // during scanning. + // if we were to continue to add new tasks and a re-org had occured the sync state would be unrecoverable. + return; + } -impl WorkerHandle { - fn is_scanning(&self) -> bool { - self.is_scanning.load(atomic::Ordering::Acquire) - } + // scan the range with `Verify` priority + if let Some(worker) = self.idle_worker() { + let scan_task = ScanTask::create(wallet) + .unwrap() + .expect("scan range with `Verify` priority must exist!"); - fn add_scan_task(&self, scan_task: ScanTask) -> Result<(), ()> { - self.scan_task_sender.send(scan_task).unwrap(); + assert_eq!(scan_task.scan_range.priority(), ScanPriority::Verify); + worker.add_scan_task(scan_task).unwrap(); + } + } + ScannerState::Scan => { + // create scan tasks until all ranges are scanned or currently scanning + if let Some(worker) = self.idle_worker() { + if let Some(scan_task) = ScanTask::create(wallet).unwrap() { + worker.add_scan_task(scan_task).unwrap(); + } else { + self.state.shutdown(); + } + } + } + ScannerState::Shutdown => { + // shutdown idle workers + while let Some(worker) = self.idle_worker() { + self.shutdown_worker(worker.id) + .await + .expect("worker should be in worker pool"); + } + } + } - Ok(()) + if !wallet.get_sync_state().unwrap().fully_scanned() && self.worker_poolsize() == 0 { + panic!("worker pool should not be empty with unscanned ranges!") + } } } struct ScanWorker

{ + id: usize, + handle: Option>, is_scanning: Arc, - scan_task_receiver: mpsc::UnboundedReceiver, - scan_results_sender: mpsc::UnboundedSender<(ScanRange, ScanResults)>, + scan_task_sender: Option>, + scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, - parameters: P, + consensus_parameters: P, ufvks: HashMap, } impl

ScanWorker

where - P: Parameters + Sync + Send + 'static, + P: consensus::Parameters + Sync + Send + 'static, { fn new( - scan_task_receiver: mpsc::UnboundedReceiver, - scan_results_sender: mpsc::UnboundedSender<(ScanRange, ScanResults)>, + id: usize, + scan_task_sender: Option>, + scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, - parameters: P, + consensus_parameters: P, ufvks: HashMap, ) -> Self { Self { + id, + handle: None, is_scanning: Arc::new(AtomicBool::new(false)), - scan_task_receiver, + scan_task_sender, scan_results_sender, fetch_request_sender, - parameters, + consensus_parameters, ufvks, } } - async fn run(mut self) -> Result<(), ()> { - while let Some(scan_task) = self.scan_task_receiver.recv().await { - self.is_scanning.store(true, atomic::Ordering::Release); + /// Runs the worker in a new tokio task. + /// + /// Waits for a scan task and then calls [`crate::scan::scan`] on the given range. + fn run(&mut self) -> Result<(), ()> { + let (scan_task_sender, mut scan_task_receiver) = mpsc::unbounded_channel::(); + + let is_scanning = self.is_scanning.clone(); + let scan_results_sender = self.scan_results_sender.clone(); + let fetch_request_sender = self.fetch_request_sender.clone(); + let consensus_parameters = self.consensus_parameters.clone(); + let ufvks = self.ufvks.clone(); + + let handle = tokio::spawn(async move { + while let Some(scan_task) = scan_task_receiver.recv().await { + is_scanning.store(true, atomic::Ordering::Release); + + let scan_results = scan( + fetch_request_sender.clone(), + &consensus_parameters, + &ufvks, + scan_task.scan_range.clone(), + scan_task.previous_wallet_block, + ) + .await; - let scan_results = scan( - self.fetch_request_sender.clone(), - &self.parameters.clone(), - &self.ufvks, - scan_task.scan_range.clone(), - scan_task.previous_wallet_block, - ) - .await + scan_results_sender + .send((scan_task.scan_range, scan_results)) + .expect("receiver should never be dropped before sender!"); + + is_scanning.store(false, atomic::Ordering::Release); + } + }); + + self.handle = Some(handle); + self.scan_task_sender = Some(scan_task_sender); + + Ok(()) + } + + fn is_scanning(&self) -> bool { + self.is_scanning.load(atomic::Ordering::Acquire) + } + + fn add_scan_task(&self, scan_task: ScanTask) -> Result<(), ()> { + tracing::info!("Adding scan task to worker {}:\n{:#?}", self.id, &scan_task); + self.scan_task_sender + .clone() + .unwrap() + .send(scan_task) .unwrap(); - self.scan_results_sender - .send((scan_task.scan_range, scan_results)) - .unwrap(); + Ok(()) + } - self.is_scanning.store(false, atomic::Ordering::Release); + /// Shuts down worker by dropping the sender to the worker task and awaiting the handle. + /// + /// This should always be called in the context of the scanner as it must be also be removed from the worker pool. + async fn shutdown(&mut self) -> Result<(), JoinError> { + tracing::info!("Shutting down worker {}", self.id); + if let Some(sender) = self.scan_task_sender.take() { + drop(sender); } + let handle = self + .handle + .take() + .expect("worker should always have a handle to take!"); - Ok(()) + handle.await } } -pub(crate) struct ScanTask { +#[derive(Debug)] +struct ScanTask { scan_range: ScanRange, previous_wallet_block: Option, } impl ScanTask { - pub(crate) fn from_parts( - scan_range: ScanRange, - previous_wallet_block: Option, - ) -> Self { + fn from_parts(scan_range: ScanRange, previous_wallet_block: Option) -> Self { Self { scan_range, previous_wallet_block, } } + + fn create(wallet: &mut W) -> Result, ()> + where + W: SyncWallet + SyncBlocks, + { + if let Some(scan_range) = sync::select_scan_range(wallet.get_sync_state_mut().unwrap()) { + let previous_wallet_block = wallet + .get_wallet_block(scan_range.block_range().start - 1) + .ok(); + + Ok(Some(ScanTask::from_parts( + scan_range, + previous_wallet_block, + ))) + } else { + Ok(None) + } + } } diff --git a/zingo-sync/src/scan/transactions.rs b/zingo-sync/src/scan/transactions.rs index 24d962e12..9d613834f 100644 --- a/zingo-sync/src/scan/transactions.rs +++ b/zingo-sync/src/scan/transactions.rs @@ -226,6 +226,15 @@ fn scan_transaction( add_recipient_unified_address(parameters, uas.clone(), &mut outgoing_sapling_notes); add_recipient_unified_address(parameters, uas, &mut outgoing_orchard_notes); } + ParsedMemo::Version1 { + uas, + rejection_address_indexes: _, + } => { + add_recipient_unified_address(parameters, uas.clone(), &mut outgoing_sapling_notes); + add_recipient_unified_address(parameters, uas, &mut outgoing_orchard_notes); + + // TODO: handle rejection addresses from encoded memos + } _ => panic!( "memo version not supported. please ensure that your software is up-to-date." ), diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 06774606c..0187666ca 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -2,18 +2,19 @@ use std::cmp; use std::collections::{BTreeMap, HashMap, HashSet}; -use std::ops::Range; +use std::ops::{Add, Range}; use std::time::Duration; use crate::client::fetch::fetch; use crate::client::{self, FetchRequest}; +use crate::error::SyncError; use crate::primitives::SyncState; -use crate::scan::task::{ScanTask, Scanner}; +use crate::scan::error::{ContinuityError, ScanError}; +use crate::scan::task::{Scanner, ScannerState}; use crate::scan::transactions::scan_transactions; use crate::scan::{DecryptedNoteData, ScanResults}; use crate::traits::{SyncBlocks, SyncNullifiers, SyncShardTrees, SyncTransactions, SyncWallet}; -use tokio::sync::mpsc::error::TryRecvError; use zcash_client_backend::{ data_api::scanning::{ScanPriority, ScanRange}, proto::service::compact_tx_streamer_client::CompactTxStreamerClient, @@ -25,16 +26,18 @@ use tokio::sync::mpsc; use zcash_primitives::transaction::TxId; use zcash_primitives::zip32::AccountId; +// TODO: create sub modules for sync module to organise code + // TODO; replace fixed batches with orchard shard ranges (block ranges containing all note commitments to an orchard shard or fragment of a shard) -const BATCH_SIZE: u32 = 10; -// const BATCH_SIZE: u32 = 1_000; +const BATCH_SIZE: u32 = 1_000; +const VERIFY_BLOCK_RANGE_SIZE: u32 = 10; /// Syncs a wallet to the latest state of the blockchain pub async fn sync( client: CompactTxStreamerClient, // TODO: change underlying service for generic consensus_parameters: &P, wallet: &mut W, -) -> Result<(), ()> +) -> Result<(), SyncError> where P: consensus::Parameters + Sync + Send + 'static, W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncShardTrees, @@ -71,67 +74,63 @@ where let mut interval = tokio::time::interval(Duration::from_millis(30)); loop { - interval.tick().await; // TODO: tokio select to receive scan results before tick - - // if a scan worker is idle, send it a new scan task - if scanner.is_worker_idle() { - if let Some(scan_range) = prepare_next_scan_range(wallet.get_sync_state_mut().unwrap()) - { - // TODO: Can previous_wallet_block have the same value - // for more than one call to get_wallet_block? - let previous_wallet_block = wallet - .get_wallet_block(scan_range.block_range().start - 1) - .ok(); - - scanner - .add_scan_task(ScanTask::from_parts(scan_range, previous_wallet_block)) - .unwrap(); - } else { - // when no more ranges are available to scan, break out of the loop - break; + tokio::select! { + Some((scan_range, scan_results)) = scan_results_receiver.recv() => { + process_scan_results( + wallet, + fetch_request_sender.clone(), + consensus_parameters, + &ufvks, + scan_range, + scan_results, + scanner.state_mut(), + ) + .await + .unwrap(); } - } - match scan_results_receiver.try_recv() { - Ok((scan_range, scan_results)) => process_scan_results( - wallet, - fetch_request_sender.clone(), - consensus_parameters, - &ufvks, - scan_range, - scan_results, - ) - .await - .unwrap(), - Err(TryRecvError::Empty) => (), - Err(TryRecvError::Disconnected) => break, + _ = interval.tick() => { + scanner.update(wallet).await; + + if sync_complete(&scanner, &scan_results_receiver, wallet) { + tracing::info!("Sync complete."); + break; + } + } } } drop(scanner); - while let Some((scan_range, scan_results)) = scan_results_receiver.recv().await { - process_scan_results( - wallet, - fetch_request_sender.clone(), - consensus_parameters, - &ufvks, - scan_range, - scan_results, - ) - .await - .unwrap(); - } - drop(fetch_request_sender); fetcher_handle.await.unwrap().unwrap(); Ok(()) } -/// Update scan ranges to include blocks between the last known chain height (wallet height) and the chain height from the server +/// Returns true if sync is complete. +/// +/// Sync is complete when: +/// - all scan workers have been shutdown +/// - there is no unprocessed scan results in the channel +/// - all scan ranges have `Scanned` priority +fn sync_complete( + scanner: &Scanner

, + scan_results_receiver: &mpsc::UnboundedReceiver<(ScanRange, Result)>, + wallet: &W, +) -> bool +where + P: consensus::Parameters + Sync + Send + 'static, + W: SyncWallet, +{ + scanner.worker_poolsize() == 0 + && scan_results_receiver.is_empty() + && wallet.get_sync_state().unwrap().fully_scanned() +} + +/// Update scan ranges for scanning async fn update_scan_ranges

( fetch_request_sender: mpsc::UnboundedSender, - parameters: &P, + consensus_parameters: &P, wallet_birthday: BlockHeight, sync_state: &mut SyncState, ) -> Result<(), ()> @@ -141,11 +140,37 @@ where let chain_height = client::get_chain_height(fetch_request_sender) .await .unwrap(); + create_scan_range( + chain_height, + consensus_parameters, + wallet_birthday, + sync_state, + ) + .await?; + reset_scan_ranges(sync_state)?; + set_verification_scan_range(sync_state)?; + + // TODO: add logic to merge scan ranges + // TODO: set chain tip range + // TODO: set open adjacent range + + Ok(()) +} +/// Create scan range between the last known chain height (wallet height) and the chain height from the server +async fn create_scan_range

( + chain_height: BlockHeight, + consensus_parameters: &P, + wallet_birthday: BlockHeight, + sync_state: &mut SyncState, +) -> Result<(), ()> +where + P: consensus::Parameters, +{ let scan_ranges = sync_state.scan_ranges_mut(); let wallet_height = if scan_ranges.is_empty() { - let sapling_activation_height = parameters + let sapling_activation_height = consensus_parameters .activation_height(NetworkUpgrade::Sapling) .expect("sapling activation height should always return Some"); @@ -162,80 +187,200 @@ where }; if wallet_height > chain_height { - // TODO: Isn't this a possible state if there's been a reorg? + // TODO: truncate wallet to server height in case of reorg panic!("wallet is ahead of server!") } - let chain_tip_scan_range = ScanRange::from_parts( + let new_scan_range = ScanRange::from_parts( Range { start: wallet_height, end: chain_height + 1, }, ScanPriority::Historic, ); - scan_ranges.push(chain_tip_scan_range); + scan_ranges.push(new_scan_range); if scan_ranges.is_empty() { - panic!("scan ranges should never be empty after updating") + panic!("scan ranges should never be empty after updating"); } - // TODO: add logic to combine chain tip scan range with wallet tip scan range - // TODO: add scan priority logic - // TODO: replace `ignored` (a.k.a scanning) priority with `verify` to prioritise ranges that were being scanned when sync was interrupted - Ok(()) } -/// Prepares the next scan range for scanning. -/// Returns `None` if there are no more ranges to scan -fn prepare_next_scan_range(sync_state: &mut SyncState) -> Option { +fn reset_scan_ranges(sync_state: &mut SyncState) -> Result<(), ()> { let scan_ranges = sync_state.scan_ranges_mut(); + let stale_verify_scan_ranges = scan_ranges + .iter() + .filter(|range| range.priority() == ScanPriority::Verify) + .cloned() + .collect::>(); + let previously_scanning_scan_ranges = scan_ranges + .iter() + .filter(|range| range.priority() == ScanPriority::Ignored) + .cloned() + .collect::>(); + for scan_range in stale_verify_scan_ranges { + set_scan_priority( + sync_state, + scan_range.block_range(), + ScanPriority::OpenAdjacent, + ) + .unwrap(); + } + // a range that was previously scanning when sync was last interupted should be set to `ChainTip` which is the + // highest priority that is not `Verify`. + for scan_range in previously_scanning_scan_ranges { + set_scan_priority(sync_state, scan_range.block_range(), ScanPriority::ChainTip).unwrap(); + } - // placeholder for algorythm that determines highest priority range to scan - let (index, selected_scan_range) = scan_ranges.iter_mut().enumerate().find(|(_, range)| { - range.priority() != ScanPriority::Scanned && range.priority() != ScanPriority::Ignored - })?; + Ok(()) +} - // if scan range is larger than BATCH_SIZE, split off and return a batch from the lower end and update scan ranges - if let Some((lower_range, higher_range)) = - selected_scan_range.split_at(selected_scan_range.block_range().start + BATCH_SIZE) +fn set_verification_scan_range(sync_state: &mut SyncState) -> Result<(), ()> { + let scan_ranges = sync_state.scan_ranges_mut(); + if let Some((index, lowest_unscanned_range)) = + scan_ranges.iter().enumerate().find(|(_, scan_range)| { + scan_range.priority() != ScanPriority::Ignored + && scan_range.priority() != ScanPriority::Scanned + }) { - let lower_range_ignored = - ScanRange::from_parts(lower_range.block_range().clone(), ScanPriority::Ignored); - scan_ranges.splice(index..=index, vec![lower_range_ignored, higher_range]); - - Some(lower_range) - } else { - let selected_scan_range = selected_scan_range.clone(); - let selected_range_ignored = ScanRange::from_parts( - selected_scan_range.block_range().clone(), - ScanPriority::Ignored, + let block_range_to_verify = Range { + start: lowest_unscanned_range.block_range().start, + end: lowest_unscanned_range + .block_range() + .start + .add(VERIFY_BLOCK_RANGE_SIZE), + }; + let split_ranges = split_out_scan_range( + lowest_unscanned_range, + block_range_to_verify, + ScanPriority::Verify, ); - scan_ranges.splice(index..=index, vec![selected_range_ignored]); - Some(selected_scan_range.clone()) + sync_state + .scan_ranges_mut() + .splice(index..=index, split_ranges); } + + Ok(()) +} + +/// Selects and prepares the next scan range for scanning. +/// Sets the range for scanning to `Ignored` priority in the wallet `sync_state` but returns the scan range with its initial priority. +/// Returns `None` if there are no more ranges to scan. +pub(crate) fn select_scan_range(sync_state: &mut SyncState) -> Option { + let scan_ranges = sync_state.scan_ranges_mut(); + + let mut scan_ranges_priority_sorted: Vec<&ScanRange> = scan_ranges.iter().collect(); + scan_ranges_priority_sorted.sort_by(|a, b| b.block_range().start.cmp(&a.block_range().start)); + scan_ranges_priority_sorted.sort_by_key(|scan_range| scan_range.priority()); + let highest_priority_scan_range = scan_ranges_priority_sorted + .pop() + .expect("scan ranges should be non-empty after setup") + .clone(); + if highest_priority_scan_range.priority() == ScanPriority::Scanned + || highest_priority_scan_range.priority() == ScanPriority::Ignored + { + return None; + } + + let (index, selected_scan_range) = scan_ranges + .iter_mut() + .enumerate() + .find(|(_, scan_range)| { + scan_range.block_range() == highest_priority_scan_range.block_range() + }) + .expect("scan range should exist"); + + let batch_block_range = Range { + start: selected_scan_range.block_range().start, + end: selected_scan_range.block_range().start + BATCH_SIZE, + }; + let split_ranges = split_out_scan_range( + selected_scan_range, + batch_block_range, + ScanPriority::Ignored, + ); + + let trimmed_block_range = split_ranges + .first() + .expect("vec should always be non-empty") + .block_range() + .clone(); + + scan_ranges.splice(index..=index, split_ranges); + + // TODO: when this library has its own version of ScanRange this can be simpified and more readable + Some(ScanRange::from_parts( + trimmed_block_range, + highest_priority_scan_range.priority(), + )) } /// Scan post-processing async fn process_scan_results( wallet: &mut W, fetch_request_sender: mpsc::UnboundedSender, - parameters: &P, + consensus_parameters: &P, ufvks: &HashMap, scan_range: ScanRange, - scan_results: ScanResults, -) -> Result<(), ()> + scan_results: Result, + scanner_state: &mut ScannerState, +) -> Result<(), SyncError> where P: consensus::Parameters, W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncShardTrees, { - update_wallet_data(wallet, scan_results).unwrap(); - link_nullifiers(wallet, fetch_request_sender, parameters, ufvks) - .await + match scan_results { + Ok(results) => { + if scan_range.priority() == ScanPriority::Verify { + scanner_state.verify(); + } + + update_wallet_data(wallet, results).unwrap(); + link_nullifiers(wallet, fetch_request_sender, consensus_parameters, ufvks) + .await + .unwrap(); + remove_irrelevant_data(wallet, &scan_range).unwrap(); + set_scan_priority( + wallet.get_sync_state_mut().unwrap(), + scan_range.block_range(), + ScanPriority::Scanned, + ) + .unwrap(); + // TODO: also combine adjacent scanned ranges together + tracing::info!("Scan results processed."); + } + Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. })) => { + tracing::info!("Re-org detected."); + if height == scan_range.block_range().start { + // error handling in case of re-org where first block prev_hash in scan range does not match previous wallet block hash + let sync_state = wallet.get_sync_state_mut().unwrap(); + set_scan_priority(sync_state, scan_range.block_range(), scan_range.priority()) + .unwrap(); // reset scan range to initial priority in wallet sync state + let scan_range_to_verify = verify_scan_range_tip(sync_state, height - 1); + truncate_wallet_data(wallet, scan_range_to_verify.block_range().start - 1).unwrap(); + } else { + scan_results?; + } + } + Err(e) => return Err(e.into()), + } + + Ok(()) +} + +/// Removes all wallet data above the given `truncate_height`. +fn truncate_wallet_data(wallet: &mut W, truncate_height: BlockHeight) -> Result<(), ()> +where + W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncShardTrees, +{ + wallet.truncate_wallet_blocks(truncate_height).unwrap(); + wallet + .truncate_wallet_transactions(truncate_height) .unwrap(); - remove_irrelevant_data(wallet, &scan_range).unwrap(); - mark_scanned(scan_range, wallet.get_sync_state_mut().unwrap()).unwrap(); + wallet.truncate_nullifiers(truncate_height).unwrap(); + wallet.truncate_shard_trees(truncate_height).unwrap(); Ok(()) } @@ -361,6 +506,42 @@ where Ok(()) } +/// Splits out the highest VERIFY_BLOCK_RANGE_SIZE blocks from the scan range containing the given `block height` +/// and sets it's priority to `Verify`. +/// Returns a clone of the scan range to be verified. +/// +/// Panics if the scan range containing the given block height is not of priority `Scanned` +fn verify_scan_range_tip(sync_state: &mut SyncState, block_height: BlockHeight) -> ScanRange { + let (index, scan_range) = sync_state + .scan_ranges() + .iter() + .enumerate() + .find(|(_, range)| range.block_range().contains(&block_height)) + .expect("scan range containing given block height should always exist!"); + + if scan_range.priority() != ScanPriority::Scanned { + panic!("scan range should always have scan priority `Scanned`!") + } + + let block_range_to_verify = Range { + start: scan_range.block_range().end - VERIFY_BLOCK_RANGE_SIZE, + end: scan_range.block_range().end, + }; + let split_ranges = + split_out_scan_range(scan_range, block_range_to_verify, ScanPriority::Verify); + + let scan_range_to_verify = split_ranges + .last() + .expect("vec should always be non-empty") + .clone(); + + sync_state + .scan_ranges_mut() + .splice(index..=index, split_ranges); + + scan_range_to_verify +} + /// Splits out a scan range surrounding a given block height with the specified priority #[allow(dead_code)] fn update_scan_priority( @@ -396,13 +577,13 @@ fn determine_block_range(block_height: BlockHeight) -> Range { Range { start, end } } -/// Takes a scan range and splits it at [block_range.start] and [block_range.end], returning a vec of scan ranges where -/// the scan range with the specified [block_range] has the given [scan_priority]. +/// Takes a scan range and splits it at `block_range.start` and `block_range.end`, returning a vec of scan ranges where +/// the scan range with the specified `block_range` has the given `scan_priority`. /// -/// If [block_range] goes beyond the bounds of [scan_range.block_range()] no splitting will occur at the upper and/or +/// If `block_range` goes beyond the bounds of `scan_range.block_range()` no splitting will occur at the upper and/or /// lower bound but the priority will still be updated /// -/// Panics if no blocks in [block_range] are contained within [scan_range.block_range()] +/// Panics if no blocks in `block_range` are contained within `scan_range.block_range()` fn split_out_scan_range( scan_range: &ScanRange, block_range: Range, @@ -447,6 +628,7 @@ fn split_out_scan_range( split_ranges } +// TODO: replace this function with a filter on the data added to wallet fn remove_irrelevant_data(wallet: &mut W, scan_range: &ScanRange) -> Result<(), ()> where W: SyncWallet + SyncBlocks + SyncNullifiers + SyncTransactions, @@ -471,8 +653,8 @@ where .map(|tx| tx.block_height()) .collect::>(); wallet.get_wallet_blocks_mut().unwrap().retain(|height, _| { - *height >= scan_range.block_range().end - || *height >= wallet_height.saturating_sub(100) + *height >= scan_range.block_range().end - 1 + || *height >= wallet_height - 100 || wallet_transaction_heights.contains(height) }); wallet @@ -489,20 +671,25 @@ where Ok(()) } -fn mark_scanned(scan_range: ScanRange, sync_state: &mut SyncState) -> Result<(), ()> { +/// Sets the scan range in `sync_state` with `block_range` to the given `scan_priority`. +/// +/// Panics if no scan range is found in `sync_state` with a block range of exactly `block_range`. +fn set_scan_priority( + sync_state: &mut SyncState, + block_range: &Range, + scan_priority: ScanPriority, +) -> Result<(), ()> { let scan_ranges = sync_state.scan_ranges_mut(); if let Some((index, range)) = scan_ranges .iter() .enumerate() - .find(|(_, range)| range.block_range() == scan_range.block_range()) + .find(|(_, range)| range.block_range() == block_range) { - scan_ranges[index] = - ScanRange::from_parts(range.block_range().clone(), ScanPriority::Scanned); + scan_ranges[index] = ScanRange::from_parts(range.block_range().clone(), scan_priority); } else { - panic!("scanned range not found!") + panic!("scan range with block range {:?} not found!", block_range) } - // TODO: also combine adjacent scanned ranges together Ok(()) } diff --git a/zingo-sync/src/traits.rs b/zingo-sync/src/traits.rs index 8d8c9d7c2..2db810e70 100644 --- a/zingo-sync/src/traits.rs +++ b/zingo-sync/src/traits.rs @@ -33,8 +33,6 @@ pub trait SyncWallet { /// Trait for interfacing [`crate::primitives::WalletBlock`]s with wallet data pub trait SyncBlocks: SyncWallet { - // TODO: add method to get wallet data for writing defualt implementations on other methods - /// Get a stored wallet compact block from wallet data by block height /// Must return error if block is not found fn get_wallet_block(&self, block_height: BlockHeight) -> Result; @@ -53,6 +51,14 @@ pub trait SyncBlocks: SyncWallet { Ok(()) } + + /// Removes all wallet blocks above the given `block_height`. + fn truncate_wallet_blocks(&mut self, truncate_height: BlockHeight) -> Result<(), Self::Error> { + self.get_wallet_blocks_mut()? + .retain(|block_height, _| *block_height <= truncate_height); + + Ok(()) + } } /// Trait for interfacing [`crate::primitives::WalletTransaction`]s with wallet data @@ -75,12 +81,57 @@ pub trait SyncTransactions: SyncWallet { Ok(()) } + + /// Removes all wallet transactions above the given `block_height`. + /// Also sets any output's spending_transaction field to `None` if it's spending transaction was removed. + fn truncate_wallet_transactions( + &mut self, + truncate_height: BlockHeight, + ) -> Result<(), Self::Error> { + // TODO: Replace with `extract_if()` when it's in stable rust + let invalid_txids: Vec = self + .get_wallet_transactions()? + .values() + .filter(|tx| tx.block_height() > truncate_height) + .map(|tx| tx.transaction().txid()) + .collect(); + + let wallet_transactions = self.get_wallet_transactions_mut()?; + wallet_transactions + .values_mut() + .flat_map(|tx| tx.sapling_notes_mut()) + .filter(|note| { + note.spending_transaction().map_or_else( + || false, + |spending_txid| invalid_txids.contains(&spending_txid), + ) + }) + .for_each(|note| { + note.set_spending_transaction(None); + }); + wallet_transactions + .values_mut() + .flat_map(|tx| tx.orchard_notes_mut()) + .filter(|note| { + note.spending_transaction().map_or_else( + || false, + |spending_txid| invalid_txids.contains(&spending_txid), + ) + }) + .for_each(|note| { + note.set_spending_transaction(None); + }); + + invalid_txids.iter().for_each(|invalid_txid| { + wallet_transactions.remove(invalid_txid); + }); + + Ok(()) + } } /// Trait for interfacing nullifiers with wallet data pub trait SyncNullifiers: SyncWallet { - // TODO: add method to get wallet data for writing defualt implementations on other methods - // /// Get wallet nullifier map // fn get_nullifiers(&self) -> Result<&NullifierMap, Self::Error>; @@ -98,6 +149,19 @@ pub trait SyncNullifiers: SyncWallet { Ok(()) } + + /// Removes all mapped nullifiers above the given `block_height`. + fn truncate_nullifiers(&mut self, truncate_height: BlockHeight) -> Result<(), Self::Error> { + let nullifier_map = self.get_nullifiers_mut()?; + nullifier_map + .sapling_mut() + .retain(|_, (block_height, _)| *block_height <= truncate_height); + nullifier_map + .orchard_mut() + .retain(|_, (block_height, _)| *block_height <= truncate_height); + + Ok(()) + } } /// Trait for interfacing shard tree data with wallet data @@ -131,4 +195,27 @@ pub trait SyncShardTrees: SyncWallet { Ok(()) } + + /// Removes all shard tree data above the given `block_height`. + fn truncate_shard_trees(&mut self, truncate_height: BlockHeight) -> Result<(), Self::Error> { + // TODO: investigate resetting the shard completely when truncate height is 0 + if !self + .get_shard_trees_mut()? + .sapling_mut() + .truncate_to_checkpoint(&truncate_height) + .unwrap() + { + panic!("max checkpoints should always be higher than verification window!"); + } + if !self + .get_shard_trees_mut()? + .orchard_mut() + .truncate_to_checkpoint(&truncate_height) + .unwrap() + { + panic!("max checkpoints should always be higher than verification window!"); + } + + Ok(()) + } }