Skip to content

Commit

Permalink
add verify flag to client
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Nov 15, 2024
1 parent 0e413ba commit 68a965c
Show file tree
Hide file tree
Showing 2 changed files with 314 additions and 24 deletions.
97 changes: 79 additions & 18 deletions examples/rust/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@ use {
serde_json::{json, Value},
solana_sdk::{hash::Hash, pubkey::Pubkey, signature::Signature},
solana_transaction_status::UiTransactionEncoding,
std::{collections::HashMap, env, fs::File, sync::Arc, time::Duration},
tokio::sync::Mutex,
std::{
collections::HashMap,
env,
fs::File,
sync::Arc,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
},
tokio::{fs, sync::Mutex},
tonic::transport::channel::ClientTlsConfig,
yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, Interceptor},
yellowstone_grpc_proto::{
convert_from,
plugin::{filter::FilterName, message_ref::Message as MessageRef},
prelude::{
subscribe_request_filter_accounts_filter::Filter as AccountsFilterOneof,
subscribe_request_filter_accounts_filter_lamports::Cmp as AccountsFilterLamports,
Expand Down Expand Up @@ -258,13 +265,16 @@ struct ActionSubscribe {
/// Show total stat instead of messages
#[clap(long, default_value_t = false)]
stats: bool,

#[clap(long, default_value_t = false)]
verify_encoding: bool,
}

impl Action {
async fn get_subscribe_request(
&self,
commitment: Option<CommitmentLevel>,
) -> anyhow::Result<Option<(SubscribeRequest, usize, bool)>> {
) -> anyhow::Result<Option<(SubscribeRequest, usize, bool, bool)>> {
Ok(match self {
Self::Subscribe(args) => {
let mut accounts: AccountFilterMap = HashMap::new();
Expand Down Expand Up @@ -439,6 +449,7 @@ impl Action {
},
args.resub.unwrap_or(0),
args.stats,
args.verify_encoding,
))
}
_ => None,
Expand Down Expand Up @@ -485,7 +496,7 @@ async fn main() -> anyhow::Result<()> {
.map(|response| info!("response: {response:?}")),
Action::HealthWatch => geyser_health_watch(client).await,
Action::Subscribe(_) => {
let (request, resub, stats) = args
let (request, resub, stats, verify_encoding) = args
.action
.get_subscribe_request(commitment)
.await
Expand All @@ -494,7 +505,7 @@ async fn main() -> anyhow::Result<()> {
"expect subscribe action"
)))?;

geyser_subscribe(client, request, resub, stats).await
geyser_subscribe(client, request, resub, stats, verify_encoding).await
}
Action::Ping { count } => client
.ping(*count)
Expand Down Expand Up @@ -552,26 +563,29 @@ async fn geyser_subscribe(
request: SubscribeRequest,
resub: usize,
stats: bool,
verify_encoding: bool,
) -> anyhow::Result<()> {
let pb_multi = MultiProgress::new();
let mut pb_accounts_c = 0;
let pb_accounts = crate_progress_bar(&pb_multi, "accounts", false)?;
let pb_accounts = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("accounts"))?;
let mut pb_slots_c = 0;
let pb_slots = crate_progress_bar(&pb_multi, "slots", false)?;
let pb_slots = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("slots"))?;
let mut pb_txs_c = 0;
let pb_txs = crate_progress_bar(&pb_multi, "transactions", false)?;
let pb_txs = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("transactions"))?;
let mut pb_txs_st_c = 0;
let pb_txs_st = crate_progress_bar(&pb_multi, "transactions statuses", false)?;
let pb_txs_st = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("transactions statuses"))?;
let mut pb_entries_c = 0;
let pb_entries = crate_progress_bar(&pb_multi, "entries", false)?;
let pb_entries = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("entries"))?;
let mut pb_blocks_mt_c = 0;
let pb_blocks_mt = crate_progress_bar(&pb_multi, "blocks meta", false)?;
let pb_blocks_mt = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("blocks meta"))?;
let mut pb_blocks_c = 0;
let pb_blocks = crate_progress_bar(&pb_multi, "blocks", false)?;
let pb_blocks = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("blocks"))?;
let mut pb_pp_c = 0;
let pb_pp = crate_progress_bar(&pb_multi, "ping/pong", false)?;
let pb_pp = crate_progress_bar(&pb_multi, ProgressBarTpl::Msg("ping/pong"))?;
let mut pb_total_c = 0;
let pb_total = crate_progress_bar(&pb_multi, "total", true)?;
let pb_total = crate_progress_bar(&pb_multi, ProgressBarTpl::Total)?;
let mut pb_verify_c = verify_encoding.then_some((0, 0));
let pb_verify = crate_progress_bar(&pb_multi, ProgressBarTpl::Verify)?;

let (mut subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?;

Expand Down Expand Up @@ -603,6 +617,38 @@ async fn geyser_subscribe(
pb_total_c += 1;
pb_total.set_message(format_thousands(pb_total_c));
pb_total.inc(encoded_len);
if let Some((prost_c, ref_c)) = &mut pb_verify_c {
// let msg = msg.update_oneof.ok_or(anyhow::anyhow!("no update"))?;
let ts = Instant::now();
let encoded_len_prost = msg.encoded_len();
let encoded_prost = msg.clone().encode_to_vec();
*prost_c += ts.elapsed().as_nanos();
// Temporary `convert`, need to implement in proto crate
let message = MessageRef::new(
msg.filters.into_iter().map(FilterName::new).collect(),
msg.update_oneof.expect("no update message").into(),
);
let ts = Instant::now();
let encoded_len_ref = message.encoded_len();
let encoded_ref = message.encode_to_vec();
*ref_c += ts.elapsed().as_nanos();
pb_verify.set_message(format!(
"{:.2?}%",
100f64 * (*ref_c as f64) / (*prost_c as f64)
));
if encoded_len_prost != encoded_len_ref || encoded_prost != encoded_ref {
let dir = "grpc-client-verify";
let name = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos();
let path = format!("{dir}/{name}");
error!("found unmached message, save to `{path}`");
fs::create_dir(dir)
.await
.context("failed to create dir for unmached")?;
fs::write(path, encoded_prost)
.await
.context("failed to save unmached")?;
}
}
continue;
}

Expand Down Expand Up @@ -752,14 +798,29 @@ async fn geyser_subscribe(
Ok(())
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProgressBarTpl {
Msg(&'static str),
Total,
Verify,
}

fn crate_progress_bar(
pb: &MultiProgress,
kind: &str,
elapsed: bool,
pb_t: ProgressBarTpl,
) -> Result<ProgressBar, indicatif::style::TemplateError> {
let pb = pb.add(ProgressBar::no_length());
let elapsed = if elapsed { " in {elapsed_precise}" } else { "" };
let tpl = format!("{{spinner}} {kind}: {{msg}} / ~{{bytes}} (~{{bytes_per_sec}}){elapsed}");
let tpl = match pb_t {
ProgressBarTpl::Msg(kind) => {
format!("{{spinner}} {kind}: {{msg}} / ~{{bytes}} (~{{bytes_per_sec}})")
}
ProgressBarTpl::Total => {
"{spinner} total: {msg} / ~{bytes} (~{bytes_per_sec}) in {elapsed_precise}".to_owned()
}
ProgressBarTpl::Verify => {
"{spinner} verify: {msg} (elapsed time, compare to prost)".to_owned()
}
};
pb.set_style(ProgressStyle::with_template(&tpl)?);
Ok(pb)
}
Expand Down
Loading

0 comments on commit 68a965c

Please sign in to comment.