diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs index e9a44e9..cceac69 100644 --- a/examples/stream_blocks_autoconnect.rs +++ b/examples/stream_blocks_autoconnect.rs @@ -3,7 +3,6 @@ use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use std::env; use tokio::sync::broadcast; -use tokio::sync::mpsc::Sender; use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug; use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task; @@ -12,7 +11,7 @@ use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConf use tokio::time::{sleep, Duration}; use tracing::warn; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; -use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; +use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::prost::Message as _; pub struct BlockMini { @@ -92,7 +91,7 @@ pub async fn main() { let (_, exit_notify) = broadcast::channel(1); - let (jh_geyser_task, message_channel, client_subscribe_tx) = create_geyser_autoconnection_task( + let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task( green_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), exit_notify, diff --git a/examples/stream_token_accounts.rs b/examples/stream_token_accounts.rs index 8da339e..77368c1 100644 --- a/examples/stream_token_accounts.rs +++ b/examples/stream_token_accounts.rs @@ -14,7 +14,6 @@ use std::env; use std::str::FromStr; use std::sync::Arc; use std::time::Instant; -use tokio::sync::mpsc::Sender; use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc; use geyser_grpc_connector::{ @@ -58,14 +57,14 @@ pub async fn main() { let (exit_signal, _exit_notify) = tokio::sync::broadcast::channel(1); let (autoconnect_tx, mut accounts_rx) = tokio::sync::mpsc::channel(1000); - let (_jh_green, client_subscribe_tx) = create_geyser_autoconnection_task_with_mpsc( + let _jh_green = create_geyser_autoconnection_task_with_mpsc( config.clone(), token_accounts(), autoconnect_tx.clone(), exit_signal.subscribe(), ); - let (_jh_blue, _) = create_geyser_autoconnection_task_with_mpsc( + let _jh_blue = create_geyser_autoconnection_task_with_mpsc( config.clone(), token_accounts_finalized(), autoconnect_tx.clone(), diff --git a/examples/subscribe_accounts.rs b/examples/subscribe_accounts.rs index bc00fc9..146344a 100644 --- a/examples/subscribe_accounts.rs +++ b/examples/subscribe_accounts.rs @@ -53,8 +53,10 @@ pub async fn main() { let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10); let (_exit_tx, exit_rx) = tokio::sync::broadcast::channel::<()>(1); + let (subscribe_filter_update_tx, mut _subscribe_filter_update_rx) = + tokio::sync::mpsc::channel::(1); - let (_jh, client_subscribe_tx) = create_geyser_autoconnection_task_with_mpsc( + let _jh = create_geyser_autoconnection_task_with_mpsc( config.clone(), jito2_account(), autoconnect_tx.clone(), @@ -63,14 +65,13 @@ pub async fn main() { // testcase 1 // test if the autoconnector continues to work even if the channel drops - // drop(client_subscribe_tx); + // drop(subscribe_filter_update_tx); // testcase 2 - spawn_subscribe_filter_updater(client_subscribe_tx.clone()); - + spawn_subscribe_filter_updater(subscribe_filter_update_tx.clone()); // testcase 3 - // spawn_subscribe_broken_filter_updater(client_subscribe_tx.clone()); + // spawn_subscribe_broken_filter_updater(subscribe_filter_update_tx.clone()); let current_processed_slot = AtomicSlot::default(); start_tracking_account_consumer(geyser_messages_rx, current_processed_slot.clone()); @@ -114,12 +115,13 @@ fn start_tracking_account_consumer( }); } -fn spawn_subscribe_filter_updater(client_subscribe_tx: Sender) { +#[allow(dead_code)] +fn spawn_subscribe_filter_updater(subscribe_filter_update_tx: Sender) { tokio::spawn(async move { loop { sleep(Duration::from_secs(5)).await; info!("updating filters"); - client_subscribe_tx + subscribe_filter_update_tx .send(jito1_account()) .await .expect("send"); @@ -127,12 +129,13 @@ fn spawn_subscribe_filter_updater(client_subscribe_tx: Sender) }); } -fn spawn_subscribe_broken_filter_updater(client_subscribe_tx: Sender) { +#[allow(dead_code)] +fn spawn_subscribe_broken_filter_updater(subscribe_filter_update_tx: Sender) { tokio::spawn(async move { loop { sleep(Duration::from_secs(5)).await; info!("updating filters"); - client_subscribe_tx + subscribe_filter_update_tx .send(broken_subscription()) .await .expect("send"); @@ -255,7 +258,6 @@ pub fn jito2_account() -> SubscribeRequest { } } - pub fn broken_subscription() -> SubscribeRequest { let mut accounts_subs = HashMap::new(); accounts_subs.insert( @@ -273,7 +275,6 @@ pub fn broken_subscription() -> SubscribeRequest { } } - pub fn slots() -> SubscribeRequest { let mut slots_subs = HashMap::new(); slots_subs.insert( diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 25bcd9a..123deb9 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,23 +1,18 @@ -use std::collections::HashMap; use std::env; use std::future::Future; -use std::sync::Arc; use std::time::Duration; -use futures::channel::mpsc::SendError; use futures::{Sink, SinkExt, Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; use tokio::select; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; +use tokio::sync::mpsc; use tokio::sync::mpsc::error::SendTimeoutError; -use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout, Instant}; use yellowstone_grpc_client::{GeyserGrpcBuilderError, GeyserGrpcClient, GeyserGrpcClientError}; -use yellowstone_grpc_proto::geyser::{ - SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeUpdate, -}; +use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; use yellowstone_grpc_proto::tonic::service::Interceptor; use yellowstone_grpc_proto::tonic::Status; @@ -55,33 +50,50 @@ pub fn create_geyser_autoconnection_task( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, exit_notify: broadcast::Receiver<()>, -) -> (JoinHandle<()>, Receiver, Sender) { +) -> (JoinHandle<()>, mpsc::Receiver) { let (sender, receiver_channel) = tokio::sync::mpsc::channel::(1); - // TODO rename - let (join_handle, client_subscribe_tx) = create_geyser_autoconnection_task_with_mpsc( + let join_handle = create_geyser_autoconnection_task_with_mpsc( grpc_source, subscribe_filter, sender, exit_notify, ); - (join_handle, receiver_channel, client_subscribe_tx) + (join_handle, receiver_channel) +} + +// compat +pub fn create_geyser_autoconnection_task_with_mpsc( + grpc_source: GrpcSourceConfig, + subscribe_filter: SubscribeRequest, + mpsc_downstream: mpsc::Sender, + exit_notify: broadcast::Receiver<()>, +) -> JoinHandle<()> { + let (_noop_subscribe_filter_update_tx, subscribe_filter_update_rx) = + mpsc::channel::(1); + + create_geyser_autoconnection_task_with_updater( + grpc_source, + subscribe_filter, + mpsc_downstream, + exit_notify, + subscribe_filter_update_rx, + ) } /// connect to grpc source performing autoconnect if required, /// returns mpsc channel; task will abort on fatal error /// will shut down when receiver is dropped -pub fn create_geyser_autoconnection_task_with_mpsc( +/// +/// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ +pub fn create_geyser_autoconnection_task_with_updater( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, - mpsc_downstream: tokio::sync::mpsc::Sender, + mpsc_downstream: mpsc::Sender, mut exit_notify: broadcast::Receiver<()>, -) -> (JoinHandle<()>, Sender) { - // read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ - let (client_subscribe_tx, mut client_subscribe_rx) = - tokio::sync::mpsc::channel::(1); - + mut subscribe_filter_update_rx: mpsc::Receiver, +) -> JoinHandle<()> { // task will be aborted when downstream receiver gets dropped // there are two ways to terminate: 1) using break 'main_loop 2) return from task let jh_geyser_task = tokio::spawn(async move { @@ -163,7 +175,10 @@ pub fn create_geyser_autoconnection_task_with_mpsc( let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout); let subscribe_filter_on_connect = subscribe_filter_on_connect.clone(); - debug!("Subscribe initially with filter {:?}", subscribe_filter_on_connect); + debug!( + "Subscribe initially with filter {:?}", + subscribe_filter_on_connect + ); let fut_subscribe = timeout( subscribe_timeout.unwrap_or(Duration::MAX), @@ -267,6 +282,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( } ConnectionState::Ready(mut geyser_stream, mut geyser_subscribe_tx) => { let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); + 'recv_loop: loop { select! { exit_res = exit_notify.recv() => { @@ -283,7 +299,8 @@ pub fn create_geyser_autoconnection_task_with_mpsc( } break 'recv_loop ConnectionState::GracefulShutdown; }, - client_subscribe_update = client_subscribe_rx.recv() => { + // could model subscribe_filter_update_rx as optional here but did not figure out how + client_subscribe_update = subscribe_filter_update_rx.recv() => { match client_subscribe_update { Some(subscribe_request) => { debug!("Subscription update from client with filter {:?}", subscribe_request); @@ -291,7 +308,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( // note: if the subscription is invalid, it will trigger a Tonic error: // Status { code: InvalidArgument, message: "failed to create filter: Invalid Base58 string", source: None } if let Err(send_err) = geyser_subscribe_tx.send(subscribe_request).await { - warn!("fail to send subscription update - disconnect and retry"); + warn!("fail to send subscription update - disconnect and retry: {}", send_err); break 'recv_loop ConnectionState::WaitReconnect(1); }; } @@ -306,12 +323,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc( geyser_stream.next(), ) => { - // let MaybeExit::Continue(geyser_stream_res) = - // await_or_exit(fut_stream, exit_notify.recv()).await - // else { - // break 'recv_loop ConnectionState::GracefulShutdown; - // }; - match geyser_stream_res { Ok(Some(Ok(update_message))) => { trace!("> recv update message from {}", grpc_source); @@ -415,7 +426,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( debug!("gracefully exiting geyser task loop"); }); - (jh_geyser_task, client_subscribe_tx) + jh_geyser_task } fn buffer_config_from_env() -> GeyserGrpcClientBufferConfig {