Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send proof reports as they complete #178

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions bubblegum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,20 +165,24 @@ pub struct VerifyArgs {
pub async fn verify_bubblegum(
context: BubblegumContext,
args: VerifyArgs,
) -> Result<Vec<verify::ProofReport>> {
) -> Result<tokio::sync::mpsc::Receiver<verify::ProofReport>> {
let trees = if let Some(ref only_trees) = args.only_trees {
TreeResponse::find(&context.solana_rpc, only_trees.clone()).await?
} else {
TreeResponse::all(&context.solana_rpc).await?
};

let mut reports = Vec::new();
let (sender, receiver) = tokio::sync::mpsc::channel(trees.len());

for tree in trees {
let report = verify::check(context.clone(), tree, args.max_concurrency).await?;

reports.push(report);
}
tokio::spawn(async move {
for tree in trees {
if let Ok(report) = verify::check(context.clone(), tree, args.max_concurrency).await {
if sender.send(report).await.is_err() {
error!("Failed to send report");
}
}
}
});

Ok(reports)
Ok(receiver)
}
17 changes: 11 additions & 6 deletions bubblegum/src/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use spl_account_compression::concurrent_tree_wrapper::ProveLeafArgs;
use std::fmt;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::error;
use tracing::debug;

trait TryFromAssetProof {
fn try_from_asset_proof(proof: AssetProof) -> Result<Self, anyhow::Error>
Expand Down Expand Up @@ -89,6 +89,7 @@ pub struct ProofReport {
pub corrupt_proofs: usize,
}

#[derive(Debug)]
enum ProofResult {
Correct,
Incorrect,
Expand Down Expand Up @@ -152,17 +153,21 @@ pub async fn check(

if let Ok(proof_result) = proof_lookup {
let mut report = report.lock().await;

match proof_result {
ProofResult::Correct => report.correct_proofs += 1,
ProofResult::Incorrect => report.incorrect_proofs += 1,
ProofResult::NotFound => report.not_found_proofs += 1,
ProofResult::Corrupt => report.corrupt_proofs += 1,
}
if let ProofResult::Incorrect | ProofResult::NotFound | ProofResult::Corrupt =
proof_result
{
error!(tree = %tree_pubkey, leaf_index = i, asset = %asset, "{}", proof_result);
}

debug!(
tree = %tree_pubkey,
leaf_index = i,
asset = %asset,
result = ?proof_result,
"Proof result for asset"
);
}
}));
}
Expand Down
15 changes: 9 additions & 6 deletions grpc-ingest/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{config::ConfigMonitor, prom::update_tree_proof_report};
use das_bubblegum::{verify_bubblegum, BubblegumContext, VerifyArgs};
use das_core::{Rpc, SolanaRpcArgs};
use futures::stream::StreamExt;
use tracing::error;
use tracing::{error, info};

pub async fn run(config: ConfigMonitor) -> anyhow::Result<()> {
let mut shutdown = create_shutdown()?;
Expand All @@ -22,20 +22,23 @@ pub async fn run(config: ConfigMonitor) -> anyhow::Result<()> {
};

match verify_bubblegum(bubblegum_context, verify_args).await {
Ok(reports) => {
for report in reports {
Ok(mut reports_receiver) => {
while let Some(report) = reports_receiver.recv().await {
info!(
report = ?report,
);
update_tree_proof_report(&report);
}

tokio::time::sleep(tokio::time::Duration::from_secs(600)).await;
}
Err(e) => {
error!(
message = "Error verifying bubblegum",
message = "Error proof report recv",
error = ?e
);
}
}

tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
}
});

Expand Down
8 changes: 8 additions & 0 deletions grpc-ingest/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
.expect("collector can't be registered");
};
}

register!(VERSION_INFO_METRIC);
register!(REDIS_STREAM_LENGTH);
register!(REDIS_XADD_STATUS_COUNT);
Expand All @@ -123,6 +124,11 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
register!(INGEST_TASKS);
register!(ACK_TASKS);
register!(GRPC_TASKS);
register!(BUBBLEGUM_TREE_TOTAL_LEAVES);
register!(BUBBLEGUM_TREE_INCORRECT_PROOFS);
register!(BUBBLEGUM_TREE_NOT_FOUND_PROOFS);
register!(BUBBLEGUM_TREE_CORRECT_PROOFS);
register!(BUBBLEGUM_TREE_CORRUPT_PROOFS);

VERSION_INFO_METRIC
.with_label_values(&[
Expand All @@ -146,8 +152,10 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
Ok::<_, hyper::Error>(response)
}))
});

let server = Server::try_bind(&address)?.serve(make_service);
info!("prometheus server started: http://{address:?}/metrics");

tokio::spawn(async move {
if let Err(error) = server.await {
error!("prometheus server failed: {error:?}");
Expand Down
4 changes: 2 additions & 2 deletions ops/src/bubblegum/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ pub async fn run(config: Args) -> Result<()> {
let solana_rpc = Rpc::from_config(&config.solana);
let context = BubblegumContext::new(database_pool, solana_rpc);

let reports = verify_bubblegum(context, config.verify_bubblegum).await?;
let mut reports = verify_bubblegum(context, config.verify_bubblegum).await?;

for report in reports {
while let Some(report) = reports.recv().await {
info!(
"Tree: {}, Total Leaves: {}, Incorrect Proofs: {}, Not Found Proofs: {}, Correct Proofs: {}",
report.tree_pubkey,
Expand Down
Loading