From 4136b046e8a338c80845b855e4bc49117cad38ee Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Sat, 25 Jan 2025 10:49:34 +0100 Subject: [PATCH] add compression --- Cargo.lock | 12 ++ Cargo.toml | 1 + examples/dump_slots_stream_samples.rs | 139 +++++++++++------- ...grpc_subscription_autoreconnect_streams.rs | 1 + src/grpc_subscription_autoreconnect_tasks.rs | 2 + src/lib.rs | 18 +++ src/yellowstone_grpc_util.rs | 23 ++- 7 files changed, 134 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 06d9056..da4f4e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1342,6 +1342,7 @@ dependencies = [ "solana-logger", "solana-sdk", "tokio", + "tokio-metrics", "tonic", "tonic-health", "tracing", @@ -3804,6 +3805,17 @@ dependencies = [ "syn 2.0.89", ] +[[package]] +name = "tokio-metrics" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb2bb07a8451c4c6fa8b3497ad198510d8b8dffa5df5cfb97a64102a58b113c8" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio-stream", +] + [[package]] name = "tokio-rustls" version = "0.24.1" diff --git a/Cargo.toml b/Cargo.toml index fefc256..374cfd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ solana-sdk = "~2.0.16" url = "2.5.0" async-stream = "0.3.5" tokio = { version = "1.28" , features = ["rt-multi-thread"] } +tokio-metrics = { version = "0.4.0", default-features = false } futures = "0.3.28" merge-streams = "0.1.2" anyhow = "1.0.70" diff --git a/examples/dump_slots_stream_samples.rs b/examples/dump_slots_stream_samples.rs index 46615b9..12fec10 100644 --- a/examples/dump_slots_stream_samples.rs +++ b/examples/dump_slots_stream_samples.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + /// /// subsribe to grpc in multiple ways: /// - all slots and processed accounts in one subscription @@ -18,7 +20,8 @@ use tokio::sync::broadcast; use tokio::sync::mpsc::Receiver; use yellowstone_grpc_proto::geyser::{ - SubscribeRequest, SubscribeRequestFilterSlots, SubscribeUpdateSlot, + SubscribeRequest, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, + SubscribeRequestFilterTransactions, SubscribeUpdateSlot, }; use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc; @@ -54,24 +57,30 @@ fn start_all_slots_and_processed_accounts_consumer(mut slots_channel: Receiver { - let since_epoch_ms = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis(); - - let account_info = update_account.account.unwrap(); - let slot = update_account.slot; - let account_pk = - Pubkey::new_from_array(account_info.pubkey.try_into().unwrap()); - let write_version = account_info.write_version; - let data_len = account_info.data.len(); - // DUMPACCOUNT 283417593,HTQeo4GNbZfGY5G4fAkDr1S5xnz5qWXFgueRwgw53aU1,1332997857270,752,1723582355872 - info!( - "MIXACCOUNT {},{},{},{},{}", - slot, account_pk, write_version, data_len, since_epoch_ms - ); + Some(UpdateOneof::BlockMeta(update_block_meta)) => { + info!("block meta {:?}", update_block_meta); + } + Some(UpdateOneof::Transaction(update_transaction)) => { + info!("transaction slot {:?}", update_transaction.slot); } + // Some(UpdateOneof::Account(update_account)) => { + // let since_epoch_ms = SystemTime::now() + // .duration_since(SystemTime::UNIX_EPOCH) + // .unwrap() + // .as_millis(); + // + // let account_info = update_account.account.unwrap(); + // let slot = update_account.slot; + // let account_pk = + // Pubkey::new_from_array(account_info.pubkey.try_into().unwrap()); + // let write_version = account_info.write_version; + // let data_len = account_info.data.len(); + // // DUMPACCOUNT 283417593,HTQeo4GNbZfGY5G4fAkDr1S5xnz5qWXFgueRwgw53aU1,1332997857270,752,1723582355872 + // info!( + // "MIXACCOUNT {},{},{},{},{}", + // slot, account_pk, write_version, data_len, since_epoch_ms + // ); + // } None => {} _ => {} }, @@ -169,7 +178,7 @@ pub async fn main() { let green_config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); - let (_, exit_notify) = broadcast::channel(1); + let (_exit_signal, exit_notify) = broadcast::channel(1); // mix of (all) slots and processed accounts let (autoconnect_tx, slots_accounts_rx) = tokio::sync::mpsc::channel(10); @@ -180,34 +189,34 @@ pub async fn main() { exit_notify.resubscribe(), ); - let (only_processed_accounts_tx, only_processed_accounts_rx) = tokio::sync::mpsc::channel(10); - let _accounts_processed_stream_ah = create_geyser_autoconnection_task_with_mpsc( - green_config.clone(), - accounts_at_level(CommitmentLevel::Processed), - only_processed_accounts_tx.clone(), - exit_notify.resubscribe(), - ); - - let (only_confirmed_accounts_tx, only_confirmed_accounts_rx) = tokio::sync::mpsc::channel(10); - let _accounts_confirmed_stream_ah = create_geyser_autoconnection_task_with_mpsc( - green_config.clone(), - accounts_at_level(CommitmentLevel::Confirmed), - only_confirmed_accounts_tx.clone(), - exit_notify.resubscribe(), - ); - - let (only_finalized_accounts_tx, only_finalized_accounts_rx) = tokio::sync::mpsc::channel(10); - let _accounts_finalized_stream_ah = create_geyser_autoconnection_task_with_mpsc( - green_config.clone(), - accounts_at_level(CommitmentLevel::Finalized), - only_finalized_accounts_tx.clone(), - exit_notify.resubscribe(), - ); + // let (only_processed_accounts_tx, only_processed_accounts_rx) = tokio::sync::mpsc::channel(10); + // let _accounts_processed_stream_ah = create_geyser_autoconnection_task_with_mpsc( + // green_config.clone(), + // accounts_at_level(CommitmentLevel::Processed), + // only_processed_accounts_tx.clone(), + // exit_notify.resubscribe(), + // ); + // + // let (only_confirmed_accounts_tx, only_confirmed_accounts_rx) = tokio::sync::mpsc::channel(10); + // let _accounts_confirmed_stream_ah = create_geyser_autoconnection_task_with_mpsc( + // green_config.clone(), + // accounts_at_level(CommitmentLevel::Confirmed), + // only_confirmed_accounts_tx.clone(), + // exit_notify.resubscribe(), + // ); + // + // let (only_finalized_accounts_tx, only_finalized_accounts_rx) = tokio::sync::mpsc::channel(10); + // let _accounts_finalized_stream_ah = create_geyser_autoconnection_task_with_mpsc( + // green_config.clone(), + // accounts_at_level(CommitmentLevel::Finalized), + // only_finalized_accounts_tx.clone(), + // exit_notify.resubscribe(), + // ); start_all_slots_and_processed_accounts_consumer(slots_accounts_rx); - start_account_same_level(only_processed_accounts_rx, CommitmentLevel::Processed); - start_account_same_level(only_confirmed_accounts_rx, CommitmentLevel::Confirmed); - start_account_same_level(only_finalized_accounts_rx, CommitmentLevel::Finalized); + // start_account_same_level(only_processed_accounts_rx, CommitmentLevel::Processed); + // start_account_same_level(only_confirmed_accounts_rx, CommitmentLevel::Confirmed); + // start_account_same_level(only_finalized_accounts_rx, CommitmentLevel::Finalized); // "infinite" sleep sleep(Duration::from_secs(3600 * 5)).await; @@ -224,21 +233,41 @@ fn all_slots_and_processed_accounts_together() -> SubscribeRequest { filter_by_commitment: None, }, ); - let mut account_subs = HashMap::new(); - account_subs.insert( - "client".to_string(), - SubscribeRequestFilterAccounts { - account: vec![], - owner: vec![RAYDIUM_AMM_PUBKEY.to_string()], - filters: vec![], - nonempty_txn_signature: None, + + let transactions = HashMap::from([( + "geyser_tracker_tx".to_string(), + SubscribeRequestFilterTransactions { + vote: Some(false), + failed: None, + signature: None, + account_include: vec![], + account_exclude: vec![], + account_required: vec![], }, - ); + )]); + + let blocks_meta = HashMap::from([( + "geyser_tracker_blocks_meta".to_string(), + SubscribeRequestFilterBlocksMeta {}, + )]); + + // let mut account_subs = HashMap::new(); + // account_subs.insert( + // "client".to_string(), + // SubscribeRequestFilterAccounts { + // account: vec![], + // owner: vec![RAYDIUM_AMM_PUBKEY.to_string()], + // filters: vec![], + // nonempty_txn_signature: None, + // }, + // ); SubscribeRequest { slots: slot_subs, - accounts: account_subs, + // accounts: account_subs, ping: None, + transactions, + blocks_meta, // implies "processed" commitment: None, ..Default::default() diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index f13ae54..585383c 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -58,6 +58,7 @@ pub fn create_geyser_reconnecting_stream( connect_timeout, request_timeout, GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter), + None, ) .await; diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 1507a95..e985103 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -111,6 +111,7 @@ pub fn create_geyser_autoconnection_task_with_updater( let config = grpc_source.tls_config.clone(); let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout); let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout); + let compression = grpc_source.compression; log!( if attempt > 1 { Level::Warn @@ -163,6 +164,7 @@ pub fn create_geyser_autoconnection_task_with_updater( connect_timeout, request_timeout, buffer_config, + compression, ); match await_or_exit(fut_connector, exit_notify.recv()).await { diff --git a/src/lib.rs b/src/lib.rs index 5beb2c3..c9e8dac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::time::Duration; use solana_sdk::commitment_config::CommitmentConfig; +use tonic::codec::CompressionEncoding; use yellowstone_grpc_proto::geyser::{ CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, @@ -52,6 +53,7 @@ pub struct GrpcSourceConfig { pub grpc_x_token: Option, tls_config: Option, timeouts: Option, + compression: Option, } impl Display for GrpcSourceConfig { @@ -78,6 +80,7 @@ impl GrpcSourceConfig { grpc_x_token: None, tls_config: None, timeouts: None, + compression: None, } } pub fn new( @@ -91,6 +94,21 @@ impl GrpcSourceConfig { grpc_x_token, tls_config, timeouts: Some(timeouts), + compression: None, + } + } + pub fn new_compressed( + grpc_addr: String, + grpc_x_token: Option, + tls_config: Option, + timeouts: GrpcConnectionTimeouts, + ) -> Self { + Self { + grpc_addr, + grpc_x_token, + tls_config, + timeouts: Some(timeouts), + compression: Some(CompressionEncoding::Zstd), } } } diff --git a/src/yellowstone_grpc_util.rs b/src/yellowstone_grpc_util.rs index 3df9c06..7ef9b5e 100644 --- a/src/yellowstone_grpc_util.rs +++ b/src/yellowstone_grpc_util.rs @@ -1,4 +1,5 @@ use std::time::Duration; +use tonic::codec::CompressionEncoding; use tonic::metadata::errors::InvalidMetadataValue; use tonic::metadata::AsciiMetadataValue; @@ -16,6 +17,7 @@ pub async fn connect_with_timeout( tls_config: Option, connect_timeout: Option, request_timeout: Option, + compression: Option, ) -> GeyserGrpcBuilderResult> where E: Into, @@ -28,6 +30,7 @@ where connect_timeout, request_timeout, GeyserGrpcClientBufferConfig::default(), + compression, ) .await } @@ -76,6 +79,7 @@ pub async fn connect_with_timeout_with_buffers( connect_timeout: Option, request_timeout: Option, buffer_config: GeyserGrpcClientBufferConfig, + compression: Option, ) -> GeyserGrpcBuilderResult> where E: Into, @@ -111,12 +115,17 @@ where }; let channel = endpoint.connect_lazy(); - let client = GeyserGrpcClient::new( - HealthClient::with_interceptor(channel.clone(), interceptor.clone()), - GeyserClient::with_interceptor(channel, interceptor) - // DISALLOW GRPC - benching shows that it is slow - // .accept_compressed(CompressionEncoding::Gzip) - .max_decoding_message_size(usize::MAX), - ); + + let health_client = HealthClient::with_interceptor(channel.clone(), interceptor.clone()); + + let geyser_client = GeyserClient::with_interceptor(channel.clone(), interceptor.clone()) + .max_decoding_message_size(usize::MAX); + let geyser_client = if let Some(compression_encoding) = compression { + geyser_client.accept_compressed(compression_encoding) + } else { + geyser_client + }; + + let client = GeyserGrpcClient::new(health_client, geyser_client); Ok(client) }