Skip to content

Commit

Permalink
feat(scanner): Add a new zebra-scanner binary (#8608)
Browse files Browse the repository at this point in the history
* add a new `zebra-scanner` binary

* update arguments

* allow birthday in config

* remove required feature

* add `env-filter` feature to `tracing-subscriber` dependency

* use sync task

* codespell

---------

Co-authored-by: Arya <[email protected]>
  • Loading branch information
oxarbitrage and arya2 authored Jul 9, 2024
1 parent 4213e82 commit a94b2be
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 23 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6046,15 +6046,19 @@ dependencies = [
"insta",
"itertools 0.13.0",
"jubjub",
"lazy_static",
"proptest",
"proptest-derive",
"rand 0.8.5",
"sapling-crypto",
"semver 1.0.23",
"serde",
"serde_json",
"structopt",
"tokio",
"tower",
"tracing",
"tracing-subscriber",
"zcash_address",
"zcash_client_backend",
"zcash_keys",
Expand All @@ -6063,6 +6067,7 @@ dependencies = [
"zebra-chain",
"zebra-grpc",
"zebra-node-services",
"zebra-rpc",
"zebra-state",
"zebra-test",
]
Expand Down
14 changes: 14 additions & 0 deletions zebra-scan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ name = "scanner-grpc-server"
path = "src/bin/rpc_server.rs"
required-features = ["proptest-impl"]

[[bin]] # Bin to run the Scanner tool
name = "zebra-scanner"
path = "src/bin/scanner/main.rs"

[features]

# Production features that activate extra dependencies, or extra features in dependencies
Expand All @@ -39,6 +43,9 @@ proptest-impl = [
"zcash_note_encryption",
]

# Needed for the zebra-scanner binary.
shielded-scan = []

[dependencies]

color-eyre = "0.6.3"
Expand All @@ -61,6 +68,7 @@ zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = [
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38", features = ["shielded-scan"] }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = ["shielded-scan"] }
zebra-grpc = { path = "../zebra-grpc", version = "0.1.0-alpha.5" }
zebra-rpc = { path = "../zebra-rpc", version = "1.0.0-beta.38" }

chrono = { version = "0.4.38", default-features = false, features = ["clock", "std", "serde"] }

Expand All @@ -77,6 +85,12 @@ zcash_note_encryption = { version = "0.4.0", optional = true }

zebra-test = { path = "../zebra-test", version = "1.0.0-beta.38", optional = true }

# zebra-scanner binary dependencies
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
structopt = "0.3.26"
lazy_static = "1.4.0"
serde_json = "1.0.117"

[dev-dependencies]
insta = { version = "1.39.0", features = ["ron", "redactions"] }
tokio = { version = "1.37.0", features = ["test-util"] }
Expand Down
144 changes: 144 additions & 0 deletions zebra-scan/src/bin/scanner/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//! The zebra-scanner binary.
//!
//! The zebra-scanner binary is a standalone binary that scans the Zcash blockchain for transactions using the given sapling keys.
use color_eyre::eyre::eyre;
use lazy_static::lazy_static;
use structopt::StructOpt;
use tracing::*;

use zebra_chain::{block::Height, parameters::Network};
use zebra_state::SaplingScanningKey;

use core::net::SocketAddr;
use std::path::PathBuf;

/// A structure with sapling key and birthday height.
#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize)]
pub struct SaplingKey {
key: SaplingScanningKey,
#[serde(default = "min_height")]
birthday_height: Height,
}

fn min_height() -> Height {
Height(0)
}

impl std::str::FromStr for SaplingKey {
type Err = Box<dyn std::error::Error>;
fn from_str(value: &str) -> Result<Self, Self::Err> {
Ok(serde_json::from_str(value)?)
}
}

#[tokio::main]
/// Runs the zebra scanner binary with the given arguments.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Display all logs from the zebra-scan crate.
tracing_subscriber::fmt::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();

// Parse command line arguments.
let args = Args::from_args();

let zebrad_cache_dir = args.zebrad_cache_dir;
let scanning_cache_dir = args.scanning_cache_dir;
let mut db_config = zebra_scan::Config::default().db_config;
db_config.cache_dir = scanning_cache_dir;
let network = args.network;
let sapling_keys_to_scan = args
.sapling_keys_to_scan
.into_iter()
.map(|key| (key.key, key.birthday_height.0))
.collect();
let listen_addr = args.listen_addr;

// Create a state config with arguments.
let state_config = zebra_state::Config {
cache_dir: zebrad_cache_dir,
..zebra_state::Config::default()
};

// Create a scanner config with arguments.
let scanner_config = zebra_scan::Config {
sapling_keys_to_scan,
listen_addr,
db_config,
};

// Get a read-only state and the database.
let (read_state, _latest_chain_tip, chain_tip_change, sync_task) =
zebra_rpc::sync::init_read_state_with_syncer(
state_config,
&network,
args.zebra_rpc_listen_addr,
)
.await?
.map_err(|err| eyre!(err))?;

// Spawn the scan task.
let scan_task_handle =
{ zebra_scan::spawn_init(scanner_config, network, read_state, chain_tip_change) };

// Pin the scan task handle.
tokio::pin!(scan_task_handle);
tokio::pin!(sync_task);

// Wait for task to finish
tokio::select! {
scan_result = &mut scan_task_handle => scan_result
.expect("unexpected panic in the scan task")
.map(|_| info!("scan task exited"))
.map_err(Into::into),
sync_result = &mut sync_task => {
sync_result.expect("unexpected panic in the scan task");
Ok(())
}
}
}

// Default values for the zebra-scanner arguments.
lazy_static! {
static ref DEFAULT_ZEBRAD_CACHE_DIR: String = zebra_state::Config::default()
.cache_dir
.to_str()
.expect("default cache dir is valid")
.to_string();
static ref DEFAULT_SCANNER_CACHE_DIR: String = zebra_scan::Config::default()
.db_config
.cache_dir
.to_str()
.expect("default cache dir is valid")
.to_string();
static ref DEFAULT_NETWORK: String = Network::default().to_string();
}

/// zebra-scanner arguments
#[derive(Clone, Debug, Eq, PartialEq, StructOpt)]
pub struct Args {
/// Path to zebrad state.
#[structopt(default_value = &DEFAULT_ZEBRAD_CACHE_DIR, long)]
pub zebrad_cache_dir: PathBuf,

/// Path to scanning state.
#[structopt(default_value = &DEFAULT_SCANNER_CACHE_DIR, long)]
pub scanning_cache_dir: PathBuf,

/// The Zcash network.
#[structopt(default_value = &DEFAULT_NETWORK, long)]
pub network: Network,

/// The sapling keys to scan for.
#[structopt(long)]
pub sapling_keys_to_scan: Vec<SaplingKey>,

/// The listen address of Zebra's RPC server used by the syncer to check for chain tip changes
/// and get blocks in Zebra's non-finalized state.
#[structopt(long)]
pub zebra_rpc_listen_addr: SocketAddr,

/// IP address and port for the gRPC server.
#[structopt(long)]
pub listen_addr: Option<SocketAddr>,
}
2 changes: 1 addition & 1 deletion zebra-scan/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct Config {
//
// TODO: Remove fields that are only used by the state, and create a common database config.
#[serde(flatten)]
db_config: DbConfig,
pub db_config: DbConfig,
}

impl Debug for Config {
Expand Down
23 changes: 10 additions & 13 deletions zebra-scan/src/service/scan_task/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::{
sync::{mpsc::Sender, watch},
task::JoinHandle,
};
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use tower::{Service, ServiceExt};

use tracing::Instrument;
use zcash_address::unified::{Encoding, Fvk, Ufvk};
Expand All @@ -38,7 +38,7 @@ use zebra_chain::{
transaction::Transaction,
};
use zebra_node_services::scan_service::response::ScanResult;
use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex};
use zebra_state::{ChainTipChange, ReadStateService, SaplingScannedResult, TransactionIndex};

use crate::{
service::{ScanTask, ScanTaskCommand},
Expand All @@ -51,11 +51,8 @@ mod scan_range;

pub use scan_range::ScanRangeTaskBuilder;

/// The generic state type used by the scanner.
pub type State = Buffer<
BoxService<zebra_state::Request, zebra_state::Response, zebra_state::BoxError>,
zebra_state::Request,
>;
/// The read state type used by the scanner.
pub type State = ReadStateService;

/// Wait a few seconds at startup for some blocks to get verified.
///
Expand Down Expand Up @@ -262,13 +259,13 @@ pub async fn scan_height_and_store_results(
.ready()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::Block(height.into()))
.call(zebra_state::ReadRequest::Block(height.into()))
.await
.map_err(|e| eyre!(e))?;

let block = match block {
zebra_state::Response::Block(Some(block)) => block,
zebra_state::Response::Block(None) => return Ok(None),
zebra_state::ReadResponse::Block(Some(block)) => block,
zebra_state::ReadResponse::Block(None) => return Ok(None),
_ => unreachable!("unmatched response to a state::Block request"),
};

Expand Down Expand Up @@ -515,13 +512,13 @@ async fn tip_height(mut state: State) -> Result<Height, Report> {
.ready()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::Tip)
.call(zebra_state::ReadRequest::Tip)
.await
.map_err(|e| eyre!(e))?;

match tip {
zebra_state::Response::Tip(Some((height, _hash))) => Ok(height),
zebra_state::Response::Tip(None) => Ok(Height(0)),
zebra_state::ReadResponse::Tip(Some((height, _hash))) => Ok(height),
zebra_state::ReadResponse::Tip(None) => Ok(Height(0)),
_ => unreachable!("unmatched response to a state::Tip request"),
}
}
Expand Down
8 changes: 4 additions & 4 deletions zebra-scan/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,12 @@ pub async fn scan_service_registers_keys_correctly() -> Result<()> {

async fn scan_service_registers_keys_correctly_for(network: &Network) -> Result<()> {
// Mock the state.
let (state, _, _, chain_tip_change) = zebra_state::populated_state(vec![], network).await;
let (_, read_state, _, chain_tip_change) = zebra_state::populated_state(vec![], network).await;

// Instantiate the scan service.
let mut scan_service = ServiceBuilder::new()
.buffer(2)
.service(ScanService::new(&Config::ephemeral(), network, state, chain_tip_change).await);
let mut scan_service = ServiceBuilder::new().buffer(2).service(
ScanService::new(&Config::ephemeral(), network, read_state, chain_tip_change).await,
);

// Mock three Sapling keys.
let mocked_keys = mock_sapling_scanning_keys(3, network);
Expand Down
13 changes: 8 additions & 5 deletions zebrad/tests/common/shielded_scan/scan_task_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::{fs, time::Duration};
use color_eyre::{eyre::eyre, Result};

use tokio::sync::mpsc::error::TryRecvError;
use tower::ServiceBuilder;
use zebra_chain::{
block::Height,
chain_tip::ChainTip,
Expand Down Expand Up @@ -82,9 +81,15 @@ pub(crate) async fn run() -> Result<()> {
let scan_db_path = zebrad_state_path.join(SCANNER_DATABASE_KIND);
fs::remove_dir_all(std::path::Path::new(&scan_db_path)).ok();

let (state_service, _read_state_service, latest_chain_tip, chain_tip_change) =
let (_state_service, _read_state_service, latest_chain_tip, chain_tip_change) =
start_state_service_with_cache_dir(&network, zebrad_state_path.clone()).await?;

let state_config = zebra_state::Config {
cache_dir: zebrad_state_path.clone(),
..zebra_state::Config::default()
};
let (read_state, _db, _) = zebra_state::init_read_only(state_config, &network);

let chain_tip_height = latest_chain_tip
.best_tip_height()
.ok_or_else(|| eyre!("State directory doesn't have a chain tip block"))?;
Expand All @@ -105,11 +110,9 @@ pub(crate) async fn run() -> Result<()> {

tracing::info!("opened state service with valid chain tip height, starting scan task",);

let state = ServiceBuilder::new().buffer(10).service(state_service);

// Create an ephemeral `Storage` instance
let storage = Storage::new(&scan_config, &network, false);
let mut scan_task = ScanTask::spawn(storage, state, chain_tip_change);
let mut scan_task = ScanTask::spawn(storage, read_state, chain_tip_change);

tracing::info!("started scan task, sending register/subscribe keys messages with zecpages key to start scanning for a new key",);

Expand Down

0 comments on commit a94b2be

Please sign in to comment.