Skip to content

Commit

Permalink
add compression
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jan 25, 2025
1 parent 0901508 commit 4136b04
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 62 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ solana-sdk = "~2.0.16"
url = "2.5.0"
async-stream = "0.3.5"
tokio = { version = "1.28" , features = ["rt-multi-thread"] }
tokio-metrics = { version = "0.4.0", default-features = false }
futures = "0.3.28"
merge-streams = "0.1.2"
anyhow = "1.0.70"
Expand Down
139 changes: 84 additions & 55 deletions examples/dump_slots_stream_samples.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(dead_code)]

///
/// subsribe to grpc in multiple ways:
/// - all slots and processed accounts in one subscription
Expand All @@ -18,7 +20,8 @@ use tokio::sync::broadcast;
use tokio::sync::mpsc::Receiver;

use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterSlots, SubscribeUpdateSlot,
SubscribeRequest, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions, SubscribeUpdateSlot,
};

use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
Expand Down Expand Up @@ -54,24 +57,30 @@ fn start_all_slots_and_processed_accounts_consumer(mut slots_channel: Receiver<M
since_epoch_ms
);
}
Some(UpdateOneof::Account(update_account)) => {
let since_epoch_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();

let account_info = update_account.account.unwrap();
let slot = update_account.slot;
let account_pk =
Pubkey::new_from_array(account_info.pubkey.try_into().unwrap());
let write_version = account_info.write_version;
let data_len = account_info.data.len();
// DUMPACCOUNT 283417593,HTQeo4GNbZfGY5G4fAkDr1S5xnz5qWXFgueRwgw53aU1,1332997857270,752,1723582355872
info!(
"MIXACCOUNT {},{},{},{},{}",
slot, account_pk, write_version, data_len, since_epoch_ms
);
Some(UpdateOneof::BlockMeta(update_block_meta)) => {
info!("block meta {:?}", update_block_meta);
}
Some(UpdateOneof::Transaction(update_transaction)) => {
info!("transaction slot {:?}", update_transaction.slot);
}
// Some(UpdateOneof::Account(update_account)) => {
// let since_epoch_ms = SystemTime::now()
// .duration_since(SystemTime::UNIX_EPOCH)
// .unwrap()
// .as_millis();
//
// let account_info = update_account.account.unwrap();
// let slot = update_account.slot;
// let account_pk =
// Pubkey::new_from_array(account_info.pubkey.try_into().unwrap());
// let write_version = account_info.write_version;
// let data_len = account_info.data.len();
// // DUMPACCOUNT 283417593,HTQeo4GNbZfGY5G4fAkDr1S5xnz5qWXFgueRwgw53aU1,1332997857270,752,1723582355872
// info!(
// "MIXACCOUNT {},{},{},{},{}",
// slot, account_pk, write_version, data_len, since_epoch_ms
// );
// }
None => {}
_ => {}
},
Expand Down Expand Up @@ -169,7 +178,7 @@ pub async fn main() {
let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());

let (_, exit_notify) = broadcast::channel(1);
let (_exit_signal, exit_notify) = broadcast::channel(1);

// mix of (all) slots and processed accounts
let (autoconnect_tx, slots_accounts_rx) = tokio::sync::mpsc::channel(10);
Expand All @@ -180,34 +189,34 @@ pub async fn main() {
exit_notify.resubscribe(),
);

let (only_processed_accounts_tx, only_processed_accounts_rx) = tokio::sync::mpsc::channel(10);
let _accounts_processed_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
accounts_at_level(CommitmentLevel::Processed),
only_processed_accounts_tx.clone(),
exit_notify.resubscribe(),
);

let (only_confirmed_accounts_tx, only_confirmed_accounts_rx) = tokio::sync::mpsc::channel(10);
let _accounts_confirmed_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
accounts_at_level(CommitmentLevel::Confirmed),
only_confirmed_accounts_tx.clone(),
exit_notify.resubscribe(),
);

let (only_finalized_accounts_tx, only_finalized_accounts_rx) = tokio::sync::mpsc::channel(10);
let _accounts_finalized_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
accounts_at_level(CommitmentLevel::Finalized),
only_finalized_accounts_tx.clone(),
exit_notify.resubscribe(),
);
// let (only_processed_accounts_tx, only_processed_accounts_rx) = tokio::sync::mpsc::channel(10);
// let _accounts_processed_stream_ah = create_geyser_autoconnection_task_with_mpsc(
// green_config.clone(),
// accounts_at_level(CommitmentLevel::Processed),
// only_processed_accounts_tx.clone(),
// exit_notify.resubscribe(),
// );
//
// let (only_confirmed_accounts_tx, only_confirmed_accounts_rx) = tokio::sync::mpsc::channel(10);
// let _accounts_confirmed_stream_ah = create_geyser_autoconnection_task_with_mpsc(
// green_config.clone(),
// accounts_at_level(CommitmentLevel::Confirmed),
// only_confirmed_accounts_tx.clone(),
// exit_notify.resubscribe(),
// );
//
// let (only_finalized_accounts_tx, only_finalized_accounts_rx) = tokio::sync::mpsc::channel(10);
// let _accounts_finalized_stream_ah = create_geyser_autoconnection_task_with_mpsc(
// green_config.clone(),
// accounts_at_level(CommitmentLevel::Finalized),
// only_finalized_accounts_tx.clone(),
// exit_notify.resubscribe(),
// );

