From f311e149f245b65fb216bd7e8568538c68a2a9ec Mon Sep 17 00:00:00 2001 From: Kyle Espinola Date: Fri, 8 Nov 2024 21:23:43 +0100 Subject: [PATCH] Report proofs as they come (#178) --- bubblegum/src/lib.rs | 20 ++++++++++++-------- bubblegum/src/verify.rs | 17 +++++++++++------ grpc-ingest/src/monitor.rs | 15 +++++++++------ grpc-ingest/src/prom.rs | 8 ++++++++ ops/src/bubblegum/verify.rs | 4 ++-- 5 files changed, 42 insertions(+), 22 deletions(-) diff --git a/bubblegum/src/lib.rs b/bubblegum/src/lib.rs index f1dc031c..46352633 100644 --- a/bubblegum/src/lib.rs +++ b/bubblegum/src/lib.rs @@ -165,20 +165,24 @@ pub struct VerifyArgs { pub async fn verify_bubblegum( context: BubblegumContext, args: VerifyArgs, -) -> Result> { +) -> Result> { let trees = if let Some(ref only_trees) = args.only_trees { TreeResponse::find(&context.solana_rpc, only_trees.clone()).await? } else { TreeResponse::all(&context.solana_rpc).await? }; - let mut reports = Vec::new(); + let (sender, receiver) = tokio::sync::mpsc::channel(trees.len()); - for tree in trees { - let report = verify::check(context.clone(), tree, args.max_concurrency).await?; - - reports.push(report); - } + tokio::spawn(async move { + for tree in trees { + if let Ok(report) = verify::check(context.clone(), tree, args.max_concurrency).await { + if sender.send(report).await.is_err() { + error!("Failed to send report"); + } + } + } + }); - Ok(reports) + Ok(receiver) } diff --git a/bubblegum/src/verify.rs b/bubblegum/src/verify.rs index fae6051d..e99972ce 100644 --- a/bubblegum/src/verify.rs +++ b/bubblegum/src/verify.rs @@ -13,7 +13,7 @@ use spl_account_compression::concurrent_tree_wrapper::ProveLeafArgs; use std::fmt; use std::sync::Arc; use tokio::sync::Mutex; -use tracing::error; +use tracing::debug; trait TryFromAssetProof { fn try_from_asset_proof(proof: AssetProof) -> Result @@ -89,6 +89,7 @@ pub struct ProofReport { pub corrupt_proofs: usize, } +#[derive(Debug)] enum ProofResult { Correct, Incorrect, @@ -152,17 +153,21 @@ pub async fn check( if let Ok(proof_result) = proof_lookup { let mut report = report.lock().await; + match proof_result { ProofResult::Correct => report.correct_proofs += 1, ProofResult::Incorrect => report.incorrect_proofs += 1, ProofResult::NotFound => report.not_found_proofs += 1, ProofResult::Corrupt => report.corrupt_proofs += 1, } - if let ProofResult::Incorrect | ProofResult::NotFound | ProofResult::Corrupt = - proof_result - { - error!(tree = %tree_pubkey, leaf_index = i, asset = %asset, "{}", proof_result); - } + + debug!( + tree = %tree_pubkey, + leaf_index = i, + asset = %asset, + result = ?proof_result, + "Proof result for asset" + ); } })); } diff --git a/grpc-ingest/src/monitor.rs b/grpc-ingest/src/monitor.rs index ac0c687e..7c7e277f 100644 --- a/grpc-ingest/src/monitor.rs +++ b/grpc-ingest/src/monitor.rs @@ -4,7 +4,7 @@ use crate::{config::ConfigMonitor, prom::update_tree_proof_report}; use das_bubblegum::{verify_bubblegum, BubblegumContext, VerifyArgs}; use das_core::{Rpc, SolanaRpcArgs}; use futures::stream::StreamExt; -use tracing::error; +use tracing::{error, info}; pub async fn run(config: ConfigMonitor) -> anyhow::Result<()> { let mut shutdown = create_shutdown()?; @@ -22,20 +22,23 @@ pub async fn run(config: ConfigMonitor) -> anyhow::Result<()> { }; match verify_bubblegum(bubblegum_context, verify_args).await { - Ok(reports) => { - for report in reports { + Ok(mut reports_receiver) => { + while let Some(report) = reports_receiver.recv().await { + info!( + report = ?report, + ); update_tree_proof_report(&report); } + + tokio::time::sleep(tokio::time::Duration::from_secs(600)).await; } Err(e) => { error!( - message = "Error verifying bubblegum", + message = "Error proof report recv", error = ?e ); } } - - tokio::time::sleep(tokio::time::Duration::from_millis(150)).await; } }); diff --git a/grpc-ingest/src/prom.rs b/grpc-ingest/src/prom.rs index 9819d908..44f9557d 100644 --- a/grpc-ingest/src/prom.rs +++ b/grpc-ingest/src/prom.rs @@ -111,6 +111,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { .expect("collector can't be registered"); }; } + register!(VERSION_INFO_METRIC); register!(REDIS_STREAM_LENGTH); register!(REDIS_XADD_STATUS_COUNT); @@ -123,6 +124,11 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { register!(INGEST_TASKS); register!(ACK_TASKS); register!(GRPC_TASKS); + register!(BUBBLEGUM_TREE_TOTAL_LEAVES); + register!(BUBBLEGUM_TREE_INCORRECT_PROOFS); + register!(BUBBLEGUM_TREE_NOT_FOUND_PROOFS); + register!(BUBBLEGUM_TREE_CORRECT_PROOFS); + register!(BUBBLEGUM_TREE_CORRUPT_PROOFS); VERSION_INFO_METRIC .with_label_values(&[ @@ -146,8 +152,10 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { Ok::<_, hyper::Error>(response) })) }); + let server = Server::try_bind(&address)?.serve(make_service); info!("prometheus server started: http://{address:?}/metrics"); + tokio::spawn(async move { if let Err(error) = server.await { error!("prometheus server failed: {error:?}"); diff --git a/ops/src/bubblegum/verify.rs b/ops/src/bubblegum/verify.rs index 947d2b3a..46cb97cf 100644 --- a/ops/src/bubblegum/verify.rs +++ b/ops/src/bubblegum/verify.rs @@ -25,9 +25,9 @@ pub async fn run(config: Args) -> Result<()> { let solana_rpc = Rpc::from_config(&config.solana); let context = BubblegumContext::new(database_pool, solana_rpc); - let reports = verify_bubblegum(context, config.verify_bubblegum).await?; + let mut reports = verify_bubblegum(context, config.verify_bubblegum).await?; - for report in reports { + while let Some(report) = reports.recv().await { info!( "Tree: {}, Total Leaves: {}, Incorrect Proofs: {}, Not Found Proofs: {}, Correct Proofs: {}", report.tree_pubkey,