Skip to content

Commit

Permalink
compatible updater channel
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jan 21, 2025
1 parent 05581ba commit dea3f0f
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 45 deletions.
5 changes: 2 additions & 3 deletions examples/stream_blocks_autoconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use tokio::sync::broadcast;
use tokio::sync::mpsc::Sender;

use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task;
Expand All @@ -12,7 +11,7 @@ use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConf
use tokio::time::{sleep, Duration};
use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prost::Message as _;

pub struct BlockMini {
Expand Down Expand Up @@ -92,7 +91,7 @@ pub async fn main() {

let (_, exit_notify) = broadcast::channel(1);

let (jh_geyser_task, message_channel, client_subscribe_tx) = create_geyser_autoconnection_task(
let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
exit_notify,
Expand Down
5 changes: 2 additions & 3 deletions examples/stream_token_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::env;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc::Sender;

use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::{
Expand Down Expand Up @@ -58,14 +57,14 @@ pub async fn main() {
let (exit_signal, _exit_notify) = tokio::sync::broadcast::channel(1);
let (autoconnect_tx, mut accounts_rx) = tokio::sync::mpsc::channel(1000);

let (_jh_green, client_subscribe_tx) = create_geyser_autoconnection_task_with_mpsc(
let _jh_green = create_geyser_autoconnection_task_with_mpsc(
config.clone(),
token_accounts(),
autoconnect_tx.clone(),
exit_signal.subscribe(),
);

let (_jh_blue, _) = create_geyser_autoconnection_task_with_mpsc(
let _jh_blue = create_geyser_autoconnection_task_with_mpsc(
config.clone(),
token_accounts_finalized(),
autoconnect_tx.clone(),
Expand Down
23 changes: 12 additions & 11 deletions examples/subscribe_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ pub async fn main() {

let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10);
let (_exit_tx, exit_rx) = tokio::sync::broadcast::channel::<()>(1);
let (subscribe_filter_update_tx, mut _subscribe_filter_update_rx) =
tokio::sync::mpsc::channel::<SubscribeRequest>(1);

let (_jh, client_subscribe_tx) = create_geyser_autoconnection_task_with_mpsc(
let _jh = create_geyser_autoconnection_task_with_mpsc(
config.clone(),
jito2_account(),
autoconnect_tx.clone(),
Expand All @@ -63,14 +65,13 @@ pub async fn main() {

// testcase 1
// test if the autoconnector continues to work even if the channel drops
// drop(client_subscribe_tx);
// drop(subscribe_filter_update_tx);

// testcase 2
spawn_subscribe_filter_updater(client_subscribe_tx.clone());

spawn_subscribe_filter_updater(subscribe_filter_update_tx.clone());

// testcase 3
// spawn_subscribe_broken_filter_updater(client_subscribe_tx.clone());
// spawn_subscribe_broken_filter_updater(subscribe_filter_update_tx.clone());

let current_processed_slot = AtomicSlot::default();
start_tracking_account_consumer(geyser_messages_rx, current_processed_slot.clone());
Expand Down Expand Up @@ -114,25 +115,27 @@ fn start_tracking_account_consumer(
});
}

fn spawn_subscribe_filter_updater(client_subscribe_tx: Sender<SubscribeRequest>) {
#[allow(dead_code)]
fn spawn_subscribe_filter_updater(subscribe_filter_update_tx: Sender<SubscribeRequest>) {
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(5)).await;
info!("updating filters");
client_subscribe_tx
subscribe_filter_update_tx
.send(jito1_account())
.await
.expect("send");
}
});
}

fn spawn_subscribe_broken_filter_updater(client_subscribe_tx: Sender<SubscribeRequest>) {
#[allow(dead_code)]
fn spawn_subscribe_broken_filter_updater(subscribe_filter_update_tx: Sender<SubscribeRequest>) {
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(5)).await;
info!("updating filters");
client_subscribe_tx
subscribe_filter_update_tx
.send(broken_subscription())
.await
.expect("send");
Expand Down Expand Up @@ -255,7 +258,6 @@ pub fn jito2_account() -> SubscribeRequest {
}
}


pub fn broken_subscription() -> SubscribeRequest {
let mut accounts_subs = HashMap::new();
accounts_subs.insert(
Expand All @@ -273,7 +275,6 @@ pub fn broken_subscription() -> SubscribeRequest {
}
}


pub fn slots() -> SubscribeRequest {
let mut slots_subs = HashMap::new();
slots_subs.insert(
Expand Down
67 changes: 39 additions & 28 deletions src/grpc_subscription_autoreconnect_tasks.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
use std::collections::HashMap;
use std::env;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use futures::channel::mpsc::SendError;
use futures::{Sink, SinkExt, Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level};
use tokio::select;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout, Instant};
use yellowstone_grpc_client::{GeyserGrpcBuilderError, GeyserGrpcClient, GeyserGrpcClientError};
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeUpdate,
};
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::service::Interceptor;
use yellowstone_grpc_proto::tonic::Status;

Expand Down Expand Up @@ -55,33 +50,50 @@ pub fn create_geyser_autoconnection_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
exit_notify: broadcast::Receiver<()>,
) -> (JoinHandle<()>, Receiver<Message>, Sender<SubscribeRequest>) {
) -> (JoinHandle<()>, mpsc::Receiver<Message>) {
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);

// TODO rename
let (join_handle, client_subscribe_tx) = create_geyser_autoconnection_task_with_mpsc(
let join_handle = create_geyser_autoconnection_task_with_mpsc(
grpc_source,
subscribe_filter,
sender,
exit_notify,
);

(join_handle, receiver_channel, client_subscribe_tx)
(join_handle, receiver_channel)
}

// compat
pub fn create_geyser_autoconnection_task_with_mpsc(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
mpsc_downstream: mpsc::Sender<Message>,
exit_notify: broadcast::Receiver<()>,
) -> JoinHandle<()> {
let (_noop_subscribe_filter_update_tx, subscribe_filter_update_rx) =
mpsc::channel::<SubscribeRequest>(1);

create_geyser_autoconnection_task_with_updater(
grpc_source,
subscribe_filter,
mpsc_downstream,
exit_notify,
subscribe_filter_update_rx,
)
}

/// connect to grpc source performing autoconnect if required,
/// returns mpsc channel; task will abort on fatal error
/// will shut down when receiver is dropped
pub fn create_geyser_autoconnection_task_with_mpsc(
///
/// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
pub fn create_geyser_autoconnection_task_with_updater(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
mpsc_downstream: tokio::sync::mpsc::Sender<Message>,
mpsc_downstream: mpsc::Sender<Message>,
mut exit_notify: broadcast::Receiver<()>,
) -> (JoinHandle<()>, Sender<SubscribeRequest>) {
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
let (client_subscribe_tx, mut client_subscribe_rx) =
tokio::sync::mpsc::channel::<SubscribeRequest>(1);

mut subscribe_filter_update_rx: mpsc::Receiver<SubscribeRequest>,
) -> JoinHandle<()> {
// task will be aborted when downstream receiver gets dropped
// there are two ways to terminate: 1) using break 'main_loop 2) return from task
let jh_geyser_task = tokio::spawn(async move {
Expand Down Expand Up @@ -163,7 +175,10 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
let subscribe_timeout =
grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
let subscribe_filter_on_connect = subscribe_filter_on_connect.clone();
debug!("Subscribe initially with filter {:?}", subscribe_filter_on_connect);
debug!(
"Subscribe initially with filter {:?}",
subscribe_filter_on_connect
);

let fut_subscribe = timeout(
subscribe_timeout.unwrap_or(Duration::MAX),
Expand Down Expand Up @@ -267,6 +282,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
}
ConnectionState::Ready(mut geyser_stream, mut geyser_subscribe_tx) => {
let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);

'recv_loop: loop {
select! {
exit_res = exit_notify.recv() => {
Expand All @@ -283,15 +299,16 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
}
break 'recv_loop ConnectionState::GracefulShutdown;
},
client_subscribe_update = client_subscribe_rx.recv() => {
// could model subscribe_filter_update_rx as optional here but did not figure out how
client_subscribe_update = subscribe_filter_update_rx.recv() => {
match client_subscribe_update {
Some(subscribe_request) => {
debug!("Subscription update from client with filter {:?}", subscribe_request);
subscribe_filter_on_connect = subscribe_request.clone();
// note: if the subscription is invalid, it will trigger a Tonic error:
// Status { code: InvalidArgument, message: "failed to create filter: Invalid Base58 string", source: None }
if let Err(send_err) = geyser_subscribe_tx.send(subscribe_request).await {
warn!("fail to send subscription update - disconnect and retry");
warn!("fail to send subscription update - disconnect and retry: {}", send_err);
break 'recv_loop ConnectionState::WaitReconnect(1);
};
}
Expand All @@ -306,12 +323,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
geyser_stream.next(),
) => {

// let MaybeExit::Continue(geyser_stream_res) =
// await_or_exit(fut_stream, exit_notify.recv()).await
// else {
// break 'recv_loop ConnectionState::GracefulShutdown;
// };

match geyser_stream_res {
Ok(Some(Ok(update_message))) => {
trace!("> recv update message from {}", grpc_source);
Expand Down Expand Up @@ -415,7 +426,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
debug!("gracefully exiting geyser task loop");
});

(jh_geyser_task, client_subscribe_tx)
jh_geyser_task
}

fn buffer_config_from_env() -> GeyserGrpcClientBufferConfig {
Expand Down

0 comments on commit dea3f0f

Please sign in to comment.