-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
accounts stream from da11 ledger-tool
- Loading branch information
1 parent
8574783
commit c71cbbe
Showing
1 changed file
with
193 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
// | ||
// ``` | ||
// ssh eclipse-rpc -Nv | ||
// ``` | ||
// | ||
|
||
use futures::{Stream, StreamExt}; | ||
use itertools::Itertools; | ||
use log::{debug, info}; | ||
use solana_account_decoder::parse_token::spl_token_ids; | ||
use solana_sdk::clock::{Slot, UnixTimestamp}; | ||
use solana_sdk::commitment_config::CommitmentConfig; | ||
use solana_sdk::hash::{hash, Hash}; | ||
use solana_sdk::pubkey::Pubkey; | ||
use std::cmp::min; | ||
use std::collections::{HashMap, VecDeque}; | ||
use std::pin::pin; | ||
use std::str::FromStr; | ||
use std::sync::atomic::{AtomicU64, Ordering}; | ||
use std::sync::Arc; | ||
use std::time::{Instant, SystemTime, UNIX_EPOCH}; | ||
use std::{env, iter}; | ||
use tokio::sync::mpsc::Receiver; | ||
|
||
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; | ||
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc; | ||
use geyser_grpc_connector::grpcmultiplex_fastestwins::{ | ||
create_multiplexed_stream, FromYellowstoneExtractor, | ||
}; | ||
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message}; | ||
use tokio::time::{sleep, Duration}; | ||
use tracing::{trace, warn}; | ||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; | ||
use yellowstone_grpc_proto::geyser::{ | ||
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocksMeta, | ||
SubscribeRequestFilterSlots, SubscribeUpdate, | ||
}; | ||
use yellowstone_grpc_proto::prost::Message as _; | ||
|
||
type AtomicSlot = Arc<AtomicU64>; | ||
|
||
#[tokio::main] | ||
pub async fn main() { | ||
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace | ||
tracing_subscriber::fmt::init(); | ||
// console_subscriber::init(); | ||
|
||
let grpc_addr = env::var("GRPC_ADDR").expect("need grpc url"); | ||
let grpc_x_token = env::var("GRPC_X_TOKEN").ok(); | ||
|
||
info!( | ||
"Using grpc source on {} ({})", | ||
grpc_addr, | ||
grpc_x_token.is_some() | ||
); | ||
|
||
let timeouts = GrpcConnectionTimeouts { | ||
connect_timeout: Duration::from_secs(25), | ||
request_timeout: Duration::from_secs(25), | ||
subscribe_timeout: Duration::from_secs(25), | ||
receive_timeout: Duration::from_secs(25), | ||
}; | ||
|
||
let config = GrpcSourceConfig::new(grpc_addr, grpc_x_token, None, timeouts.clone()); | ||
|
||
let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10); | ||
let (_exit_tx, exit_rx) = tokio::sync::broadcast::channel::<()>(1); | ||
|
||
let _all_accounts = create_geyser_autoconnection_task_with_mpsc( | ||
config.clone(), | ||
all_accounts(), | ||
autoconnect_tx.clone(), | ||
exit_rx.resubscribe(), | ||
); | ||
|
||
let current_processed_slot = AtomicSlot::default(); | ||
start_tracking_account_consumer(geyser_messages_rx, current_processed_slot.clone()); | ||
|
||
// "infinite" sleep | ||
sleep(Duration::from_secs(1800)).await; | ||
} | ||
|
||
// note: this keeps track of lot of data and might blow up memory | ||
fn start_tracking_account_consumer( | ||
mut geyser_messages_rx: Receiver<Message>, | ||
current_processed_slot: Arc<AtomicU64>, | ||
) { | ||
tokio::spawn(async move { | ||
loop { | ||
match geyser_messages_rx.recv().await { | ||
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof { | ||
Some(UpdateOneof::Account(update)) => { | ||
let started_at = Instant::now(); | ||
let now = SystemTime::now(); | ||
let account_info = update.account.unwrap(); | ||
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap(); | ||
let account_owner_pk = Pubkey::try_from(account_info.owner).unwrap(); | ||
// note: slot is referencing the block that is just built while the slot number reported from BlockMeta/Slot uses the slot after the block is built | ||
let slot = update.slot; | ||
let account_receive_time = get_epoch_sec(); | ||
|
||
info!( | ||
"Account update: slot: {}, account_pk: {}, account_owner_pk: {}, account_receive_time: {}", | ||
slot, account_pk, account_owner_pk, account_receive_time | ||
); | ||
} | ||
None => {} | ||
_ => {} | ||
}, | ||
None => { | ||
log::warn!("multiplexer channel closed - aborting"); | ||
return; | ||
} | ||
Some(Message::Connecting(_)) => {} | ||
} | ||
} | ||
}); | ||
} | ||
|
||
fn get_epoch_sec() -> UnixTimestamp { | ||
SystemTime::now() | ||
.duration_since(UNIX_EPOCH) | ||
.unwrap() | ||
.as_secs() as UnixTimestamp | ||
} | ||
|
||
pub fn token_accounts() -> SubscribeRequest { | ||
let mut accounts_subs = HashMap::new(); | ||
accounts_subs.insert( | ||
"client".to_string(), | ||
SubscribeRequestFilterAccounts { | ||
account: vec![], | ||
// vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()], | ||
owner: spl_token_ids() | ||
.iter() | ||
.map(|pubkey| pubkey.to_string()) | ||
.collect(), | ||
filters: vec![], | ||
}, | ||
); | ||
|
||
SubscribeRequest { | ||
accounts: accounts_subs, | ||
..Default::default() | ||
} | ||
} | ||
|
||
pub fn all_accounts_and_blocksmeta() -> SubscribeRequest { | ||
let mut accounts_subs = HashMap::new(); | ||
accounts_subs.insert( | ||
"client".to_string(), | ||
SubscribeRequestFilterAccounts { | ||
account: vec![], | ||
owner: vec![], | ||
filters: vec![], | ||
}, | ||
); | ||
|
||
let mut slots_subs = HashMap::new(); | ||
slots_subs.insert( | ||
"client".to_string(), | ||
SubscribeRequestFilterSlots { | ||
filter_by_commitment: Some(true), | ||
}, | ||
); | ||
|
||
let mut blocks_meta_subs = HashMap::new(); | ||
blocks_meta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {}); | ||
|
||
SubscribeRequest { | ||
slots: slots_subs, | ||
accounts: accounts_subs, | ||
blocks_meta: blocks_meta_subs, | ||
..Default::default() | ||
} | ||
} | ||
|
||
pub fn all_accounts() -> SubscribeRequest { | ||
let mut accounts_subs = HashMap::new(); | ||
accounts_subs.insert( | ||
"client".to_string(), | ||
SubscribeRequestFilterAccounts { | ||
account: vec![], | ||
owner: vec![], | ||
filters: vec![], | ||
}, | ||
); | ||
|
||
SubscribeRequest { | ||
accounts: accounts_subs, | ||
..Default::default() | ||
} | ||
} |