Skip to content

Commit

Permalink
Report proofs as they come (#178)
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola authored Nov 8, 2024
1 parent d3bfe1b commit f311e14
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 22 deletions.
20 changes: 12 additions & 8 deletions bubblegum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,20 +165,24 @@ pub struct VerifyArgs {
pub async fn verify_bubblegum(
context: BubblegumContext,
args: VerifyArgs,
) -> Result<Vec<verify::ProofReport>> {
) -> Result<tokio::sync::mpsc::Receiver<verify::ProofReport>> {
let trees = if let Some(ref only_trees) = args.only_trees {
TreeResponse::find(&context.solana_rpc, only_trees.clone()).await?
} else {
TreeResponse::all(&context.solana_rpc).await?
};

let mut reports = Vec::new();
let (sender, receiver) = tokio::sync::mpsc::channel(trees.len());

for tree in trees {
let report = verify::check(context.clone(), tree, args.max_concurrency).await?;

reports.push(report);
}
tokio::spawn(async move {
for tree in trees {
if let Ok(report) = verify::check(context.clone(), tree, args.max_concurrency).await {
if sender.send(report).await.is_err() {
error!("Failed to send report");
}
}
}
});

Ok(reports)
Ok(receiver)
}
17 changes: 11 additions & 6 deletions bubblegum/src/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use spl_account_compression::concurrent_tree_wrapper::ProveLeafArgs;
use std::fmt;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::error;
use tracing::debug;

trait TryFromAssetProof {
fn try_from_asset_proof(proof: AssetProof) -> Result<Self, anyhow::Error>
Expand Down Expand Up @@ -89,6 +89,7 @@ pub struct ProofReport {
pub corrupt_proofs: usize,
}

#[derive(Debug)]
enum ProofResult {
Correct,
Incorrect,
Expand Down Expand Up @@ -152,17 +153,21 @@ pub async fn check(

if let Ok(proof_result) = proof_lookup {
let mut report = report.lock().await;

match proof_result {
ProofResult::Correct => report.correct_proofs += 1,
ProofResult::Incorrect => report.incorrect_proofs += 1,
ProofResult::NotFound => report.not_found_proofs += 1,
ProofResult::Corrupt => report.corrupt_proofs += 1,
}
if let ProofResult::Incorrect | ProofResult::NotFound | ProofResult::Corrupt =
proof_result
{
error!(tree = %tree_pubkey, leaf_index = i, asset = %asset, "{}", proof_result);
}

debug!(
tree = %tree_pubkey,
leaf_index = i,
asset = %asset,
result = ?proof_result,
"Proof result for asset"
);
}
}));
}
Expand Down
15 changes: 9 additions & 6 deletions grpc-ingest/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{config::ConfigMonitor, prom::update_tree_proof_report};
use das_bubblegum::{verify_bubblegum, BubblegumContext, VerifyArgs};
use das_core::{Rpc, SolanaRpcArgs};
use futures::stream::StreamExt;
use tracing::error;
use tracing::{error, info};

pub async fn run(config: ConfigMonitor) -> anyhow::Result<()> {
let mut shutdown = create_shutdown()?;
Expand All @@ -22,20 +22,23 @@ pub async fn run(config: ConfigMonitor) -> anyhow::Result<()> {
};

match verify_bubblegum(bubblegum_context, verify_args).await {
Ok(reports) => {
for report in reports {
Ok(mut reports_receiver) => {
while let Some(report) = reports_receiver.recv().await {
info!(
report = ?report,
);
update_tree_proof_report(&report);
}

tokio::time::sleep(tokio::time::Duration::from_secs(600)).await;
}
Err(e) => {
error!(
message = "Error verifying bubblegum",
message = "Error proof report recv",
error = ?e
);
}
}

tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
}
});

Expand Down
8 changes: 8 additions & 0 deletions grpc-ingest/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
.expect("collector can't be registered");
};
}

register!(VERSION_INFO_METRIC);
register!(REDIS_STREAM_LENGTH);
register!(REDIS_XADD_STATUS_COUNT);
Expand All @@ -123,6 +124,11 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
register!(INGEST_TASKS);
register!(ACK_TASKS);
register!(GRPC_TASKS);
register!(BUBBLEGUM_TREE_TOTAL_LEAVES);
register!(BUBBLEGUM_TREE_INCORRECT_PROOFS);
register!(BUBBLEGUM_TREE_NOT_FOUND_PROOFS);
register!(BUBBLEGUM_TREE_CORRECT_PROOFS);
register!(BUBBLEGUM_TREE_CORRUPT_PROOFS);

VERSION_INFO_METRIC
.with_label_values(&[
Expand All @@ -146,8 +152,10 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
Ok::<_, hyper::Error>(response)
}))
});

let server = Server::try_bind(&address)?.serve(make_service);
info!("prometheus server started: http://{address:?}/metrics");

tokio::spawn(async move {
if let Err(error) = server.await {
error!("prometheus server failed: {error:?}");
Expand Down
4 changes: 2 additions & 2 deletions ops/src/bubblegum/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ pub async fn run(config: Args) -> Result<()> {
let solana_rpc = Rpc::from_config(&config.solana);
let context = BubblegumContext::new(database_pool, solana_rpc);

let reports = verify_bubblegum(context, config.verify_bubblegum).await?;
let mut reports = verify_bubblegum(context, config.verify_bubblegum).await?;

for report in reports {
while let Some(report) = reports.recv().await {
info!(
"Tree: {}, Total Leaves: {}, Incorrect Proofs: {}, Not Found Proofs: {}, Correct Proofs: {}",
report.tree_pubkey,
Expand Down

0 comments on commit f311e14

Please sign in to comment.