start_all_slots_and_processed_accounts_consumer(slots_accounts_rx);
start_account_same_level(only_processed_accounts_rx, CommitmentLevel::Processed);
start_account_same_level(only_confirmed_accounts_rx, CommitmentLevel::Confirmed);
start_account_same_level(only_finalized_accounts_rx, CommitmentLevel::Finalized);
// start_account_same_level(only_processed_accounts_rx, CommitmentLevel::Processed);
// start_account_same_level(only_confirmed_accounts_rx, CommitmentLevel::Confirmed);
// start_account_same_level(only_finalized_accounts_rx, CommitmentLevel::Finalized);

// "infinite" sleep
sleep(Duration::from_secs(3600 * 5)).await;
Expand All @@ -224,21 +233,41 @@ fn all_slots_and_processed_accounts_together() -> SubscribeRequest {
filter_by_commitment: None,
},
);
let mut account_subs = HashMap::new();
account_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![RAYDIUM_AMM_PUBKEY.to_string()],
filters: vec![],
nonempty_txn_signature: None,

let transactions = HashMap::from([(
"geyser_tracker_tx".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false),
failed: None,
signature: None,
account_include: vec![],
account_exclude: vec![],
account_required: vec![],
},
);
)]);

let blocks_meta = HashMap::from([(
"geyser_tracker_blocks_meta".to_string(),
SubscribeRequestFilterBlocksMeta {},
)]);

// let mut account_subs = HashMap::new();
// account_subs.insert(
// "client".to_string(),
// SubscribeRequestFilterAccounts {
// account: vec![],
// owner: vec![RAYDIUM_AMM_PUBKEY.to_string()],
// filters: vec![],
// nonempty_txn_signature: None,
// },
// );

SubscribeRequest {
slots: slot_subs,
accounts: account_subs,
// accounts: account_subs,
ping: None,
transactions,
blocks_meta,
// implies "processed"
commitment: None,
..Default::default()
Expand Down
1 change: 1 addition & 0 deletions src/grpc_subscription_autoreconnect_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub fn create_geyser_reconnecting_stream(
connect_timeout,
request_timeout,
GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter),
None,
)
.await;

Expand Down
2 changes: 2 additions & 0 deletions src/grpc_subscription_autoreconnect_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub fn create_geyser_autoconnection_task_with_updater(
let config = grpc_source.tls_config.clone();
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
let compression = grpc_source.compression;
log!(
if attempt > 1 {
Level::Warn
Expand Down Expand Up @@ -163,6 +164,7 @@ pub fn create_geyser_autoconnection_task_with_updater(
connect_timeout,
request_timeout,
buffer_config,
compression,
);

match await_or_exit(fut_connector, exit_notify.recv()).await {
Expand Down
18 changes: 18 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use std::time::Duration;

use solana_sdk::commitment_config::CommitmentConfig;
use tonic::codec::CompressionEncoding;
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts,
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
Expand Down Expand Up @@ -52,6 +53,7 @@ pub struct GrpcSourceConfig {
pub grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
timeouts: Option<GrpcConnectionTimeouts>,
compression: Option<CompressionEncoding>,
}

impl Display for GrpcSourceConfig {
Expand All @@ -78,6 +80,7 @@ impl GrpcSourceConfig {
grpc_x_token: None,
tls_config: None,
timeouts: None,
compression: None,
}
}
pub fn new(
Expand All @@ -91,6 +94,21 @@ impl GrpcSourceConfig {
grpc_x_token,
tls_config,
timeouts: Some(timeouts),
compression: None,
}
}
pub fn new_compressed(
grpc_addr: String,
grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
timeouts: GrpcConnectionTimeouts,
) -> Self {
Self {
grpc_addr,
grpc_x_token,
tls_config,
timeouts: Some(timeouts),
compression: Some(CompressionEncoding::Zstd),
}
}
}
Expand Down
23 changes: 16 additions & 7 deletions src/yellowstone_grpc_util.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::time::Duration;
use tonic::codec::CompressionEncoding;

use tonic::metadata::errors::InvalidMetadataValue;
use tonic::metadata::AsciiMetadataValue;
Expand All @@ -16,6 +17,7 @@ pub async fn connect_with_timeout<E, T>(
tls_config: Option<ClientTlsConfig>,
connect_timeout: Option<Duration>,
request_timeout: Option<Duration>,
compression: Option<CompressionEncoding>,
) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>>
where
E: Into<Bytes>,
Expand All @@ -28,6 +30,7 @@ where
connect_timeout,
request_timeout,
GeyserGrpcClientBufferConfig::default(),
compression,
)
.await
}
Expand Down Expand Up @@ -76,6 +79,7 @@ pub async fn connect_with_timeout_with_buffers<E, T>(
connect_timeout: Option<Duration>,
request_timeout: Option<Duration>,
buffer_config: GeyserGrpcClientBufferConfig,
compression: Option<CompressionEncoding>,
) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>>
where
E: Into<Bytes>,
Expand Down Expand Up @@ -111,12 +115,17 @@ where
};

let channel = endpoint.connect_lazy();
let client = GeyserGrpcClient::new(
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
GeyserClient::with_interceptor(channel, interceptor)
// DISALLOW GRPC - benching shows that it is slow
// .accept_compressed(CompressionEncoding::Gzip)
.max_decoding_message_size(usize::MAX),
);

let health_client = HealthClient::with_interceptor(channel.clone(), interceptor.clone());

let geyser_client = GeyserClient::with_interceptor(channel.clone(), interceptor.clone())
.max_decoding_message_size(usize::MAX);
let geyser_client = if let Some(compression_encoding) = compression {
geyser_client.accept_compressed(compression_encoding)
} else {
geyser_client
};

let client = GeyserGrpcClient::new(health_client, geyser_client);
Ok(client)
}

0 comments on commit 4136b04

Please sign in to comment.